pispas_modules/
utils.rs

1use serde_json::Value;
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use tokio::io::{AsyncRead, AsyncWrite};
6use tokio::net::TcpListener;
7use tokio_tungstenite::{accept_hdr_async, connect_async, tungstenite::protocol::Message};
8use tokio_tungstenite::tungstenite::handshake::server::{Request, Response, ErrorResponse};
9use futures_util::{SinkExt, StreamExt};
10use serde_json::json;
11use url::Url;
12use anyhow::{Result};
13use easy_trace::prelude::{debug, error, info};
14use sharing::utils::ConfigEnv;
15use crate::MyPOSService;
16use crate::prelude::{BaseService};
17use crate::prelude::CashGuardService;
18use crate::prelude::PaytefService;
19use crate::prelude::PrintService;
20use crate::prelude::RfidService;
21use crate::prelude::ScaleService;
22use crate::prelude::{Service, WebSocketWrite};
23
/// TLS certificate for `local.unpispas.es`.
///
/// Issued by Cloudflare Origin CA and embedded at compile time. The CA root
/// is imported into the Windows trust store during install, so browsers that
/// connect to `wss://local.unpispas.es:5005` see a publicly-valid chain.
///
/// Passed together with [`LOCAL_TLS_KEY`] to `native_tls::Identity::from_pkcs8`
/// in `build_tls_acceptor`.
const LOCAL_TLS_CERT: &[u8] = include_bytes!("../../../resources/local_unpispas.crt");
/// Private key paired with [`LOCAL_TLS_CERT`], also embedded at compile time.
///
/// NOTE(review): `include_bytes!` copies the key file into the compiled
/// binary, so anyone holding the binary can extract it — confirm this is an
/// accepted trade-off for the local-only certificate.
const LOCAL_TLS_KEY: &[u8] = include_bytes!("../../../resources/local_unpispas.key");
31
32
33
34/// Loads services based on the environment configuration.
35///
36/// This function reads the required modules from the `MODULES` environment variable.
37/// It initializes and registers the corresponding services into a `HashMap`.
38pub async fn load_services(config: ConfigEnv) -> HashMap<String, Arc<dyn Service>> {
39    let mut services: HashMap<String, Arc<dyn Service>> = HashMap::new();
40
41    // Get the required services from the environment variable or use a default value.
42    let required_services = std::env::var("MODULES")
43        .unwrap_or_else(|_| "base,print,scale,rfid".to_string());
44
45    
46    info!("Loading persistence module: cache");    
47    let mut persistence_service = crate::persistence::PersistenceService::new();
48    if let Err(e) = persistence_service.initialize().await {
49        error!("Failed to initialize persistence service: {}", e);
50    }
51    services.insert("cache".to_string(), Arc::new(persistence_service));
52    // Iterate through the list of service names and instantiate the appropriate service.
53    for service_name in required_services.split(',') {
54        match service_name.trim() {
55            "base" => {
56                services.insert("base".to_string(), Arc::new(BaseService::new()));
57            }
58            "print" => {
59                let print_service = PrintService::new(config.clone()).await;
60                services.insert("print".to_string(), Arc::new(print_service));
61            }
62            "scale" => {
63                services.insert("scale".to_string(), Arc::new(ScaleService::new()));
64            }
65            "rfid" => {
66                services.insert("rfid".to_string(), Arc::new(RfidService::new()));
67            }
68            "cashguard" => {
69                services.insert("cashguard".to_string(), Arc::new(CashGuardService::new("localhost", 8080, "root", "admin")));
70            }
71            "paytef" => {
72                services.insert("paytef".to_string(), Arc::new(PaytefService::new()));
73            }
74            "mypos" => {
75                services.insert("mypos".to_string(), Arc::new(MyPOSService::new()));
76            }
77            "commandViewer" => {
78                std::thread::spawn(|| {
79                    let rt = tokio::runtime::Runtime::new().unwrap();
80                    rt.block_on(async {
81                        if let Err(e) = crate::command_viewer::start_kitchen_server().await {
82                            eprintln!("Failed to start Actix server: {:?}", e);
83                        }
84                    });
85                });
86                
87                services.insert("commandViewer".to_string(), Arc::new(crate::command_viewer::CommandViewerService::new()));
88            }
89            "OrderKitchen" => {
90                services.insert("OrderKitchen".to_string(), Arc::new(crate::order_kitchen::OrderKitchenService::new()));
91            }
92            _ => {
93                error!("Unknown service: {}", service_name.trim());
94            }
95        }
96    }
97    info!("Loaded services: {:?}", services.keys().collect::<Vec<_>>());
98    services
99}
100
101/// Returns `true` when `origin` matches any pattern in
102/// [`sharing::PNA_ALLOWED_ORIGINS`] or is a loopback origin.
103///
104/// Loopback origins (`http(s)://localhost[:port]`,
105/// `http(s)://127.0.0.1[:port]`, and the special value `"null"`) are
106/// always allowed so devtools, native apps, and file-served pages keep
107/// working. Host comparison is **exact** — `localhost.evil.com` does
108/// **not** count as loopback.
109///
110/// Allow-list patterns support:
111///   * exact match,
112///   * `scheme://*.domain` — wildcard subdomain. Matches both the apex
113///     (`domain`) and any subdomain (`x.domain`, `x.y.domain`). This is
114///     intentional so a single pattern covers the root and its children.
115///   * `scheme://host:*` — wildcard port.
116fn is_origin_allowed(origin: &str) -> bool {
117    if origin == "null" || is_loopback_origin(origin) {
118        return true;
119    }
120    sharing::PNA_ALLOWED_ORIGINS
121        .iter()
122        .any(|pattern| match_origin_pattern(pattern, origin))
123}
124
/// Exact-host loopback check for an `Origin` header value.
///
/// Returns `true` only when `origin` has the shape
/// `http(s)://<host>` or `http(s)://<host>:<port>` where `<host>` is exactly
/// `localhost`, `127.0.0.1` or `[::1]` and `<port>` consists solely of ASCII
/// digits (an empty port after the colon is tolerated).
///
/// Both the host and whatever follows it are validated, so look-alikes such
/// as `localhost.evil.com`, `[::1].evil.com` or `localhost:80@evil.com` are
/// rejected. Other schemes (`ws`, `ftp`, ...) and strings without `://` are
/// rejected as well.
fn is_loopback_origin(origin: &str) -> bool {
    let (scheme, rest) = match origin.split_once("://") {
        Some(parts) => parts,
        None => return false,
    };
    if scheme != "http" && scheme != "https" {
        return false;
    }

    // Split `rest` into the host and the tail that follows it. IPv6 literals
    // are bracketed (`[::1]:port`), so the host ends at the closing bracket
    // rather than at the first colon.
    let (host, tail) = if rest.starts_with('[') {
        match rest.find(']') {
            // `]` is a single byte, so `end + 1` is a valid split point.
            Some(end) => rest.split_at(end + 1),
            None => return false,
        }
    } else {
        match rest.find(':') {
            Some(colon) => rest.split_at(colon),
            None => (rest, ""),
        }
    };

    // The tail must be empty or `:<digits>`; anything else means the string
    // is not a plain host[:port] origin.
    let tail_ok = match tail.strip_prefix(':') {
        Some(port) => port.bytes().all(|b| b.is_ascii_digit()),
        None => tail.is_empty(),
    };

    tail_ok && matches!(host, "localhost" | "127.0.0.1" | "[::1]")
}
148
/// Tests one `scheme://host[:port]` origin against one allow-list pattern.
///
/// Rules, applied in this order:
///
/// * both strings must contain `://`, and the schemes must be equal;
/// * port: a pattern port of `*` accepts any origin port (including none);
///   otherwise the two port strings must be equal, so a pattern without a
///   port only matches an origin without a port;
/// * host: a pattern host of `*.domain` accepts `domain` itself and anything
///   ending in `.domain`; any other pattern host must be equal to the origin
///   host.
///
/// Pure string slicing — no regex and no allocation.
fn match_origin_pattern(pattern: &str, origin: &str) -> bool {
    /// Splits `scheme://host[:port]` into `(scheme, host, port)`; the port is
    /// `""` when there is no colon after the scheme separator.
    fn split_origin(value: &str) -> Option<(&str, &str, &str)> {
        let (scheme, rest) = value.split_once("://")?;
        let (host, port) = rest.split_once(':').unwrap_or((rest, ""));
        Some((scheme, host, port))
    }

    let ((p_scheme, p_host, p_port), (o_scheme, o_host, o_port)) =
        match (split_origin(pattern), split_origin(origin)) {
            (Some(p), Some(o)) => (p, o),
            _ => return false,
        };

    let scheme_matches = p_scheme == o_scheme;
    // `*` is a wildcard; every other value (including the empty port) must be
    // identical on both sides.
    let port_matches = p_port == "*" || p_port == o_port;
    if !(scheme_matches && port_matches) {
        return false;
    }

    match p_host.strip_prefix("*.") {
        // Wildcard: what precedes the domain must be empty (apex) or end with
        // a dot (a real subdomain label boundary).
        Some(domain) => o_host
            .strip_suffix(domain)
            .map_or(false, |lead| lead.is_empty() || lead.ends_with('.')),
        None => p_host == o_host,
    }
}
189
#[cfg(test)]
mod origin_tests {
    use super::*;

