pispas_modules/
command_viewer.rs

1use actix_web::middleware::Logger;
2use actix_web::{web, App, HttpResponse, HttpServer, Responder};
3use easy_trace::prelude::{debug, error, info};
4use futures_util::SinkExt;
5use futures_util::StreamExt;
6use serde::{Deserialize, Serialize};
7use sharing::utils::ConfigEnv;
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::net::TcpListener;
13use tokio::sync::Mutex;
14use tokio_tungstenite::{accept_async, connect_async, tungstenite::protocol::Message};
15
16use crate::service::{Service, WebSocketWrite};
17use async_trait::async_trait;
18use easy_trace::instruments::tracing;
19use serde_json::Value;
20use std::any::Any;
21
22/// The version of the CommandViewerService module.
23pub const COMMAND_VIEWER_VERSION: &str = "1.0.0";
24
25/// A minimal service that only returns its version and performs no other operations.
26#[derive(Clone)]
27pub struct CommandViewerService;
28
29impl CommandViewerService {
30    /// Creates a new instance of `CommandViewerService`.
31    pub fn new() -> Self {
32        info!(
33            "CommandViewerService initialized (version: {})",
34            COMMAND_VIEWER_VERSION
35        );
36        CommandViewerService
37    }
38}
39
40#[async_trait]
41impl Service for CommandViewerService {
42    /// This method does nothing meaningful. It only logs the action received and returns a placeholder result.
43    async fn run(&self, _action: Value, _write: WebSocketWrite) -> (i32, String) {
44        info!("CommandViewerService: run method called, but no action is performed.");
45        (0, "No action performed".to_string())
46    }
47
48    /// Allows dynamic casting to `Any` for type-safe downcasting.
49    fn as_any(&self) -> &dyn Any {
50        self
51    }
52
53    /// Stops the service. Since this service does nothing, the method just logs a message.
54    fn stop_service(&self) {
55        info!("CommandViewerService: stop_service called, but nothing to stop.");
56    }
57
58    /// Returns the version of the CommandViewerService module.
59    fn get_version(&self) -> String {
60        COMMAND_VIEWER_VERSION.to_string()
61    }
62}
63fn current_timestamp() -> u64 {
64    //fix panic
65    match SystemTime::now().duration_since(UNIX_EPOCH) {
66        Ok(n) => n.as_secs(),
67        Err(_) => {
68            return 0
69        }
70    }    
71}
72
73const MAX_COMMANDS_PER_PRINTER: usize = 45;
74// Estructura para una comanda
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct Command {
77    pub id: String,
78    pub order_no: String,
79    pub html: String,
80    pub archived: bool,
81    pub printer: String,
82    pub timestamp: Option<u64>,
83}
84
85// Archivo de persistencia
86const PERSISTENCE_FILE: &str = "commands.json";
87pub struct KitchenState {
88    pub commands: Mutex<HashMap<String, Vec<Command>>>, // GUID -> Comandas
89    pub connections: Mutex<HashMap<u64, tokio::sync::mpsc::UnboundedSender<Message>>>, // Connection ID -> Sender
90    pub subscriptions: Mutex<HashMap<String, Vec<u64>>>, // GUID -> Connection IDs
91    pub config_env: ConfigEnv,
92}
93
94pub type SharedKitchenState = Arc<KitchenState>;
95
96/// Agregar una nueva comanda a la impresora correspondiente
97async fn add_command(command: Command, state: SharedKitchenState) -> Result<(), String> {
98    let printer = command.printer.clone();
99
100    // Si la impresora es "CommandViewer", saltamos la verificación de suscriptores
101    if printer != "CommandViewer" {
102        let subscriptions = state.subscriptions.lock().await;
103        if let Some(connection_ids) = subscriptions.get(&printer) {
104            if connection_ids.is_empty() {
105                info!(
106                    "Dont save command because there are no subscribers for printer: {}",
107                    printer
108                );
109                return Err(format!(
110                    "There are no subscribers for printer: {}",
111                    printer
112                ));
113            }
114        } else {
115            let commands = state.commands.lock().await;
116            if !commands.contains_key(&printer) {
117                info!(
118                    "Dont save command because there are no subscribers for printer: {}",
119                    printer
120                );
121                return Err(format!(
122                    "There are no subscribers for printer: {}",
123                    printer
124                ));
125            }
126        }
127        drop(subscriptions);
128    }
129    let mut commands = state.commands.lock().await;
130
131    let printer_commands = commands
132        .entry(command.printer.clone())
133        .or_insert_with(Vec::new);
134
135    printer_commands.push(command);
136    printer_commands.sort_by_key(|c| c.timestamp.unwrap_or(0));
137    tracing::warn!("comandas para esa impresra: {}", printer_commands.len());
138
139    // Limitar a un máximo de 200 comandas
140    if printer_commands.len() > MAX_COMMANDS_PER_PRINTER {
141        let drain_amount = printer_commands.len() / 3; // delete 25% of the oldest commands
142        tracing::warn!("Draining {}", drain_amount);
143        printer_commands.drain(0..drain_amount); // FIFO
144
145        //send lenArchivedCommand to all clients
146        let response = serde_json::json!({
147            "status": "lenArchivedCommand",
148            "printer": printer.clone(),
149            "len": printer_commands.len(),
150        });
151        broadcast_to_clients(Message::Text(response.to_string()), &printer, state.clone()).await;
152        tracing::warn!("SEND ALL CLIENTS {}", printer_commands.len());
153    }
154
155    save_commands_to_disk(&*commands).await;
156
157    Ok(())
158}
159
160/// Load persistent commands from disk
161async fn load_commands_from_disk() -> HashMap<String, Vec<Command>> {
162    if let Ok(data) = std::fs::read_to_string(PERSISTENCE_FILE) {
163        if let Ok(mut commands) = serde_json::from_str::<HashMap<String, Vec<Command>>>(&data) {
164            for printer_commands in commands.values_mut() {
165                for command in printer_commands.iter_mut() {
166                    // Asigna un timestamp si no existe
167                    if command.timestamp.is_none() || command.timestamp == Some(0) {
168                        command.timestamp = Some(current_timestamp());
169                    }
170                }
171            }
172            return commands;
173        }
174    }
175    HashMap::new()
176}
177
178/// Save commands to disk
179async fn save_commands_to_disk(commands: &HashMap<String, Vec<Command>>) {
180    if let Ok(json) = serde_json::to_string_pretty(commands) {
181        if let Err(e) = std::fs::write(PERSISTENCE_FILE, json) {
182            error!("Error saving commands to disk: {}", e);
183        }
184    }
185}
186
187/// GET /commands
188async fn list_commands(state: web::Data<SharedKitchenState>) -> impl Responder {
189    let commands = state.commands.lock().await;
190    HttpResponse::Ok().json(&*commands)
191}
192
193/// incude all static files in the binary
194const INDEX_HTML: &str = include_str!("../src/static/index.html");
195const APP_JS: &str = include_str!("../src/static/app.js");
196const STYLE_CSS: &str = include_str!("../src/static/style.css");
197pub const FAVICON_ICO: &[u8] = include_bytes!("../src/static/favicon.ico");
198
199/// Configuration and start of the Actix HTTP server
200async fn start_http_server(addr: &str, state: SharedKitchenState) -> std::io::Result<()> {
201    HttpServer::new(move || {
202        App::new()
203            .app_data(web::Data::new(state.clone())) // Compartir el estado
204            .wrap(Logger::default()) // Logging
205            // Servir el contenido embebido para HTML, JS, y CSS
206            .route(
207                "/",
208                web::get().to(|| async {
209                    HttpResponse::Ok()
210                        .content_type("text/html")
211                        .body(INDEX_HTML)
212                }),
213            )
214            .route(
215                "/app.js",
216                web::get().to(|| async {
217                    HttpResponse::Ok()
218                        .content_type("application/javascript")
219                        .body(APP_JS)
220                }),
221            )
222            .route(
223                "/style.css",
224                web::get()
225                    .to(|| async { HttpResponse::Ok().content_type("text/css").body(STYLE_CSS) }),
226            )
227            .route(
228                "/favicon.ico", // Ruta para favicon.ico
229                web::get().to(|| async {
230                    HttpResponse::Ok()
231                        .content_type("image/x-icon")
232                        .body(FAVICON_ICO)
233                }),
234            )
235            .route("/commands", web::get().to(list_commands)) // Listar todas las comandas
236    })
237    .bind(addr)? // Dirección para el servidor
238    .run() // Ejecutar el servidor
239    .await
240}
241
242static CONNECTION_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
243
244/// process a WebSocket connection
245async fn process_websocket_connection(
246    ws_stream: tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
247    state: SharedKitchenState,
248) {
249    let (mut write, mut read) = ws_stream.split();
250    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
251    let connection_id = CONNECTION_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
252
253    {
254        let mut connections = state.connections.lock().await;
255        connections.insert(connection_id, tx.clone());
256    }
257
258    let send_task = tokio::spawn(async move {
259        let mut rx = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
260        while let Some(msg) = rx.next().await {
261            if let Err(e) = write.send(msg).await {
262                error!("Error sending message to client: {}", e);
263                break;
264            }
265        }
266    });
267
268    while let Some(Ok(msg)) = read.next().await {
269        if let Message::Text(txt) = msg {
270            match serde_json::from_str::<serde_json::Value>(&txt) {
271                Ok(json) => {
272                    debug!("Mensaje WebSocket recibido: {:?}", json);
273                    if let Some(action) = json.get("action").and_then(|v| v.as_str()) {
274                        match action {
275                            "getCommands" => {
276                                let commands = state.commands.lock().await.clone();
277                                let response = serde_json::json!({
278                                    "status": "success",
279                                    "commands": commands
280                                });
281                                let _ = tx.send(Message::Text(response.to_string()));
282                            }
283                            "sendPdfPrint" => {
284                                info!("sendPdfPrint action received: {:?}", json);
285
286                                let guid = json.get("guid").and_then(|v| v.as_str());
287                                if let Some(printers) =
288                                    json.get("printers").and_then(|v| v.as_array())
289                                {
290                                    info!("Printers to send to: {:?}", printers);
291
292                                    for printer in printers {
293                                        if let Some(printer_name) = printer.as_str() {
294                                            info!("Processing printer: {}", printer_name);
295
296                                            if let Some(html) =
297                                                json.get("html").and_then(|v| v.as_str())
298                                            {
299                                                info!(
300                                                    "HTML content found for printer {}",
301                                                    printer_name
302                                                );
303
304                                                // Serializar el campo ACTION como una cadena JSON
305                                                let action_object = serde_json::json!({
306                                                    printer_name: {
307                                                        "print": html.to_string(),
308                                                        "copies": 1,
309                                                        "open": false,
310                                                    }
311                                                });
312
313                                                let action_serialized = action_object.to_string();
314
315                                                // Construir el mensaje completo
316                                                let message = serde_json::json!({
317                                                    "UUIDV4": guid,
318                                                    "TARGET": "PRINT",
319                                                    "ACTION": action_serialized  // Serializado como cadena
320                                                });
321
322                                                debug!(
323                                                    "Message to send to printsvc: {:?}",
324                                                    message
325                                                );
326
327                                                match send_to_printsvc(&state.config_env, message)
328                                                    .await
329                                                {
330                                                    Ok(response) => {
331                                                        info!(
332                                                            "Successfully sent to printsvc: {:?}",
333                                                            response
334                                                        );
335                                                        let _ = tx.send(Message::Text(
336                                                            response.to_string(),
337                                                        ));
338                                                    }
339                                                    Err(e) => {
340                                                        error!("Error sending to printsvc for printer {}: {}", printer_name, e);
341                                                        let error_response = serde_json::json!({
342                                                            "status": "error",
343                                                            "message": format!("Failed to send to printer {}: {}", printer_name, e),
344                                                        });
345                                                        let _ = tx.send(Message::Text(
346                                                            error_response.to_string(),
347                                                        ));
348                                                    }
349                                                }
350                                            } else {
351                                                error!(
352                                                    "Missing HTML content for printer {}",
353                                                    printer_name
354                                                );
355                                            }
356                                        } else {
357                                            error!(
358                                                "Invalid printer name in printers array: {:?}",
359                                                printer
360                                            );
361                                        }
362                                    }
363                                } else {
364                                    error!("Missing or invalid printers array in sendPdfPrint message: {:?}", json);
365                                }
366                            }
367
368                            "subscribe" => {
369                                // Agregar traza para el mensaje recibido
370                                info!("Procesando mensaje de suscripción: {:?}", json);
371
372                                if let Some(printers) =
373                                    json.get("printers").and_then(|v| v.as_array())
374                                {
375                                    // Trazar las impresoras enviadas en el mensaje
376                                    info!(
377                                        "Lista de impresoras recibida para suscripción: {:?}",
378                                        printers
379                                    );
380
381                                    if let Some(guid) = json.get("guid").and_then(|v| v.as_str()) {
382                                        let mut subscriptions = state.subscriptions.lock().await;
383
384                                        // Trazar estado actual de suscripciones antes de procesar
385                                        info!(
386                                            "Suscripciones actuales antes de agregar: {:?}",
387                                            *subscriptions
388                                        );
389
390                                        // Iterar sobre cada impresora y procesar la suscripción
391                                        for printer in printers {
392                                            if let Some(printer_name) = printer.as_str() {
393                                                info!(
394                                                    "Agregando conexión {} a la impresora: {}",
395                                                    connection_id, printer_name
396                                                );
397
398                                                // Agregar conexión a la lista de suscriptores de la impresora
399                                                subscriptions
400                                                    .entry(printer_name.to_string())
401                                                    .or_insert_with(Vec::new)
402                                                    .push(connection_id);
403
404                                                // Confirmar que se ha agregado
405                                                info!(
406                        "Conexión {} suscrita correctamente a la impresora: {}",
407                        connection_id, printer_name
408                    );
409                                            } else {
410                                                // Si algún valor en la lista no es válido
411                                                error!(
412                        "Impresora inválida detectada en el array: {:?}",
413                        printer
414                    );
415                                            }
416                                        }
417
418                                        // Trazar estado de las suscripciones después de agregar
419                                        info!(
420                                            "Suscripciones actuales después de agregar: {:?}",
421                                            *subscriptions
422                                        );
423
424                                        // Enviar respuesta de éxito al cliente
425                                        let response = serde_json::json!({
426                                            "status": "success",
427                                            "message": format!(
428                                                "Subscribed to printers: {:?} with GUID: {}",
429                                                printers, guid
430                                            ),
431                                        });
432
433                                        let _ = tx.send(Message::Text(response.to_string()));
434                                    } else {
435                                        // Log para un GUID faltante
436                                        error!(
437                                            "Campo 'guid' faltante en el mensaje de suscripción."
438                                        );
439                                        let error_response = serde_json::json!({
440                                            "status": "error",
441                                            "message": "Missing 'guid' field in subscription",
442                                        });
443
444                                        let _ = tx.send(Message::Text(error_response.to_string()));
445                                    }
446                                } else {
447                                    // Log para impresoras faltantes o inválidas
448                                    error!(
449                                        "Campo 'printers' faltante o inválido en el mensaje: {:?}",
450                                        json
451                                    );
452                                    let error_response = serde_json::json!({
453                                        "status": "error",
454                                        "message": "Missing or invalid 'printers' field in subscription",
455                                    });
456
457                                    let _ = tx.send(Message::Text(error_response.to_string()));
458                                }
459                            }
460                            "getPrinters" => {
461                                let message = serde_json::json!({
462                                    "UUIDV4": uuid::Uuid::new_v4().to_string(),
463                                    "TARGET": "PRINT",
464                                    "ACTION": "getPrinters",
465                                });
466
467                                match send_to_printsvc(&state.config_env, message).await {
468                                    Ok(response) => {
469                                        let _ = tx.send(Message::Text(response.to_string()));
470                                    }
471                                    Err(e) => {
472                                        let error_response = serde_json::json!({
473                                            "status": "error",
474                                            "message": format!("Failed to get printers: {}", e),
475                                        });
476                                        let _ = tx.send(Message::Text(error_response.to_string()));
477                                    }
478                                }
479                            }
480                            "updateCommandArchive" => {
481                                if let Some(id) = json.get("id").and_then(|v| v.as_str()) {
482                                    if let Some(archived) =
483                                        json.get("archived").and_then(|v| v.as_bool())
484                                    {
485                                        let updated_html =
486                                            json.get("html").and_then(|v| v.as_str());
487                                        let mut commands = state.commands.lock().await;
488
489                                        // Buscar y actualizar la comanda en todas las listas
490                                        for (guid, command_list) in commands.iter_mut() {
491                                            if let Some(command) =
492                                                command_list.iter_mut().find(|cmd| cmd.id == id)
493                                            {
494                                                command.archived = archived;
495
496                                                // Actualiza el HTML si se proporciona
497                                                if let Some(html) = updated_html {
498                                                    command.html = html.to_string();
499                                                }
500
501                                                let affected_guid = Some(guid.clone());
502                                                if let Some(guid) = affected_guid {
503                                                    // Notificar a los suscriptores del GUID
504                                                    let response = serde_json::json!({
505                                                        "status": "updateCommand",
506                                                        "id": id,
507                                                        "archived": archived,
508                                                        "html": command.html, // Enviar HTML actualizado
509                                                        "timestamp": command.timestamp,
510                                                    });
511
512                                                    broadcast_to_clients_excluding(
513                                                        Message::Text(response.to_string()),
514                                                        &guid, // GUID del cliente que actualizó
515                                                        &connection_id, // Excluye este connection_id
516                                                        state.clone(),
517                                                    )
518                                                    .await;
519                                                } else {
520                                                    // Si no se encontró la comanda
521                                                    let error_response = serde_json::json!({
522                                                        "status": "error",
523                                                        "message": "Command not found",
524                                                    });
525                                                    let _ = tx.send(Message::Text(
526                                                        error_response.to_string(),
527                                                    ));
528                                                }
529                                                break;
530                                            }
531                                        }
532
533                                        // Guardar cambios en disco después de liberar el préstamo mutable
534                                        let commands_to_save = commands.clone(); // Clonar antes de liberar el MutexGuard
535                                        drop(commands);
536                                        save_commands_to_disk(&commands_to_save).await;
537                                        info!("Comanda actualizada: {}", id);
538                                    }
539                                }
540                            }
541                            "lenArchivedCommand" => {
542                                //recorrer las impresoras y mandar el len de cada una de ellas
543                                let mut commands = state.commands.lock().await;
544                                for (printer, command_list) in commands.iter_mut() {
545                                    let mut archived = 0;
546                                    for command in command_list.iter() {
547                                        if command.archived {
548                                            archived += 1;
549                                        }
550                                    }
551                                    let response = serde_json::json!({
552                                        "status": "lenArchivedCommand",
553                                        "printer": printer,
554                                        "len": archived,
555                                    });
556                                    broadcast_to_clients(
557                                        Message::Text(response.to_string()),
558                                        &printer,
559                                        state.clone(),
560                                    )
561                                    .await;
562                                }
563                            }
564                            "deleteCommand" => {
565                                if let Some(command_id) = json.get("id").and_then(|v| v.as_str()) {
566                                    let mut commands = state.commands.lock().await;
567
568                                    // Buscar la comanda en todas las listas de suscriptores
569                                    let mut found_printer: Option<String> = None;
570                                    for (_guid, command_list) in commands.iter_mut() {
571                                        if let Some(pos) =
572                                            command_list.iter().position(|c| c.id == command_id)
573                                        {
574                                            // Obtener la impresora asociada a la comanda
575                                            found_printer = Some(command_list[pos].printer.clone());
576                                            command_list.remove(pos);
577                                            break;
578                                        }
579                                    }
580
581                                    if let Some(printer) = found_printer {
582                                        save_commands_to_disk(&commands).await;
583
584                                        // Confirmación al cliente que solicitó la acción
585                                        let response = serde_json::json!({
586                                            "status": "success",
587                                            "message": "commandDeleted",
588                                            "id": command_id,
589                                        });
590
591                                        // Notificar a los clientes suscritos a la impresora correspondiente
592                                        broadcast_to_clients(
593                                            Message::Text(response.to_string()),
594                                            &printer,
595                                            state.clone(),
596                                        )
597                                        .await;
598                                    } else {
599                                        // Manejar caso de comanda no encontrada
600                                        let response = serde_json::json!({
601                                            "status": "error",
602                                            "message": "Command not found"
603                                        });
604
605                                        let _ = tx.send(Message::Text(response.to_string()));
606                                    }
607                                } else {
608                                    error!("ID de comanda no encontrado en el mensaje: {}", txt);
609                                }
610                            }
611                            "addCommand" => {
612                                if let Some(command) = json
613                                    .get("data")
614                                    .and_then(|v| serde_json::from_value::<Command>(v.clone()).ok())
615                                {
616                                    // Validar el contenido de la comanda (HTML)
617                                    if let Some(title_start) = command.html.find("<title>") {
618                                        if let Some(title_end) = command.html.find("</title>") {
619                                            let new_id =
620                                                &command.html[(title_start + 7)..title_end];
621
622                                            // Verificar que sea una comanda válida
623                                            // if command.html.contains("Factura simplificada") {
624                                            //     info!("No es una comanda válida. Ignorada.");
625                                            //     return;
626                                            // }
627
628                                            info!(
629                                                "Nueva comanda recibida con ID del título: {:?}",
630                                                new_id
631                                            );
632
633                                            // Crear la comanda actualizada con el nuevo ID
634                                            let updated_command = Command {
635                                                id: command.id.clone(),
636                                                order_no: new_id.to_string(),
637                                                html: command.html.clone(),
638                                                archived: false,
639                                                printer: command.printer.clone(),
640                                                timestamp: Some(current_timestamp()),
641                                            };
642
643                                            // Agregar la comanda utilizando el método con FIFO
644                                            if let Err(e) =
645                                                add_command(updated_command.clone(), state.clone())
646                                                    .await
647                                            {
648                                                error!("Error al agregar la comanda: {}", e);
649                                            }
650
651                                            // Notificar a los clientes suscritos
652                                            let broadcast_message = Message::Text(
653                                                serde_json::json!({
654                                                    "status": "newCommand",
655                                                    "id": updated_command.id,
656                                                    "html": updated_command.html,
657                                                    "timestamp": updated_command.timestamp,
658                                                })
659                                                .to_string(),
660                                            );
661
662                                            let subscriptions = state.subscriptions.lock().await;
663                                            info!("Suscripciones actuales: {:?}", subscriptions);
664
665                                            if let Some(connection_ids) =
666                                                subscriptions.get(&updated_command.printer)
667                                            {
668                                                info!(
669                                                    "Conexiones suscritas a {}: {:?}",
670                                                    updated_command.printer, connection_ids
671                                                );
672                                                for connection_id in connection_ids {
673                                                    info!(
674                                                        "Intentando enviar a conexión: {}",
675                                                        connection_id
676                                                    );
677                                                    if let Some(tx) = state
678                                                        .connections
679                                                        .lock()
680                                                        .await
681                                                        .get(connection_id)
682                                                    {
683                                                        if tx
684                                                            .send(broadcast_message.clone())
685                                                            .is_err()
686                                                        {
687                                                            error!("Error enviando mensaje a la conexión: {}", connection_id);
688                                                        } else {
689                                                            info!(
690                                                                "Mensaje enviado a conexión: {}",
691                                                                connection_id
692                                                            );
693                                                        }
694                                                    }
695                                                }
696                                            } else {
697                                                error!(
698        "No hay conexiones suscritas a la impresora: {}",
699        updated_command.printer
700    );
701                                            }
702                                        } else {
703                                            error!("Error: No se encontró el cierre del tag <title> en el HTML.");
704                                        }
705                                    } else {
706                                        error!("Error: No se encontró el tag <title> en el HTML.");
707                                    }
708                                } else {
709                                    error!("Error: Formato de comando inválido o datos faltantes.");
710                                }
711                            }
712
713                            _ => error!("Acción desconocida: {}", action),
714                        }
715                    }
716                }
717                Err(e) => error!("Error procesando el mensaje WebSocket: {}", e),
718            }
719        }
720    }
721
722    {
723        let mut connections = state.connections.lock().await;
724        connections.remove(&connection_id);
725        info!("Connection {} removed", connection_id);
726    }
727    {
728        let mut subscriptions = state.subscriptions.lock().await;
729
730        for (_printer, connection_ids) in subscriptions.iter_mut() {
731            connection_ids.retain(|&id| id != connection_id);
732        }
733
734        subscriptions.retain(|_, v| !v.is_empty());
735
736        info!(
737            "Connection {} unsubscribed from all printers",
738            connection_id
739        );
740    }
741
742    let _ = send_task.await;
743}
744
745async fn send_to_printsvc(
746    config: &ConfigEnv,
747    message: serde_json::Value,
748) -> Result<serde_json::Value, String> {
749    // When TLS is on the embedded cert is issued for `local.unpispas.es`,
750    // not for the IP we bind to (`config.local_host` is usually 127.0.0.1).
751    // Use the cert hostname so verification passes — DNS still resolves to 127.0.0.1.
752    let (proto, host) = if config.local_ussl {
753        ("wss", "local.unpispas.es")
754    } else {
755        ("ws", config.local_host.as_str())
756    };
757    let url = format!("{}://{}:{}", proto, host, config.local_port);
758    info!("Connecting to printsvc at {}", url);
759    match connect_async(url).await {
760        Ok((mut ws_stream, _)) => {
761            // Enviar el mensaje
762            let msg_text = serde_json::to_string(&message)
763                .map_err(|e| format!("Error serializing message to JSON: {}", e))?;
764
765            ws_stream
766                .send(Message::Text(msg_text))
767                .await
768                .map_err(|e| e.to_string())?;
769
770            // Esperar la respuesta
771            if let Some(Ok(Message::Text(response_text))) = ws_stream.next().await {
772                serde_json::from_str(&response_text).map_err(|e| e.to_string())
773            } else {
774                Err("No response from printsvc".to_string())
775            }
776        }
777        Err(e) => Err(format!("Failed to connect to printsvc: {}", e)),
778    }
779}
780
781/// Configuración y arranque del servidor WebSocket
782async fn start_websocket_server(addr: &str, state: SharedKitchenState) -> tokio::io::Result<()> {
783    let listener = TcpListener::bind(addr).await?;
784    info!("WebSocket server running on {}", addr);
785
786    while let Ok((stream, _)) = listener.accept().await {
787        let state = Arc::clone(&state);
788        tokio::spawn(async move {
789            if let Ok(ws_stream) = accept_async(stream).await {
790                process_websocket_connection(ws_stream, state).await;
791            }
792        });
793    }
794
795    Ok(())
796}
797
798/// Arranque combinado de HTTP y WebSocket
799pub async fn start_kitchen_server() -> std::io::Result<()> {
800    let state = Arc::new(KitchenState {
801        commands: Mutex::new(load_commands_from_disk().await),
802        connections: Mutex::new(HashMap::new()),
803        subscriptions: Mutex::new(HashMap::new()),
804        config_env: ConfigEnv::load(),
805    });
806
807    // Levantar ambos servidores en paralelo
808    tokio::try_join!(
809        start_http_server("0.0.0.0:80", state.clone()),
810        start_websocket_server("0.0.0.0:9001", state),
811    )?;
812
813    Ok(())
814}
815async fn broadcast_to_clients_excluding(
816    message: Message,
817    printer: &str,
818    exclude_connection_id: &u64, // Nuevo parámetro para excluir un cliente
819    state: SharedKitchenState,
820) {
821    let connections = state.connections.lock().await;
822    let subscriptions = state.subscriptions.lock().await;
823
824    if let Some(connection_ids) = subscriptions.get(printer) {
825        for connection_id in connection_ids {
826            if *connection_id != *exclude_connection_id {
827                if let Some(tx) = connections.get(connection_id) {
828                    if tx.send(message.clone()).is_err() {
829                        error!("Failed to send message to connection {}", connection_id);
830                    } else {
831                        info!("Message sent to connection {}", connection_id);
832                    }
833                }
834            }
835        }
836    } else {
837        info!("No hay suscriptores asociados a la impresora: {}", printer);
838    }
839}
840
841async fn broadcast_to_clients(message: Message, printer: &str, state: SharedKitchenState) {
842    let connections = state.connections.lock().await;
843    let subscriptions = state.subscriptions.lock().await;
844
845    if let Some(connection_ids) = subscriptions.get(printer) {
846        for connection_id in connection_ids {
847            if let Some(tx) = connections.get(connection_id) {
848                if tx.send(message.clone()).is_err() {
849                    error!("Failed to send message to connection {}", connection_id);
850                } else {
851                    info!("Message sent to connection {}", connection_id);
852                }
853            }
854        }
855    } else {
856        info!("No hay suscriptores asociados a la impresora: {}", printer);
857    }
858}