pispas_modules/command_viewer.rs
1use actix_web::middleware::Logger;
2use actix_web::{web, App, HttpResponse, HttpServer, Responder};
3use easy_trace::prelude::{debug, error, info};
4use futures_util::SinkExt;
5use futures_util::StreamExt;
6use serde::{Deserialize, Serialize};
7use sharing::utils::ConfigEnv;
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::net::TcpListener;
13use tokio::sync::Mutex;
14use tokio_tungstenite::{accept_async, connect_async, tungstenite::protocol::Message};
15
16use crate::service::{Service, WebSocketWrite};
17use async_trait::async_trait;
18use easy_trace::instruments::tracing;
19use serde_json::Value;
20use std::any::Any;
21
22/// The version of the CommandViewerService module.
23pub const COMMAND_VIEWER_VERSION: &str = "1.0.0";
24
25/// A minimal service that only returns its version and performs no other operations.
26#[derive(Clone)]
27pub struct CommandViewerService;
28
29impl CommandViewerService {
30 /// Creates a new instance of `CommandViewerService`.
31 pub fn new() -> Self {
32 info!(
33 "CommandViewerService initialized (version: {})",
34 COMMAND_VIEWER_VERSION
35 );
36 CommandViewerService
37 }
38}
39
40#[async_trait]
41impl Service for CommandViewerService {
42 /// This method does nothing meaningful. It only logs the action received and returns a placeholder result.
43 async fn run(&self, _action: Value, _write: WebSocketWrite) -> (i32, String) {
44 info!("CommandViewerService: run method called, but no action is performed.");
45 (0, "No action performed".to_string())
46 }
47
48 /// Allows dynamic casting to `Any` for type-safe downcasting.
49 fn as_any(&self) -> &dyn Any {
50 self
51 }
52
53 /// Stops the service. Since this service does nothing, the method just logs a message.
54 fn stop_service(&self) {
55 info!("CommandViewerService: stop_service called, but nothing to stop.");
56 }
57
58 /// Returns the version of the CommandViewerService module.
59 fn get_version(&self) -> String {
60 COMMAND_VIEWER_VERSION.to_string()
61 }
62}
/// Returns the current Unix time in whole seconds.
///
/// Falls back to `0` instead of panicking when the system clock reports a
/// time earlier than the Unix epoch.
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}
72
73const MAX_COMMANDS_PER_PRINTER: usize = 45;
74// Estructura para una comanda
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct Command {
77 pub id: String,
78 pub order_no: String,
79 pub html: String,
80 pub archived: bool,
81 pub printer: String,
82 pub timestamp: Option<u64>,
83}
84
85// Archivo de persistencia
86const PERSISTENCE_FILE: &str = "commands.json";
87pub struct KitchenState {
88 pub commands: Mutex<HashMap<String, Vec<Command>>>, // GUID -> Comandas
89 pub connections: Mutex<HashMap<u64, tokio::sync::mpsc::UnboundedSender<Message>>>, // Connection ID -> Sender
90 pub subscriptions: Mutex<HashMap<String, Vec<u64>>>, // GUID -> Connection IDs
91 pub config_env: ConfigEnv,
92}
93
94pub type SharedKitchenState = Arc<KitchenState>;
95
96/// Agregar una nueva comanda a la impresora correspondiente
97async fn add_command(command: Command, state: SharedKitchenState) -> Result<(), String> {
98 let printer = command.printer.clone();
99
100 // Si la impresora es "CommandViewer", saltamos la verificación de suscriptores
101 if printer != "CommandViewer" {
102 let subscriptions = state.subscriptions.lock().await;
103 if let Some(connection_ids) = subscriptions.get(&printer) {
104 if connection_ids.is_empty() {
105 info!(
106 "Dont save command because there are no subscribers for printer: {}",
107 printer
108 );
109 return Err(format!(
110 "There are no subscribers for printer: {}",
111 printer
112 ));
113 }
114 } else {
115 let commands = state.commands.lock().await;
116 if !commands.contains_key(&printer) {
117 info!(
118 "Dont save command because there are no subscribers for printer: {}",
119 printer
120 );
121 return Err(format!(
122 "There are no subscribers for printer: {}",
123 printer
124 ));
125 }
126 }
127 drop(subscriptions);
128 }
129 let mut commands = state.commands.lock().await;
130
131 let printer_commands = commands
132 .entry(command.printer.clone())
133 .or_insert_with(Vec::new);
134
135 printer_commands.push(command);
136 printer_commands.sort_by_key(|c| c.timestamp.unwrap_or(0));
137 tracing::warn!("comandas para esa impresra: {}", printer_commands.len());
138
139 // Limitar a un máximo de 200 comandas
140 if printer_commands.len() > MAX_COMMANDS_PER_PRINTER {
141 let drain_amount = printer_commands.len() / 3; // delete 25% of the oldest commands
142 tracing::warn!("Draining {}", drain_amount);
143 printer_commands.drain(0..drain_amount); // FIFO
144
145 //send lenArchivedCommand to all clients
146 let response = serde_json::json!({
147 "status": "lenArchivedCommand",
148 "printer": printer.clone(),
149 "len": printer_commands.len(),
150 });
151 broadcast_to_clients(Message::Text(response.to_string()), &printer, state.clone()).await;
152 tracing::warn!("SEND ALL CLIENTS {}", printer_commands.len());
153 }
154
155 save_commands_to_disk(&*commands).await;
156
157 Ok(())
158}
159
160/// Load persistent commands from disk
161async fn load_commands_from_disk() -> HashMap<String, Vec<Command>> {
162 if let Ok(data) = std::fs::read_to_string(PERSISTENCE_FILE) {
163 if let Ok(mut commands) = serde_json::from_str::<HashMap<String, Vec<Command>>>(&data) {
164 for printer_commands in commands.values_mut() {
165 for command in printer_commands.iter_mut() {
166 // Asigna un timestamp si no existe
167 if command.timestamp.is_none() || command.timestamp == Some(0) {
168 command.timestamp = Some(current_timestamp());
169 }
170 }
171 }
172 return commands;
173 }
174 }
175 HashMap::new()
176}
177
178/// Save commands to disk
179async fn save_commands_to_disk(commands: &HashMap<String, Vec<Command>>) {
180 if let Ok(json) = serde_json::to_string_pretty(commands) {
181 if let Err(e) = std::fs::write(PERSISTENCE_FILE, json) {
182 error!("Error saving commands to disk: {}", e);
183 }
184 }
185}
186
187/// GET /commands
188async fn list_commands(state: web::Data<SharedKitchenState>) -> impl Responder {
189 let commands = state.commands.lock().await;
190 HttpResponse::Ok().json(&*commands)
191}
192
193/// incude all static files in the binary
194const INDEX_HTML: &str = include_str!("../src/static/index.html");
195const APP_JS: &str = include_str!("../src/static/app.js");
196const STYLE_CSS: &str = include_str!("../src/static/style.css");
197pub const FAVICON_ICO: &[u8] = include_bytes!("../src/static/favicon.ico");
198
199/// Configuration and start of the Actix HTTP server
200async fn start_http_server(addr: &str, state: SharedKitchenState) -> std::io::Result<()> {
201 HttpServer::new(move || {
202 App::new()
203 .app_data(web::Data::new(state.clone())) // Compartir el estado
204 .wrap(Logger::default()) // Logging
205 // Servir el contenido embebido para HTML, JS, y CSS
206 .route(
207 "/",
208 web::get().to(|| async {
209 HttpResponse::Ok()
210 .content_type("text/html")
211 .body(INDEX_HTML)
212 }),
213 )
214 .route(
215 "/app.js",
216 web::get().to(|| async {
217 HttpResponse::Ok()
218 .content_type("application/javascript")
219 .body(APP_JS)
220 }),
221 )
222 .route(
223 "/style.css",
224 web::get()
225 .to(|| async { HttpResponse::Ok().content_type("text/css").body(STYLE_CSS) }),
226 )
227 .route(
228 "/favicon.ico", // Ruta para favicon.ico
229 web::get().to(|| async {
230 HttpResponse::Ok()
231 .content_type("image/x-icon")
232 .body(FAVICON_ICO)
233 }),
234 )
235 .route("/commands", web::get().to(list_commands)) // Listar todas las comandas
236 })
237 .bind(addr)? // Dirección para el servidor
238 .run() // Ejecutar el servidor
239 .await
240}
241
242static CONNECTION_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
243
244/// process a WebSocket connection
245async fn process_websocket_connection(
246 ws_stream: tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
247 state: SharedKitchenState,
248) {
249 let (mut write, mut read) = ws_stream.split();
250 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
251 let connection_id = CONNECTION_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
252
253 {
254 let mut connections = state.connections.lock().await;
255 connections.insert(connection_id, tx.clone());
256 }
257
258 let send_task = tokio::spawn(async move {
259 let mut rx = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
260 while let Some(msg) = rx.next().await {
261 if let Err(e) = write.send(msg).await {
262 error!("Error sending message to client: {}", e);
263 break;
264 }
265 }
266 });
267
268 while let Some(Ok(msg)) = read.next().await {
269 if let Message::Text(txt) = msg {
270 match serde_json::from_str::<serde_json::Value>(&txt) {
271 Ok(json) => {
272 debug!("Mensaje WebSocket recibido: {:?}", json);
273 if let Some(action) = json.get("action").and_then(|v| v.as_str()) {
274 match action {
275 "getCommands" => {
276 let commands = state.commands.lock().await.clone();
277 let response = serde_json::json!({
278 "status": "success",
279 "commands": commands
280 });
281 let _ = tx.send(Message::Text(response.to_string()));
282 }
283 "sendPdfPrint" => {
284 info!("sendPdfPrint action received: {:?}", json);
285
286 let guid = json.get("guid").and_then(|v| v.as_str());
287 if let Some(printers) =
288 json.get("printers").and_then(|v| v.as_array())
289 {
290 info!("Printers to send to: {:?}", printers);
291
292 for printer in printers {
293 if let Some(printer_name) = printer.as_str() {
294 info!("Processing printer: {}", printer_name);
295
296 if let Some(html) =
297 json.get("html").and_then(|v| v.as_str())
298 {
299 info!(
300 "HTML content found for printer {}",
301 printer_name
302 );
303
304 // Serializar el campo ACTION como una cadena JSON
305 let action_object = serde_json::json!({
306 printer_name: {
307 "print": html.to_string(),
308 "copies": 1,
309 "open": false,
310 }
311 });
312
313 let action_serialized = action_object.to_string();
314
315 // Construir el mensaje completo
316 let message = serde_json::json!({
317 "UUIDV4": guid,
318 "TARGET": "PRINT",
319 "ACTION": action_serialized // Serializado como cadena
320 });
321
322 debug!(
323 "Message to send to printsvc: {:?}",
324 message
325 );
326
327 match send_to_printsvc(&state.config_env, message)
328 .await
329 {
330 Ok(response) => {
331 info!(
332 "Successfully sent to printsvc: {:?}",
333 response
334 );
335 let _ = tx.send(Message::Text(
336 response.to_string(),
337 ));
338 }
339 Err(e) => {
340 error!("Error sending to printsvc for printer {}: {}", printer_name, e);
341 let error_response = serde_json::json!({
342 "status": "error",
343 "message": format!("Failed to send to printer {}: {}", printer_name, e),
344 });
345 let _ = tx.send(Message::Text(
346 error_response.to_string(),
347 ));
348 }
349 }
350 } else {
351 error!(
352 "Missing HTML content for printer {}",
353 printer_name
354 );
355 }
356 } else {
357 error!(
358 "Invalid printer name in printers array: {:?}",
359 printer
360 );
361 }
362 }
363 } else {
364 error!("Missing or invalid printers array in sendPdfPrint message: {:?}", json);
365 }
366 }
367
368 "subscribe" => {
369 // Agregar traza para el mensaje recibido
370 info!("Procesando mensaje de suscripción: {:?}", json);
371
372 if let Some(printers) =
373 json.get("printers").and_then(|v| v.as_array())
374 {
375 // Trazar las impresoras enviadas en el mensaje
376 info!(
377 "Lista de impresoras recibida para suscripción: {:?}",
378 printers
379 );
380
381 if let Some(guid) = json.get("guid").and_then(|v| v.as_str()) {
382 let mut subscriptions = state.subscriptions.lock().await;
383
384 // Trazar estado actual de suscripciones antes de procesar
385 info!(
386 "Suscripciones actuales antes de agregar: {:?}",
387 *subscriptions
388 );
389
390 // Iterar sobre cada impresora y procesar la suscripción
391 for printer in printers {
392 if let Some(printer_name) = printer.as_str() {
393 info!(
394 "Agregando conexión {} a la impresora: {}",
395 connection_id, printer_name
396 );
397
398 // Agregar conexión a la lista de suscriptores de la impresora
399 subscriptions
400 .entry(printer_name.to_string())
401 .or_insert_with(Vec::new)
402 .push(connection_id);
403
404 // Confirmar que se ha agregado
405 info!(
406 "Conexión {} suscrita correctamente a la impresora: {}",
407 connection_id, printer_name
408 );
409 } else {
410 // Si algún valor en la lista no es válido
411 error!(
412 "Impresora inválida detectada en el array: {:?}",
413 printer
414 );
415 }
416 }
417
418 // Trazar estado de las suscripciones después de agregar
419 info!(
420 "Suscripciones actuales después de agregar: {:?}",
421 *subscriptions
422 );
423
424 // Enviar respuesta de éxito al cliente
425 let response = serde_json::json!({
426 "status": "success",
427 "message": format!(
428 "Subscribed to printers: {:?} with GUID: {}",
429 printers, guid
430 ),
431 });
432
433 let _ = tx.send(Message::Text(response.to_string()));
434 } else {
435 // Log para un GUID faltante
436 error!(
437 "Campo 'guid' faltante en el mensaje de suscripción."
438 );
439 let error_response = serde_json::json!({
440 "status": "error",
441 "message": "Missing 'guid' field in subscription",
442 });
443
444 let _ = tx.send(Message::Text(error_response.to_string()));
445 }
446 } else {
447 // Log para impresoras faltantes o inválidas
448 error!(
449 "Campo 'printers' faltante o inválido en el mensaje: {:?}",
450 json
451 );
452 let error_response = serde_json::json!({
453 "status": "error",
454 "message": "Missing or invalid 'printers' field in subscription",
455 });
456
457 let _ = tx.send(Message::Text(error_response.to_string()));
458 }
459 }
460 "getPrinters" => {
461 let message = serde_json::json!({
462 "UUIDV4": uuid::Uuid::new_v4().to_string(),
463 "TARGET": "PRINT",
464 "ACTION": "getPrinters",
465 });
466
467 match send_to_printsvc(&state.config_env, message).await {
468 Ok(response) => {
469 let _ = tx.send(Message::Text(response.to_string()));
470 }
471 Err(e) => {
472 let error_response = serde_json::json!({
473 "status": "error",
474 "message": format!("Failed to get printers: {}", e),
475 });
476 let _ = tx.send(Message::Text(error_response.to_string()));
477 }
478 }
479 }
480 "updateCommandArchive" => {
481 if let Some(id) = json.get("id").and_then(|v| v.as_str()) {
482 if let Some(archived) =
483 json.get("archived").and_then(|v| v.as_bool())
484 {
485 let updated_html =
486 json.get("html").and_then(|v| v.as_str());
487 let mut commands = state.commands.lock().await;
488
489 // Buscar y actualizar la comanda en todas las listas
490 for (guid, command_list) in commands.iter_mut() {
491 if let Some(command) =
492 command_list.iter_mut().find(|cmd| cmd.id == id)
493 {
494 command.archived = archived;
495
496 // Actualiza el HTML si se proporciona
497 if let Some(html) = updated_html {
498 command.html = html.to_string();
499 }
500
501 let affected_guid = Some(guid.clone());
502 if let Some(guid) = affected_guid {
503 // Notificar a los suscriptores del GUID
504 let response = serde_json::json!({
505 "status": "updateCommand",
506 "id": id,
507 "archived": archived,
508 "html": command.html, // Enviar HTML actualizado
509 "timestamp": command.timestamp,
510 });
511
512 broadcast_to_clients_excluding(
513 Message::Text(response.to_string()),
514 &guid, // GUID del cliente que actualizó
515 &connection_id, // Excluye este connection_id
516 state.clone(),
517 )
518 .await;
519 } else {
520 // Si no se encontró la comanda
521 let error_response = serde_json::json!({
522 "status": "error",
523 "message": "Command not found",
524 });
525 let _ = tx.send(Message::Text(
526 error_response.to_string(),
527 ));
528 }
529 break;
530 }
531 }
532
533 // Guardar cambios en disco después de liberar el préstamo mutable
534 let commands_to_save = commands.clone(); // Clonar antes de liberar el MutexGuard
535 drop(commands);
536 save_commands_to_disk(&commands_to_save).await;
537 info!("Comanda actualizada: {}", id);
538 }
539 }
540 }
541 "lenArchivedCommand" => {
542 //recorrer las impresoras y mandar el len de cada una de ellas
543 let mut commands = state.commands.lock().await;
544 for (printer, command_list) in commands.iter_mut() {
545 let mut archived = 0;
546 for command in command_list.iter() {
547 if command.archived {
548 archived += 1;
549 }
550 }
551 let response = serde_json::json!({
552 "status": "lenArchivedCommand",
553 "printer": printer,
554 "len": archived,
555 });
556 broadcast_to_clients(
557 Message::Text(response.to_string()),
558 &printer,
559 state.clone(),
560 )
561 .await;
562 }
563 }
564 "deleteCommand" => {
565 if let Some(command_id) = json.get("id").and_then(|v| v.as_str()) {
566 let mut commands = state.commands.lock().await;
567
568 // Buscar la comanda en todas las listas de suscriptores
569 let mut found_printer: Option<String> = None;
570 for (_guid, command_list) in commands.iter_mut() {
571 if let Some(pos) =
572 command_list.iter().position(|c| c.id == command_id)
573 {
574 // Obtener la impresora asociada a la comanda
575 found_printer = Some(command_list[pos].printer.clone());
576 command_list.remove(pos);
577 break;
578 }
579 }
580
581 if let Some(printer) = found_printer {
582 save_commands_to_disk(&commands).await;
583
584 // Confirmación al cliente que solicitó la acción
585 let response = serde_json::json!({
586 "status": "success",
587 "message": "commandDeleted",
588 "id": command_id,
589 });
590
591 // Notificar a los clientes suscritos a la impresora correspondiente
592 broadcast_to_clients(
593 Message::Text(response.to_string()),
594 &printer,
595 state.clone(),
596 )
597 .await;
598 } else {
599 // Manejar caso de comanda no encontrada
600 let response = serde_json::json!({
601 "status": "error",
602 "message": "Command not found"
603 });
604
605 let _ = tx.send(Message::Text(response.to_string()));
606 }
607 } else {
608 error!("ID de comanda no encontrado en el mensaje: {}", txt);
609 }
610 }
611 "addCommand" => {
612 if let Some(command) = json
613 .get("data")
614 .and_then(|v| serde_json::from_value::<Command>(v.clone()).ok())
615 {
616 // Validar el contenido de la comanda (HTML)
617 if let Some(title_start) = command.html.find("<title>") {
618 if let Some(title_end) = command.html.find("</title>") {
619 let new_id =
620 &command.html[(title_start + 7)..title_end];
621
622 // Verificar que sea una comanda válida
623 // if command.html.contains("Factura simplificada") {
624 // info!("No es una comanda válida. Ignorada.");
625 // return;
626 // }
627
628 info!(
629 "Nueva comanda recibida con ID del título: {:?}",
630 new_id
631 );
632
633 // Crear la comanda actualizada con el nuevo ID
634 let updated_command = Command {
635 id: command.id.clone(),
636 order_no: new_id.to_string(),
637 html: command.html.clone(),
638 archived: false,
639 printer: command.printer.clone(),
640 timestamp: Some(current_timestamp()),
641 };
642
643 // Agregar la comanda utilizando el método con FIFO
644 if let Err(e) =
645 add_command(updated_command.clone(), state.clone())
646 .await
647 {
648 error!("Error al agregar la comanda: {}", e);
649 }
650
651 // Notificar a los clientes suscritos
652 let broadcast_message = Message::Text(
653 serde_json::json!({
654 "status": "newCommand",
655 "id": updated_command.id,
656 "html": updated_command.html,
657 "timestamp": updated_command.timestamp,
658 })
659 .to_string(),
660 );
661
662 let subscriptions = state.subscriptions.lock().await;
663 info!("Suscripciones actuales: {:?}", subscriptions);
664
665 if let Some(connection_ids) =
666 subscriptions.get(&updated_command.printer)
667 {
668 info!(
669 "Conexiones suscritas a {}: {:?}",
670 updated_command.printer, connection_ids
671 );
672 for connection_id in connection_ids {
673 info!(
674 "Intentando enviar a conexión: {}",
675 connection_id
676 );
677 if let Some(tx) = state
678 .connections
679 .lock()
680 .await
681 .get(connection_id)
682 {
683 if tx
684 .send(broadcast_message.clone())
685 .is_err()
686 {
687 error!("Error enviando mensaje a la conexión: {}", connection_id);
688 } else {
689 info!(
690 "Mensaje enviado a conexión: {}",
691 connection_id
692 );
693 }
694 }
695 }
696 } else {
697 error!(
698 "No hay conexiones suscritas a la impresora: {}",
699 updated_command.printer
700 );
701 }
702 } else {
703 error!("Error: No se encontró el cierre del tag <title> en el HTML.");
704 }
705 } else {
706 error!("Error: No se encontró el tag <title> en el HTML.");
707 }
708 } else {
709 error!("Error: Formato de comando inválido o datos faltantes.");
710 }
711 }
712
713 _ => error!("Acción desconocida: {}", action),
714 }
715 }
716 }
717 Err(e) => error!("Error procesando el mensaje WebSocket: {}", e),
718 }
719 }
720 }
721
722 {
723 let mut connections = state.connections.lock().await;
724 connections.remove(&connection_id);
725 info!("Connection {} removed", connection_id);
726 }
727 {
728 let mut subscriptions = state.subscriptions.lock().await;
729
730 for (_printer, connection_ids) in subscriptions.iter_mut() {
731 connection_ids.retain(|&id| id != connection_id);
732 }
733
734 subscriptions.retain(|_, v| !v.is_empty());
735
736 info!(
737 "Connection {} unsubscribed from all printers",
738 connection_id
739 );
740 }
741
742 let _ = send_task.await;
743}
744
745async fn send_to_printsvc(
746 config: &ConfigEnv,
747 message: serde_json::Value,
748) -> Result<serde_json::Value, String> {
749 // When TLS is on the embedded cert is issued for `local.unpispas.es`,
750 // not for the IP we bind to (`config.local_host` is usually 127.0.0.1).
751 // Use the cert hostname so verification passes — DNS still resolves to 127.0.0.1.
752 let (proto, host) = if config.local_ussl {
753 ("wss", "local.unpispas.es")
754 } else {
755 ("ws", config.local_host.as_str())
756 };
757 let url = format!("{}://{}:{}", proto, host, config.local_port);
758 info!("Connecting to printsvc at {}", url);
759 match connect_async(url).await {
760 Ok((mut ws_stream, _)) => {
761 // Enviar el mensaje
762 let msg_text = serde_json::to_string(&message)
763 .map_err(|e| format!("Error serializing message to JSON: {}", e))?;
764
765 ws_stream
766 .send(Message::Text(msg_text))
767 .await
768 .map_err(|e| e.to_string())?;
769
770 // Esperar la respuesta
771 if let Some(Ok(Message::Text(response_text))) = ws_stream.next().await {
772 serde_json::from_str(&response_text).map_err(|e| e.to_string())
773 } else {
774 Err("No response from printsvc".to_string())
775 }
776 }
777 Err(e) => Err(format!("Failed to connect to printsvc: {}", e)),
778 }
779}
780
781/// Configuración y arranque del servidor WebSocket
782async fn start_websocket_server(addr: &str, state: SharedKitchenState) -> tokio::io::Result<()> {
783 let listener = TcpListener::bind(addr).await?;
784 info!("WebSocket server running on {}", addr);
785
786 while let Ok((stream, _)) = listener.accept().await {
787 let state = Arc::clone(&state);
788 tokio::spawn(async move {
789 if let Ok(ws_stream) = accept_async(stream).await {
790 process_websocket_connection(ws_stream, state).await;
791 }
792 });
793 }
794
795 Ok(())
796}
797
798/// Arranque combinado de HTTP y WebSocket
799pub async fn start_kitchen_server() -> std::io::Result<()> {
800 let state = Arc::new(KitchenState {
801 commands: Mutex::new(load_commands_from_disk().await),
802 connections: Mutex::new(HashMap::new()),
803 subscriptions: Mutex::new(HashMap::new()),
804 config_env: ConfigEnv::load(),
805 });
806
807 // Levantar ambos servidores en paralelo
808 tokio::try_join!(
809 start_http_server("0.0.0.0:80", state.clone()),
810 start_websocket_server("0.0.0.0:9001", state),
811 )?;
812
813 Ok(())
814}
815async fn broadcast_to_clients_excluding(
816 message: Message,
817 printer: &str,
818 exclude_connection_id: &u64, // Nuevo parámetro para excluir un cliente
819 state: SharedKitchenState,
820) {
821 let connections = state.connections.lock().await;
822 let subscriptions = state.subscriptions.lock().await;
823
824 if let Some(connection_ids) = subscriptions.get(printer) {
825 for connection_id in connection_ids {
826 if *connection_id != *exclude_connection_id {
827 if let Some(tx) = connections.get(connection_id) {
828 if tx.send(message.clone()).is_err() {
829 error!("Failed to send message to connection {}", connection_id);
830 } else {
831 info!("Message sent to connection {}", connection_id);
832 }
833 }
834 }
835 }
836 } else {
837 info!("No hay suscriptores asociados a la impresora: {}", printer);
838 }
839}
840
841async fn broadcast_to_clients(message: Message, printer: &str, state: SharedKitchenState) {
842 let connections = state.connections.lock().await;
843 let subscriptions = state.subscriptions.lock().await;
844
845 if let Some(connection_ids) = subscriptions.get(printer) {
846 for connection_id in connection_ids {
847 if let Some(tx) = connections.get(connection_id) {
848 if tx.send(message.clone()).is_err() {
849 error!("Failed to send message to connection {}", connection_id);
850 } else {
851 info!("Message sent to connection {}", connection_id);
852 }
853 }
854 }
855 } else {
856 info!("No hay suscriptores asociados a la impresora: {}", printer);
857 }
858}