1use serde_json::Value;
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use tokio::io::{AsyncRead, AsyncWrite};
6use tokio::net::TcpListener;
7use tokio_tungstenite::{accept_hdr_async, connect_async, tungstenite::protocol::Message};
8use tokio_tungstenite::tungstenite::handshake::server::{Request, Response, ErrorResponse};
9use futures_util::{SinkExt, StreamExt};
10use serde_json::json;
11use url::Url;
12use anyhow::{Result};
13use easy_trace::prelude::{debug, error, info};
14use sharing::utils::ConfigEnv;
15use crate::MyPOSService;
16use crate::prelude::{BaseService};
17use crate::prelude::CashGuardService;
18use crate::prelude::PaytefService;
19use crate::prelude::PrintService;
20use crate::prelude::RfidService;
21use crate::prelude::ScaleService;
22use crate::prelude::{Service, WebSocketWrite};
23
24const LOCAL_TLS_CERT: &[u8] = include_bytes!("../../../resources/local_unpispas.crt");
30const LOCAL_TLS_KEY: &[u8] = include_bytes!("../../../resources/local_unpispas.key");
31
32
33
34pub async fn load_services(config: ConfigEnv) -> HashMap<String, Arc<dyn Service>> {
39 let mut services: HashMap<String, Arc<dyn Service>> = HashMap::new();
40
41 let required_services = std::env::var("MODULES")
43 .unwrap_or_else(|_| "base,print,scale,rfid".to_string());
44
45
46 info!("Loading persistence module: cache");
47 let mut persistence_service = crate::persistence::PersistenceService::new();
48 if let Err(e) = persistence_service.initialize().await {
49 error!("Failed to initialize persistence service: {}", e);
50 }
51 services.insert("cache".to_string(), Arc::new(persistence_service));
52 for service_name in required_services.split(',') {
54 match service_name.trim() {
55 "base" => {
56 services.insert("base".to_string(), Arc::new(BaseService::new()));
57 }
58 "print" => {
59 let print_service = PrintService::new(config.clone()).await;
60 services.insert("print".to_string(), Arc::new(print_service));
61 }
62 "scale" => {
63 services.insert("scale".to_string(), Arc::new(ScaleService::new()));
64 }
65 "rfid" => {
66 services.insert("rfid".to_string(), Arc::new(RfidService::new()));
67 }
68 "cashguard" => {
69 services.insert("cashguard".to_string(), Arc::new(CashGuardService::new("localhost", 8080, "root", "admin")));
70 }
71 "paytef" => {
72 services.insert("paytef".to_string(), Arc::new(PaytefService::new()));
73 }
74 "mypos" => {
75 services.insert("mypos".to_string(), Arc::new(MyPOSService::new()));
76 }
77 "commandViewer" => {
78 std::thread::spawn(|| {
79 let rt = tokio::runtime::Runtime::new().unwrap();
80 rt.block_on(async {
81 if let Err(e) = crate::command_viewer::start_kitchen_server().await {
82 eprintln!("Failed to start Actix server: {:?}", e);
83 }
84 });
85 });
86
87 services.insert("commandViewer".to_string(), Arc::new(crate::command_viewer::CommandViewerService::new()));
88 }
89 "OrderKitchen" => {
90 services.insert("OrderKitchen".to_string(), Arc::new(crate::order_kitchen::OrderKitchenService::new()));
91 }
92 _ => {
93 error!("Unknown service: {}", service_name.trim());
94 }
95 }
96 }
97 info!("Loaded services: {:?}", services.keys().collect::<Vec<_>>());
98 services
99}
100
101fn is_origin_allowed(origin: &str) -> bool {
117 if origin == "null" || is_loopback_origin(origin) {
118 return true;
119 }
120 sharing::PNA_ALLOWED_ORIGINS
121 .iter()
122 .any(|pattern| match_origin_pattern(pattern, origin))
123}
124
125fn is_loopback_origin(origin: &str) -> bool {
129 let (scheme, rest) = match origin.split_once("://") {
130 Some(v) => v,
131 None => return false,
132 };
133 if scheme != "http" && scheme != "https" {
134 return false;
135 }
136 let host = if let Some(after_bracket) = rest.strip_prefix('[') {
139 match after_bracket.find(']') {
140 Some(end) => &rest[..end + 2], None => return false,
142 }
143 } else {
144 rest.split(':').next().unwrap_or(rest)
145 };
146 host == "localhost" || host == "127.0.0.1" || host == "[::1]"
147}
148
149fn match_origin_pattern(pattern: &str, origin: &str) -> bool {
154 let (p_scheme, p_rest) = match pattern.split_once("://") {
156 Some(v) => v,
157 None => return false,
158 };
159 let (o_scheme, o_rest) = match origin.split_once("://") {
160 Some(v) => v,
161 None => return false,
162 };
163 if p_scheme != o_scheme {
164 return false;
165 }
166
167 let (p_host, p_port) = p_rest.split_once(':').unwrap_or((p_rest, ""));
169 let (o_host, o_port) = o_rest.split_once(':').unwrap_or((o_rest, ""));
170
171 let port_ok = match p_port {
173 "" => o_port.is_empty(),
174 "*" => true,
175 _ => p_port == o_port,
176 };
177 if !port_ok {
178 return false;
179 }
180
181 if let Some(suffix) = p_host.strip_prefix("*.") {
184 o_host == suffix || o_host.ends_with(&format!(".{}", suffix))
185 } else {
186 p_host == o_host
187 }
188}
189
190#[cfg(test)]
191mod origin_tests {
192 use super::*;
193
194 #[test]
195 fn loopback_exact_hosts_allowed() {
196 assert!(is_loopback_origin("http://localhost"));
197 assert!(is_loopback_origin("https://localhost:3000"));
198 assert!(is_loopback_origin("http://127.0.0.1"));
199 assert!(is_loopback_origin("http://[::1]"));
200 }
201
202 #[test]
203 fn loopback_lookalikes_rejected() {
204 assert!(!is_loopback_origin("http://localhost.evil.com"));
206 assert!(!is_loopback_origin("http://127.0.0.1.evil.com"));
207 assert!(!is_loopback_origin("ws://localhost"));
208 assert!(!is_loopback_origin("ftp://localhost"));
209 }
210
211 #[test]
212 fn pattern_subdomain_matches_apex_and_children() {
213 assert!(match_origin_pattern(
214 "https://*.unpispas.es",
215 "https://unpispas.es"
216 ));
217 assert!(match_origin_pattern(
218 "https://*.unpispas.es",
219 "https://app.unpispas.es"
220 ));
221 assert!(match_origin_pattern(
222 "https://*.unpispas.es",
223 "https://a.b.unpispas.es"
224 ));
225 }
226
227 #[test]
228 fn pattern_wildcard_port() {
229 assert!(match_origin_pattern(
230 "https://*.unpispas.es:*",
231 "https://app.unpispas.es:8443"
232 ));
233 assert!(match_origin_pattern(
234 "https://*.unpispas.es:*",
235 "https://app.unpispas.es"
236 ));
237 }
238
239 #[test]
240 fn pattern_exact_port_enforced() {
241 assert!(match_origin_pattern(
242 "https://app.example.com:443",
243 "https://app.example.com:443"
244 ));
245 assert!(!match_origin_pattern(
246 "https://app.example.com:443",
247 "https://app.example.com:8080"
248 ));
249 }
250
251 #[test]
252 fn pattern_scheme_mismatch_rejected() {
253 assert!(!match_origin_pattern(
254 "https://*.unpispas.es",
255 "http://app.unpispas.es"
256 ));
257 }
258
259 #[test]
260 fn non_pna_origin_not_allowed() {
261 assert!(!is_origin_allowed("https://evil.com"));
262 assert!(!is_origin_allowed("https://unpispas.es.evil.com"));
263 }
264}
265
266fn build_tls_acceptor() -> std::result::Result<tokio_native_tls::TlsAcceptor, Box<dyn std::error::Error>> {
268 let identity = native_tls::Identity::from_pkcs8(LOCAL_TLS_CERT, LOCAL_TLS_KEY)?;
269 let tls = native_tls::TlsAcceptor::builder(identity).build()?;
270 Ok(tokio_native_tls::TlsAcceptor::from(tls))
271}
272
273pub async fn start_local_server(
279 host: &str,
280 port: u16,
281 services: Arc<HashMap<String, Arc<dyn Service>>>,
282 service_name: &str,
283 service_vers: &str,
284 use_tls: bool,
285) -> Result<(), std::io::Error> {
286 let addr = format!("{}:{}", host, port);
287
288 let tls_acceptor = if use_tls {
289 match build_tls_acceptor() {
290 Ok(a) => {
291 info!("TLS enabled for local server (wss://local.unpispas.es:{})", port);
292 Some(a)
293 }
294 Err(e) => {
295 error!("Failed to build TLS acceptor, falling back to plain WS: {}", e);
296 None
297 }
298 }
299 } else {
300 None
301 };
302
303 loop {
304 match TcpListener::bind(&addr).await {
305 Ok(listener) => {
306 let proto = if tls_acceptor.is_some() { "wss" } else { "ws" };
307 info!("Local WebSocket server listening on {} ({}) service name {}", addr, proto, service_name);
308
309 while let Ok((stream, peer)) = listener.accept().await {
310 if let Err(e) = stream.set_nodelay(true) {
311 debug!("set_nodelay (peer={}): {}", peer, e);
312 }
313 let services = Arc::clone(&services);
314 let service_name = service_name.to_string();
315 let service_vers = service_vers.to_string();
316 let tls = tls_acceptor.clone();
317
318 tokio::spawn(async move {
319 let mut first_byte = [0u8; 1];
321 let n = match stream.peek(&mut first_byte).await {
322 Ok(n) => n,
323 Err(e) => {
324 debug!("peek failed (peer={}): {}", peer, e);
325 return;
326 }
327 };
328 if n == 0 {
329 return;
330 }
331
332 let is_tls = first_byte[0] == 0x16;
333 if is_tls {
334 if let Some(acceptor) = tls {
336 match acceptor.accept(stream).await {
337 Ok(tls_stream) => {
338 if let Err(e) = websocket_handler(tls_stream, peer, services, &service_name, &service_vers).await {
339 error!("WSS handler error (peer={}): {}", peer, e);
340 }
341 }
342 Err(e) => {
343 debug!("TLS handshake failed (peer={}): {}", peer, e);
344 }
345 }
346 } else {
347 debug!("Got TLS handshake but TLS disabled (peer={})", peer);
348 }
349 } else {
350 if let Err(e) = websocket_handler(stream, peer, services, &service_name, &service_vers).await {
352 error!("WS handler error (peer={}): {}", peer, e);
353 }
354 }
355 });
356 }
357 }
358 Err(e) => {
359 error!("Failed to bind local server to {}: {}. Retrying in 5 seconds...", addr, e);
360 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
361 }
362 }
363 }
364}
365
366pub async fn process_request(
371 mut request: Value,
372 services: Arc<HashMap<String, Arc<dyn Service>>>,
373 service_name: &str,
374 service_vers: &str,
375 write: WebSocketWrite,
376) -> Value {
377 let uuid = request
379 .get("UUIDV4")
380 .or_else(|| request.get("MESSAGE_UUID"))
381 .cloned()
382 .unwrap_or(Value::Null);
383
384 if let Some(message_data) = request.get("MESSAGE_DATA") {
386 if message_data.is_object() {
387 info!("Normalizing MESSAGE_DATA for remote request");
388 request = message_data.clone();
389
390 if uuid != Value::Null && !request.get("UUIDV4").is_some() {
392 request["UUIDV4"] = uuid.clone();
393 }
394 }
395 }
396
397 if request.get("result").is_some() && request.get("server").is_some(){
399 info!("Received CONNECT response: {:?}", request);
400 return json!({
401 "SERVICE_NAME": service_name,
402 "SERVICE_VERS": service_vers,
403 "MESSAGE_TYPE": "INFO",
404 "MESSAGE_EXEC": "SUCCESS",
405 "MESSAGE_UUID": uuid,
406 "MESSAGE_DATA": "Server connection acknowledged",
407 });
408 }
409
410 if let Some(target) = request.get("TARGET").and_then(Value::as_str) {
412 if let Some(service) = services.get(target.to_lowercase().as_str()) {
413 info!("Running service: {}", target.to_lowercase());
414
415 let (status, response) = service.run(request.clone(), write).await;
417 let message_data = if let Ok(json_value) = serde_json::from_str::<Value>(&response) {
418 json_value
420 } else {
421 Value::String(response)
423 };
424 return json!({
425 "SERVICE_NAME": service_name,
426 "SERVICE_VERS": service_vers,
427 "MESSAGE_TYPE": "RESPONSE",
428 "MESSAGE_EXEC": if status == 0 { "SUCCESS" } else { "FAILURE" },
429 "MESSAGE_UUID": uuid,
430 "MESSAGE_DATA": message_data,
431 });
432 } else {
433 error!("Unknown service: {}", target);
434 return json!({
435 "SERVICE_NAME": service_name,
436 "SERVICE_VERS": service_vers,
437 "MESSAGE_TYPE": "RESPONSE",
438 "MESSAGE_EXEC": "FAILURE",
439 "MESSAGE_UUID": uuid,
440 "MESSAGE_DATA": format!("Unknown service: {}", target),
441 });
442 }
443 }
444
445 if request.get("SERVICE_NAME").is_some() {
446 error!("Missing TARGET in request.");
447 }
448
449
450 json!({
451 "SERVICE_NAME": service_name,
452 "SERVICE_VERS": service_vers,
453 "MESSAGE_TYPE": "RESPONSE",
454 "MESSAGE_EXEC": "FAILURE",
455 "MESSAGE_UUID": uuid,
456 "MESSAGE_DATA": "Missing TARGET in request.",
457 })
458}
459
460pub async fn websocket_handler<S>(
480 stream: S,
481 peer: SocketAddr,
482 services: Arc<HashMap<String, Arc<dyn Service>>>,
483 service_name: &str,
484 service_vers: &str,
485) -> Result<(), Box<dyn std::error::Error>>
486where
487 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
488{
489 let boxed = crate::prelude::BoxedStream(Box::new(stream));
490
491 let pna_callback = |req: &Request, mut response: Response| -> std::result::Result<Response, ErrorResponse> {
494 let origin = req.headers().get("Origin").and_then(|v| v.to_str().ok());
495 match origin {
496 None => {
497 let h = response.headers_mut();
499 h.insert("Access-Control-Allow-Private-Network", "true".parse().unwrap());
500 Ok(response)
501 }
502 Some(o) if is_origin_allowed(o) => {
503 let h = response.headers_mut();
504 h.insert("Access-Control-Allow-Private-Network", "true".parse().unwrap());
505 h.insert("Access-Control-Allow-Origin", o.parse().unwrap());
506 h.insert("Vary", "Origin".parse().unwrap());
507 Ok(response)
508 }
509 Some(o) => {
510 error!("Rejected WS handshake from disallowed origin: {}", o);
511 let mut resp = ErrorResponse::new(Some("Forbidden origin".to_string()));
512 *resp.status_mut() = tokio_tungstenite::tungstenite::http::StatusCode::FORBIDDEN;
513 Err(resp)
514 }
515 }
516 };
517
518 let ws_stream = match accept_hdr_async(boxed, pna_callback).await {
519 Ok(ws) => {
520 debug!("WebSocket handshake OK (peer={})", peer);
521 ws
522 }
523 Err(e) => {
524 error!(
525 "WebSocket handshake failed (peer={}): {} (need RFC6455 WebSocket upgrade, e.g. browser WebSocket API)",
526 peer, e
527 );
528 return Err(e.into());
529 }
530 };
531 let (write, mut read) = ws_stream.split();
532
533 let rw_write = Arc::new(tokio::sync::RwLock::new(write));
535
536 while let Some(msg) = read.next().await {
537 match msg {
538 Ok(Message::Text(text)) => {
539 info!(
540 "Received text message HANDLER: {}",
541 if text.len() > 200 {
542 format!("{}...", &text[..200]) } else {
544 text.clone() }
546 );
547
548 match serde_json::from_str::<serde_json::Value>(&text) {
549 Ok(request) => {
550 let response_message = process_request(
551 request,
552 Arc::clone(&services),
553 service_name,
554 service_vers,
555 Some(Arc::clone(&rw_write)),
556 )
557 .await;
558
559 let mut ws = rw_write.write().await;
560 if let Err(e) = ws.send(Message::Text(response_message.to_string())).await {
561 error!("Failed to send response: {}", e);
562 }
563 }
564 Err(e) => {
565 let error_response = json!({
567 "SERVICE_NAME": service_name,
568 "SERVICE_VERS": service_vers,
569 "MESSAGE_TYPE": "RESPONSE",
570 "MESSAGE_EXEC": "FAILURE",
571 "MESSAGE_DATA": format!("Invalid JSON: {}", e),
572 });
573 let mut ws = rw_write.write().await;
574 if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
575 error!("Failed to send error response: {}", e);
576 }
577 error!("Invalid JSON: {}", e);
578 }
579 }
580 }
581 Err(e) => {
582 let error_response = json!({
583 "SERVICE_NAME": service_name,
584 "SERVICE_VERS": service_vers,
585 "MESSAGE_TYPE": "RESPONSE",
586 "MESSAGE_EXEC": "FAILURE",
587 "MESSAGE_DATA": format!("Invalid JSON: {}", e),
588 });
589 let mut ws = rw_write.write().await;
590 if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
591 error!("Failed to send error response: {}", e);
592 }
593 debug!("Error processing message: {}", e)
594 },
595 _ => {}
596 }
597 }
598
599 Ok(())
600}
601
602pub async fn start_remote_server(
607 remote_host: &str,
608 remote_port: u16,
609 service_name: &str,
610 service_vers: &str,
611 services: Arc<HashMap<String, Arc<dyn Service>>>,
612) -> Result<(), std::io::Error> {
613 loop {
614 match connect_to_remote_server(
615 remote_host,
616 remote_port,
617 service_name,
618 service_vers,
619 Arc::clone(&services),
620 false,
621 )
622 .await
623 {
624 Ok(_) => info!("Successfully connected to remote server."),
625 Err(e) => {
626 error!("Failed to connect to remote server: {:?}", e);
627 info!("Reconnecting in 5 seconds...");
628 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
629 }
630 }
631 }
632}
633
634async fn connect_to_remote_server(
639 remote_host: &str,
640 remote_port: u16,
641 service_name: &str,
642 service_vers: &str,
643 services: Arc<HashMap<String, Arc<dyn Service>>>,
644 return_after_connect: bool,
645) -> Result<(), Box<dyn std::error::Error>> {
646 let protocol = if std::env::var("REMOTE_USSL").unwrap_or_else(|_| "true".to_string()) == "true" {
647 "wss"
648 } else {
649 "ws"
650 };
651
652 let url = format!("{}://{}:{}", protocol, remote_host, remote_port);
653 info!("Connecting to remote WebSocket server at: {}", url);
654
655 let ws_url = Url::parse(&url)?;
656 let (ws_stream, _) = connect_async(ws_url.as_str()).await?;
657 let (mut write, mut read) = ws_stream.split();
658
659 let connect_message = json!({
660 "SERVICE_NAME": service_name,
661 "SERVICE_VERS": sharing::VERSION,
662 "MESSAGE_TYPE": "CONNECT",
663 "MODULE_LIST": services.iter().map(|(name, service)| {
664 json!({
665 "MODULE_NAME": name,
666 "MODULE_VERS": service.get_version(),
667 })
668 }).collect::<Vec<_>>(),
669 });
670
671 write.send(Message::Text(connect_message.to_string())).await?;
672 info!("CONNECT message sent.");
673
674 if return_after_connect {
675 return Ok(());
676 }
677
678 let write_p = Arc::new(tokio::sync::Mutex::new(write));
679 let service_name = service_name.to_owned();
680 let service_vers = service_vers.to_owned();
681
682 let keep_alive_write = Arc::clone(&write_p);
684 let keep_alive_task = tokio::spawn(async move {
685 let mut interval = tokio::time::interval(std::time::Duration::from_secs(25));
686
687 loop {
688 interval.tick().await;
689
690 let mut ws = keep_alive_write.lock().await;
691 if let Err(e) = ws.send(Message::Ping(vec![])).await {
692 error!("Keep-alive failed: {}", e);
693 break;
694 }
695 }
696 });
697
698 let result = async {
700 loop {
701 match tokio::time::timeout(
702 std::time::Duration::from_secs(90),
703 read.next()
704 ).await {
705 Ok(Some(Ok(Message::Text(text)))) => {
706 info!("Received message");
707 if let Ok(request) = serde_json::from_str::<serde_json::Value>(&text) {
708 let response = process_request(
709 request,
710 Arc::clone(&services),
711 &service_name,
712 &service_vers,
713 None,
714 ).await;
715
716 let mut ws = write_p.lock().await;
717 ws.send(Message::Text(response.to_string())).await?;
718 }
719 }
720 Ok(Some(Ok(Message::Ping(ping)))) => {
721 info!("Received PING from server, sending PONG");
722 let mut ws = write_p.lock().await;
723 ws.send(Message::Pong(ping)).await?;
724 }
725 Ok(Some(Ok(Message::Pong(_)))) => {
726 }
728 Ok(Some(Ok(Message::Close(_)))) => {
729 error!("Connection closed by server");
730 break;
731 }
732 Ok(Some(Err(e))) => {
733 error!("WebSocket error: {}", e);
734 break;
735 }
736 Ok(None) => {
737 error!("Connection closed");
738 break;
739 }
740 Err(_) => {
741 error!("No activity in 90 seconds, reconnecting");
742 break;
743 }
744 _ => {}
745 }
746 }
747 Ok::<(), Box<dyn std::error::Error>>(())
748 }.await;
749
750 keep_alive_task.abort();
751 result
752}
753
754pub async fn send_message(write: &WebSocketWrite, message_data: String) {
760 if let Some(ws_lock) = &write {
761 let mut ws = ws_lock.write().await;
762
763 match ws.send(Message::Text(message_data.clone())).await {
764 Ok(_) => {
765 info!("Message sent successfully: {}", message_data);
766 }
767 Err(e) => {
768 error!("Failed to send message: {:?}", e);
769 }
770 }
771 } else {
772 info!("No WebSocket available to send message.");
773 }
774}
775