    /// Wildcard-subdomain pattern shared by several tests.
    const SUBDOMAIN_PATTERN: &str = "https://*.unpispas.es";
    /// Wildcard-subdomain plus wildcard-port pattern.
    const ANY_PORT_PATTERN: &str = "https://*.unpispas.es:*";
    /// Exact host with an explicit port.
    const FIXED_PORT_PATTERN: &str = "https://app.example.com:443";

    #[test]
    fn loopback_exact_hosts_allowed() {
        let loopbacks = [
            "http://localhost",
            "https://localhost:3000",
            "http://127.0.0.1",
            "http://[::1]",
        ];
        for origin in loopbacks.iter() {
            assert!(is_loopback_origin(origin), "{} must be loopback", origin);
        }
    }

    #[test]
    fn loopback_lookalikes_rejected() {
        // Regression guard: hosts that merely start with a loopback name, and
        // non-HTTP schemes, must never be treated as loopback.
        let impostors = [
            "http://localhost.evil.com",
            "http://127.0.0.1.evil.com",
            "ws://localhost",
            "ftp://localhost",
        ];
        for origin in impostors.iter() {
            assert!(!is_loopback_origin(origin), "{} must not be loopback", origin);
        }
    }

    #[test]
    fn pattern_subdomain_matches_apex_and_children() {
        let origins = [
            "https://unpispas.es",
            "https://app.unpispas.es",
            "https://a.b.unpispas.es",
        ];
        for origin in origins.iter() {
            assert!(
                match_origin_pattern(SUBDOMAIN_PATTERN, origin),
                "{} must match {}",
                origin,
                SUBDOMAIN_PATTERN
            );
        }
    }

    #[test]
    fn pattern_wildcard_port() {
        // With and without an explicit port.
        let origins = ["https://app.unpispas.es:8443", "https://app.unpispas.es"];
        for origin in origins.iter() {
            assert!(
                match_origin_pattern(ANY_PORT_PATTERN, origin),
                "{} must match {}",
                origin,
                ANY_PORT_PATTERN
            );
        }
    }

    #[test]
    fn pattern_exact_port_enforced() {
        let same_port = match_origin_pattern(FIXED_PORT_PATTERN, "https://app.example.com:443");
        let other_port = match_origin_pattern(FIXED_PORT_PATTERN, "https://app.example.com:8080");
        assert!(same_port);
        assert!(!other_port);
    }

    #[test]
    fn pattern_scheme_mismatch_rejected() {
        let matched = match_origin_pattern(SUBDOMAIN_PATTERN, "http://app.unpispas.es");
        assert!(!matched);
    }

    #[test]
    fn non_pna_origin_not_allowed() {
        let outsiders = ["https://evil.com", "https://unpispas.es.evil.com"];
        for origin in outsiders.iter() {
            assert!(!is_origin_allowed(origin), "{} must be rejected", origin);
        }
    }
}
265
266/// Builds a TLS acceptor from the embedded cert + key.
267fn build_tls_acceptor() -> std::result::Result<tokio_native_tls::TlsAcceptor, Box<dyn std::error::Error>> {
268    let identity = native_tls::Identity::from_pkcs8(LOCAL_TLS_CERT, LOCAL_TLS_KEY)?;
269    let tls = native_tls::TlsAcceptor::builder(identity).build()?;
270    Ok(tokio_native_tls::TlsAcceptor::from(tls))
271}
272
273/// Starts a local WebSocket server (plain WS or WSS depending on `use_tls`).
274///
275/// When `use_tls` is true the server presents the embedded `local.unpispas.es`
276/// certificate so browsers can connect via `wss://local.unpispas.es:<port>`
277/// without the Chrome *Local Network Access* prompt.
278pub async fn start_local_server(
279    host: &str,
280    port: u16,
281    services: Arc<HashMap<String, Arc<dyn Service>>>,
282    service_name: &str,
283    service_vers: &str,
284    use_tls: bool,
285) -> Result<(), std::io::Error> {
286    let addr = format!("{}:{}", host, port);
287
288    let tls_acceptor = if use_tls {
289        match build_tls_acceptor() {
290            Ok(a) => {
291                info!("TLS enabled for local server (wss://local.unpispas.es:{})", port);
292                Some(a)
293            }
294            Err(e) => {
295                error!("Failed to build TLS acceptor, falling back to plain WS: {}", e);
296                None
297            }
298        }
299    } else {
300        None
301    };
302
303    loop {
304        match TcpListener::bind(&addr).await {
305            Ok(listener) => {
306                let proto = if tls_acceptor.is_some() { "wss" } else { "ws" };
307                info!("Local WebSocket server listening on {} ({}) service name {}", addr, proto, service_name);
308
309                while let Ok((stream, peer)) = listener.accept().await {
310                    if let Err(e) = stream.set_nodelay(true) {
311                        debug!("set_nodelay (peer={}): {}", peer, e);
312                    }
313                    let services = Arc::clone(&services);
314                    let service_name = service_name.to_string();
315                    let service_vers = service_vers.to_string();
316                    let tls = tls_acceptor.clone();
317
318                    tokio::spawn(async move {
319                        // Dual-mode: peek first byte to detect TLS (0x16) vs plain HTTP
320                        let mut first_byte = [0u8; 1];
321                        let n = match stream.peek(&mut first_byte).await {
322                            Ok(n) => n,
323                            Err(e) => {
324                                debug!("peek failed (peer={}): {}", peer, e);
325                                return;
326                            }
327                        };
328                        if n == 0 {
329                            return;
330                        }
331
332                        let is_tls = first_byte[0] == 0x16;
333                        if is_tls {
334                            // TLS handshake first, then WS
335                            if let Some(acceptor) = tls {
336                                match acceptor.accept(stream).await {
337                                    Ok(tls_stream) => {
338                                        if let Err(e) = websocket_handler(tls_stream, peer, services, &service_name, &service_vers).await {
339                                            error!("WSS handler error (peer={}): {}", peer, e);
340                                        }
341                                    }
342                                    Err(e) => {
343                                        debug!("TLS handshake failed (peer={}): {}", peer, e);
344                                    }
345                                }
346                            } else {
347                                debug!("Got TLS handshake but TLS disabled (peer={})", peer);
348                            }
349                        } else {
350                            // Plain HTTP — legacy ws:// clients
351                            if let Err(e) = websocket_handler(stream, peer, services, &service_name, &service_vers).await {
352                                error!("WS handler error (peer={}): {}", peer, e);
353                            }
354                        }
355                    });
356                }
357            }
358            Err(e) => {
359                error!("Failed to bind local server to {}: {}. Retrying in 5 seconds...", addr, e);
360                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
361            }
362        }
363    }
364}
365
366/// Processes an incoming request by routing it to the appropriate service.
367///
368/// This function handles normalization of incoming messages, extracting metadata,
369/// and calling the appropriate service based on the `TARGET` field in the message.
370pub async fn process_request(
371    mut request: Value,
372    services: Arc<HashMap<String, Arc<dyn Service>>>,
373    service_name: &str,
374    service_vers: &str,
375    write: WebSocketWrite,
376) -> Value {
377    // Attempt to extract UUID from the message or use a default value if absent.
378    let uuid = request
379        .get("UUIDV4")
380        .or_else(|| request.get("MESSAGE_UUID"))
381        .cloned()
382        .unwrap_or(Value::Null);
383
384    // Normalize the message if it contains a `MESSAGE_DATA` field with nested content.
385    if let Some(message_data) = request.get("MESSAGE_DATA") {
386        if message_data.is_object() {
387            info!("Normalizing MESSAGE_DATA for remote request");
388            request = message_data.clone();
389
390            // Ensure `UUIDV4` is present in the normalized message.
391            if uuid != Value::Null && !request.get("UUIDV4").is_some() {
392                request["UUIDV4"] = uuid.clone();
393            }
394        }
395    }
396
397    // Check if the message is a connection response.
398    if request.get("result").is_some() && request.get("server").is_some(){
399        info!("Received CONNECT response: {:?}", request);
400        return json!({
401            "SERVICE_NAME": service_name,
402            "SERVICE_VERS": service_vers,
403            "MESSAGE_TYPE": "INFO",
404            "MESSAGE_EXEC": "SUCCESS",
405            "MESSAGE_UUID": uuid,
406            "MESSAGE_DATA": "Server connection acknowledged",
407        });
408    }
409
410    // Route the request to the target service specified in the `TARGET` field.
411    if let Some(target) = request.get("TARGET").and_then(Value::as_str) {        
412        if let Some(service) = services.get(target.to_lowercase().as_str()) {
413            info!("Running service: {}", target.to_lowercase());
414
415            // Execute the service and build the response.
416            let (status, response) = service.run(request.clone(), write).await;
417            let message_data = if let Ok(json_value) = serde_json::from_str::<Value>(&response) {
418                // Si es JSON válido, usar el Value directamente (sin escapar)
419                json_value
420            } else {
421                // Si no es JSON, usar como string
422                Value::String(response)
423            };
424            return json!({
425                "SERVICE_NAME": service_name,
426                "SERVICE_VERS": service_vers,
427                "MESSAGE_TYPE": "RESPONSE",
428                "MESSAGE_EXEC": if status == 0 { "SUCCESS" } else { "FAILURE" },
429                "MESSAGE_UUID": uuid,
430                "MESSAGE_DATA": message_data,
431            });
432        } else {
433            error!("Unknown service: {}", target);
434            return json!({
435                "SERVICE_NAME": service_name,
436                "SERVICE_VERS": service_vers,
437                "MESSAGE_TYPE": "RESPONSE",
438                "MESSAGE_EXEC": "FAILURE",
439                "MESSAGE_UUID": uuid,
440                "MESSAGE_DATA": format!("Unknown service: {}", target),
441            });
442        }
443    }
444
445    if request.get("SERVICE_NAME").is_some() {
446        error!("Missing TARGET in request.");
447    }
448
449
450    json!({
451        "SERVICE_NAME": service_name,
452        "SERVICE_VERS": service_vers,
453        "MESSAGE_TYPE": "RESPONSE",
454        "MESSAGE_EXEC": "FAILURE",
455        "MESSAGE_UUID": uuid,
456        "MESSAGE_DATA": "Missing TARGET in request.",
457    })
458}
459
460/// Handles a single WebSocket connection end-to-end.
461///
462/// The connection has already passed the dual-mode (TLS vs plain) detection
463/// in `start_local_server`, so `stream` is the appropriate concrete stream
464/// type. We wrap it in [`crate::prelude::BoxedStream`] and run the WS
465/// handshake with [`tokio_tungstenite::accept_hdr_async`] so we can:
466///
467/// 1. Validate the `Origin` header against `sharing::PNA_ALLOWED_ORIGINS`
468///    (with wildcard subdomain / wildcard port matching). Disallowed
469///    origins get HTTP 403.
470/// 2. Inject `Access-Control-Allow-Private-Network: true` into the
471///    101 Switching Protocols response so Chrome's PNA check passes.
472///
473/// After the handshake we read frames, `serde_json`-parse them, and
474/// dispatch to `process_request` which routes by the `TARGET` field.
475///
476/// Note: Chrome does **not** send an OPTIONS preflight before a WebSocket
477/// upgrade for PNA — the PNA check happens inside the WS handshake itself.
478/// So we do not have a separate OPTIONS path.
479pub async fn websocket_handler<S>(
480    stream: S,
481    peer: SocketAddr,
482    services: Arc<HashMap<String, Arc<dyn Service>>>,
483    service_name: &str,
484    service_vers: &str,
485) -> Result<(), Box<dyn std::error::Error>>
486where
487    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
488{
489    let boxed = crate::prelude::BoxedStream(Box::new(stream));
490
491    // Validate Origin against the configured allow-list and inject PNA headers
492    // into the 101 Switching Protocols response.
493    let pna_callback = |req: &Request, mut response: Response| -> std::result::Result<Response, ErrorResponse> {
494        let origin = req.headers().get("Origin").and_then(|v| v.to_str().ok());
495        match origin {
496            None => {
497                // Non-browser clients (native apps, CLI) often omit Origin — allow.
498                let h = response.headers_mut();
499                h.insert("Access-Control-Allow-Private-Network", "true".parse().unwrap());
500                Ok(response)
501            }
502            Some(o) if is_origin_allowed(o) => {
503                let h = response.headers_mut();
504                h.insert("Access-Control-Allow-Private-Network", "true".parse().unwrap());
505                h.insert("Access-Control-Allow-Origin", o.parse().unwrap());
506                h.insert("Vary", "Origin".parse().unwrap());
507                Ok(response)
508            }
509            Some(o) => {
510                error!("Rejected WS handshake from disallowed origin: {}", o);
511                let mut resp = ErrorResponse::new(Some("Forbidden origin".to_string()));
512                *resp.status_mut() = tokio_tungstenite::tungstenite::http::StatusCode::FORBIDDEN;
513                Err(resp)
514            }
515        }
516    };
517
518    let ws_stream = match accept_hdr_async(boxed, pna_callback).await {
519        Ok(ws) => {
520            debug!("WebSocket handshake OK (peer={})", peer);
521            ws
522        }
523        Err(e) => {
524            error!(
525                "WebSocket handshake failed (peer={}): {} (need RFC6455 WebSocket upgrade, e.g. browser WebSocket API)",
526                peer, e
527            );
528            return Err(e.into());
529        }
530    };
531    let (write, mut read) = ws_stream.split();
532
533    // Create a write lock to manage sending responses back to the client.
534    let rw_write = Arc::new(tokio::sync::RwLock::new(write));
535
536    while let Some(msg) = read.next().await {
537        match msg {
538            Ok(Message::Text(text)) => {
539                info!(
540                    "Received text message HANDLER: {}",
541                    if text.len() > 200 {
542                        format!("{}...", &text[..200]) // Truncate long messages
543                    } else {
544                        text.clone() // Use full message if short
545                    }
546                );
547
548                match serde_json::from_str::<serde_json::Value>(&text) {
549                    Ok(request) => {
550                        let response_message = process_request(
551                            request,
552                            Arc::clone(&services),
553                            service_name,
554                            service_vers,
555                            Some(Arc::clone(&rw_write)),
556                        )
557                            .await;
558
559                        let mut ws = rw_write.write().await;
560                        if let Err(e) = ws.send(Message::Text(response_message.to_string())).await {
561                            error!("Failed to send response: {}", e);
562                        }
563                    }
564                    Err(e) => {
565                        //response error!
566                        let error_response = json!({
567                            "SERVICE_NAME": service_name,
568                            "SERVICE_VERS": service_vers,
569                            "MESSAGE_TYPE": "RESPONSE",
570                            "MESSAGE_EXEC": "FAILURE",
571                            "MESSAGE_DATA": format!("Invalid JSON: {}", e),
572                        });
573                        let mut ws = rw_write.write().await;
574                        if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
575                            error!("Failed to send error response: {}", e);
576                        }
577                        error!("Invalid JSON: {}", e);
578                    }
579                }
580            }
581            Err(e) => {
582                let error_response = json!({
583                            "SERVICE_NAME": service_name,
584                            "SERVICE_VERS": service_vers,
585                            "MESSAGE_TYPE": "RESPONSE",
586                            "MESSAGE_EXEC": "FAILURE",
587                            "MESSAGE_DATA": format!("Invalid JSON: {}", e),
588                        });
589                let mut ws = rw_write.write().await;
590                if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
591                    error!("Failed to send error response: {}", e);
592                }
593                debug!("Error processing message: {}", e)
594            },
595            _ => {}
596        }
597    }
598
599    Ok(())
600}
601
602/// Starts the remote WebSocket server with reconnection and message handling.
603///
604/// This function keeps trying to connect to the remote server in a loop,
605/// reconnecting automatically if the connection fails.
606pub async fn start_remote_server(
607    remote_host: &str,
608    remote_port: u16,
609    service_name: &str,
610    service_vers: &str,
611    services: Arc<HashMap<String, Arc<dyn Service>>>,
612) -> Result<(), std::io::Error> {
613    loop {
614        match connect_to_remote_server(
615            remote_host,
616            remote_port,
617            service_name,
618            service_vers,
619            Arc::clone(&services),
620            false,
621        )
622            .await
623        {
624            Ok(_) => info!("Successfully connected to remote server."),
625            Err(e) => {
626                error!("Failed to connect to remote server: {:?}", e);
627                info!("Reconnecting in 5 seconds...");
628                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
629            }
630        }
631    }
632}
633
634/// Handles a single connection to the remote WebSocket server.
635///
636/// This function sends a `CONNECT` message to the server upon successful connection
637/// and processes incoming messages by routing them to the appropriate services.
638async fn connect_to_remote_server(
639    remote_host: &str,
640    remote_port: u16,
641    service_name: &str,
642    service_vers: &str,
643    services: Arc<HashMap<String, Arc<dyn Service>>>,
644    return_after_connect: bool,
645) -> Result<(), Box<dyn std::error::Error>> {
646    let protocol = if std::env::var("REMOTE_USSL").unwrap_or_else(|_| "true".to_string()) == "true" {
647        "wss"
648    } else {
649        "ws"
650    };
651
652    let url = format!("{}://{}:{}", protocol, remote_host, remote_port);
653    info!("Connecting to remote WebSocket server at: {}", url);
654
655    let ws_url = Url::parse(&url)?;
656    let (ws_stream, _) = connect_async(ws_url.as_str()).await?;
657    let (mut write, mut read) = ws_stream.split();
658
659    let connect_message = json!({
660        "SERVICE_NAME": service_name,
661        "SERVICE_VERS": sharing::VERSION,
662        "MESSAGE_TYPE": "CONNECT",
663        "MODULE_LIST": services.iter().map(|(name, service)| {
664            json!({
665                "MODULE_NAME": name,
666                "MODULE_VERS": service.get_version(),
667            })
668        }).collect::<Vec<_>>(),
669    });
670
671    write.send(Message::Text(connect_message.to_string())).await?;
672    info!("CONNECT message sent.");
673
674    if return_after_connect {
675        return Ok(());
676    }
677
678    let write_p = Arc::new(tokio::sync::Mutex::new(write));
679    let service_name = service_name.to_owned();
680    let service_vers = service_vers.to_owned();
681
682    // Keep-alive: envía PING cada 25 segundos
683    let keep_alive_write = Arc::clone(&write_p);
684    let keep_alive_task = tokio::spawn(async move {
685        let mut interval = tokio::time::interval(std::time::Duration::from_secs(25));
686
687        loop {
688            interval.tick().await;
689
690            let mut ws = keep_alive_write.lock().await;
691            if let Err(e) = ws.send(Message::Ping(vec![])).await {
692                error!("Keep-alive failed: {}", e);
693                break;
694            }
695        }
696    });
697
698    // Loop de mensajes con timeout de 90 segundos
699    let result = async {
700        loop {
701            match tokio::time::timeout(
702                std::time::Duration::from_secs(90),
703                read.next()
704            ).await {
705                Ok(Some(Ok(Message::Text(text)))) => {
706                    info!("Received message");
707                    if let Ok(request) = serde_json::from_str::<serde_json::Value>(&text) {
708                        let response = process_request(
709                            request,
710                            Arc::clone(&services),
711                            &service_name,
712                            &service_vers,
713                            None,
714                        ).await;
715
716                        let mut ws = write_p.lock().await;
717                        ws.send(Message::Text(response.to_string())).await?;
718                    }
719                }
720                Ok(Some(Ok(Message::Ping(ping)))) => {
721                    info!("Received PING from server, sending PONG");
722                    let mut ws = write_p.lock().await;
723                    ws.send(Message::Pong(ping)).await?;
724                }
725                Ok(Some(Ok(Message::Pong(_)))) => {
726                    // PONG recibido - conexión sana
727                }
728                Ok(Some(Ok(Message::Close(_)))) => {
729                    error!("Connection closed by server");
730                    break;
731                }
732                Ok(Some(Err(e))) => {
733                    error!("WebSocket error: {}", e);
734                    break;
735                }
736                Ok(None) => {
737                    error!("Connection closed");
738                    break;
739                }
740                Err(_) => {
741                    error!("No activity in 90 seconds, reconnecting");
742                    break;
743                }
744                _ => {}
745            }
746        }
747        Ok::<(), Box<dyn std::error::Error>>(())
748    }.await;
749
750    keep_alive_task.abort();
751    result
752}
753
754/// Sends a message to the WebSocket connection.
755///
756/// # Arguments
757/// - `write`: The WebSocket connection.
758/// - `message_data`: The message to send.
759pub async fn send_message(write: &WebSocketWrite, message_data: String) {
760    if let Some(ws_lock) = &write {
761        let mut ws = ws_lock.write().await;
762
763        match ws.send(Message::Text(message_data.clone())).await {
764            Ok(_) => {
765                info!("Message sent successfully: {}", message_data);
766            }
767            Err(e) => {
768                error!("Failed to send message: {:?}", e);
769            }
770        }
771    } else {
772        info!("No WebSocket available to send message.");
773    }
774}
775