pispas_modules/
mypos.rs

1use std::process::{Child, Command};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use serde_json::{json, Value};
5use std::sync::{
6    atomic::{AtomicBool, Ordering},
7    Arc,
8};
9use tokio::sync::{Mutex, oneshot};
10use tokio::time::{Duration, timeout, sleep};
11use crate::service::{Service, WebSocketWrite};
12use easy_trace::prelude::{info, error, warn};
13use tokio_tungstenite::{connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream};
14use futures_util::{stream::SplitSink, SinkExt, StreamExt, stream::SplitStream};
15use tokio::net::TcpStream;
16
17// Imports específicos de Windows
18#[cfg(target_os = "windows")]
19use {sharing::MYPOS_SERVER_FILE, sharing::proc::kill_process_by_name};
20
/// Version string reported by the MyPOS service.
pub const MYPOS_VERSION: &str = "1.1.0";

/// Maximum number of attempts made for a single transaction.
const MAX_RETRIES: u32 = 3;

/// Pause between two transaction attempts, in seconds.
const RETRY_DELAY_SECS: u64 = 2;

/// Interval between two watchdog health checks, in seconds.
const WATCHDOG_CHECK_INTERVAL_SECS: u64 = 5;

/// Delay, in seconds, used while waiting for the server to come up and
/// before the watchdog restarts or reconnects after a failure.
const RESTART_DELAY_SECS: u64 = 3;

/// Maximum time to wait for the WebSocket connection to be established, in seconds.
const WEBSOCKET_CONNECT_TIMEOUT_SECS: u64 = 10;
38
39#[derive(Debug, Serialize, Deserialize)]
40struct ServerResponse {
41    status: String,
42    message: Option<String>,
43}
44
45type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
46type WsReader = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
47
48/// MyPOS Service implementation with automatic retries and watchdog monitoring
49pub struct MyPOSService {
50    /// Handle for the `mypos_server.exe` process (solo Windows).
51    #[cfg(target_os = "windows")]
52    child_process: Arc<Mutex<Option<Child>>>,
53
54    /// State indicating if the service is running.
55    running: Arc<AtomicBool>,
56
57    /// State indicating if watchdog should be active
58    watchdog_enabled: Arc<AtomicBool>,
59
60    /// WebSocket writer for communicating with the Python server
61    ws_writer: Arc<Mutex<Option<WsWriter>>>,
62
63    /// Channel for sending responses (store the sender, not receiver)
64    response_sender: Arc<Mutex<Option<oneshot::Sender<ServerResponse>>>>,
65
66    /// Counter for restart attempts
67    restart_count: Arc<Mutex<u32>>,
68}
69
70impl MyPOSService {
71    /// Creates a new instance of the MyPOSService.
72    pub fn new() -> Self {
73        MyPOSService {
74            #[cfg(target_os = "windows")]
75            child_process: Arc::new(Mutex::new(None)),
76            running: Arc::new(AtomicBool::new(false)),
77            watchdog_enabled: Arc::new(AtomicBool::new(false)),
78            ws_writer: Arc::new(Mutex::new(None)),
79            response_sender: Arc::new(Mutex::new(None)),
80            restart_count: Arc::new(Mutex::new(0)),
81        }
82    }
83
84    // ========================================================================
85    // SECCIÓN: GESTIÓN DEL PROCESO Y CONEXIÓN
86    // ========================================================================
87
88    /// Launches the MyPOS server process (Windows only)
89    #[cfg(target_os = "windows")]
90    async fn launch_process(&self) -> Result<(), String> {
91        let exe_path = sharing::get_path_mypos_server();
92        info!("Launching MyPOS server at: {}", exe_path);
93
94        let mut guard = self.child_process.lock().await;
95
96        match Command::new(exe_path).spawn() {
97            Ok(child) => {
98                *guard = Some(child);
99                info!("MyPOS server process started successfully.");
100                Ok(())
101            }
102            Err(e) => {
103                error!("Failed to start MyPOS server: {:?}", e);
104                Err(format!("Failed to start process: {}", e))
105            }
106        }
107    }
108
109    /// Connects to the WebSocket server with timeout
110    async fn connect_websocket(&self) -> Result<(), String> {
111        let url = "ws://127.0.0.1:8765";
112        info!("Connecting to WebSocket at: {}", url);
113
114        let connect_future = connect_async(url);
115
116        let (ws_stream, _) = match timeout(
117            Duration::from_secs(WEBSOCKET_CONNECT_TIMEOUT_SECS),
118            connect_future
119        ).await {
120            Ok(Ok(result)) => result,
121            Ok(Err(e)) => return Err(format!("Failed to connect to WebSocket: {}", e)),
122            Err(_) => return Err("WebSocket connection timeout".to_string()),
123        };
124
125        let (write, read) = ws_stream.split();
126
127        // Store the writer
128        *self.ws_writer.lock().await = Some(write);
129
130        // Start reading responses
131        let response_sender = self.response_sender.clone();
132        tokio::spawn(async move {
133            Self::handle_websocket_responses(read, response_sender).await;
134        });
135
136        info!("Connected to WebSocket successfully");
137        Ok(())
138    }
139
140    /// Handles incoming WebSocket responses
141    async fn handle_websocket_responses(
142        mut read: WsReader,
143        response_sender: Arc<Mutex<Option<oneshot::Sender<ServerResponse>>>>
144    ) {
145        while let Some(msg_result) = read.next().await {
146            match msg_result {
147                Ok(Message::Text(response)) => {
148                    if let Ok(server_response) = serde_json::from_str::<ServerResponse>(&response) {
149                        info!("Received response: {:?}", server_response);
150
151                        // Send to the current waiting sender
152                        let mut sender_guard = response_sender.lock().await;
153                        if let Some(sender) = sender_guard.take() {
154                            if sender.send(server_response).is_err() {
155                                warn!("Failed to send response to waiting transaction");
156                            }
157                        }
158                    } else {
159                        warn!("Received unexpected response: {}", response);
160                    }
161                }
162                Ok(Message::Close(_)) => {
163                    warn!("WebSocket connection closed");
164                    break;
165                }
166                Err(e) => {
167                    error!("WebSocket error: {:?}", e);
168                    break;
169                }
170                _ => {}
171            }
172        }
173        info!("WebSocket response handler terminated");
174    }
175
176    // ========================================================================
177    // SECCIÓN: WATCHDOG Y MONITOREO
178    // ========================================================================
179
180    /// Launches and monitors the MyPOS server with watchdog (Windows)
181    #[cfg(target_os = "windows")]
182    async fn launch_and_monitor(&self) {
183        // Habilitar watchdog
184        self.watchdog_enabled.store(true, Ordering::Relaxed);
185
186        // Intentar lanzar el proceso
187        if let Err(e) = self.launch_process().await {
188            error!("Failed to launch process: {}", e);
189            return;
190        }
191
192        self.running.store(true, Ordering::Relaxed);
193
194        // Wait for the server to start
195        sleep(Duration::from_secs(RESTART_DELAY_SECS)).await;
196
197        // Connect to WebSocket
198        if let Err(e) = self.connect_websocket().await {
199            error!("Failed to connect to WebSocket: {:?}", e);
200            return;
201        }
202
203        // Iniciar watchdog
204        self.start_watchdog().await;
205    }
206
207    /// Launches connection monitoring for macOS (no process management)
208    #[cfg(target_os = "macos")]
209    async fn launch_and_monitor(&self) {
210        info!("macOS: Skipping process launch, connecting to existing WebSocket server...");
211
212        self.running.store(true, Ordering::Relaxed);
213        self.watchdog_enabled.store(true, Ordering::Relaxed);
214
215        // Intentar conectar al WebSocket
216        if let Err(e) = self.connect_websocket().await {
217            error!("Failed to connect to WebSocket: {:?}", e);
218            self.running.store(false, Ordering::Relaxed);
219            return;
220        }
221
222        info!("macOS: Connected to existing WebSocket server");
223
224        // Iniciar watchdog solo para monitorear la conexión
225        self.start_watchdog().await;
226    }
227
228    /// Starts the watchdog that monitors the process and connection health
229    async fn start_watchdog(&self) {
230        let running = self.running.clone();
231        let watchdog_enabled = self.watchdog_enabled.clone();
232        let ws_writer = self.ws_writer.clone();
233        let restart_count = self.restart_count.clone();
234        let response_sender = self.response_sender.clone();
235
236        #[cfg(target_os = "windows")]
237        let child_process = self.child_process.clone();
238
239        tokio::spawn(async move {
240            info!("Watchdog started");
241
242            while watchdog_enabled.load(Ordering::Relaxed) {
243                sleep(Duration::from_secs(WATCHDOG_CHECK_INTERVAL_SECS)).await;
244
245                if !watchdog_enabled.load(Ordering::Relaxed) {
246                    info!("Watchdog disabled, stopping...");
247                    break;
248                }
249
250                let mut needs_restart = false;
251                let mut process_died = false;
252
253                // Verificar si el proceso sigue corriendo (solo Windows)
254                #[cfg(target_os = "windows")]
255                {
256                    let mut child_guard = child_process.lock().await;
257
258                    if let Some(child) = child_guard.as_mut() {
259                        match child.try_wait() {
260                            Ok(Some(status)) => {
261                                // Proceso murió
262                                error!("MyPOS server process exited with status: {:?}", status);
263                                *child_guard = None;
264                                drop(child_guard);
265                                needs_restart = true;
266                                process_died = true;
267                            }
268                            Ok(None) => {
269                                // Proceso sigue corriendo
270                                drop(child_guard);
271                            }
272                            Err(e) => {
273                                // Error al verificar estado
274                                error!("Error checking process status: {:?}", e);
275                                *child_guard = None;
276                                drop(child_guard);
277                                needs_restart = true;
278                                process_died = true;
279                            }
280                        }
281                    } else {
282                        // No hay proceso hijo
283                        drop(child_guard);
284                        if running.load(Ordering::Relaxed) {
285                            warn!("Process handle lost, attempting restart");
286                            needs_restart = true;
287                            process_died = true;
288                        }
289                    }
290                }
291
292                // Verificar si la conexión WebSocket está activa
293                if ws_writer.lock().await.is_none() && running.load(Ordering::Relaxed) {
294                    warn!("WebSocket connection lost");
295                    needs_restart = true;
296                }
297
298                // Si necesita reiniciar
299                if needs_restart {
300                    let mut count = restart_count.lock().await;
301                    *count += 1;
302                    let attempt = *count;
303                    drop(count);
304
305                    warn!("🔄 Watchdog detected failure. Restart attempt #{}", attempt);
306
307                    // Limpiar estado
308                    *ws_writer.lock().await = None;
309                    running.store(false, Ordering::Relaxed);
310
311                    // Reintentar solo en Windows
312                    #[cfg(target_os = "windows")]
313                    {
314                        info!("Watchdog: Attempting to restart MyPOS server...");
315
316                        // Si el proceso murió, limpiar procesos residuales
317                        if process_died {
318                            info!("Watchdog: Cleaning up residual processes...");
319                            kill_process_by_name(MYPOS_SERVER_FILE);
320                            sleep(Duration::from_millis(500)).await;
321                        }
322
323                        // Esperar antes de reintentar
324                        sleep(Duration::from_secs(RESTART_DELAY_SECS)).await;
325
326                        // Lanzar proceso nuevamente
327                        let exe_path = sharing::get_path_mypos_server();
328                        info!("Watchdog: Spawning new process at: {}", exe_path);
329
330                        match Command::new(&exe_path).spawn() {
331                            Ok(child) => {
332                                *child_process.lock().await = Some(child);
333                                info!("Watchdog: ✓ Process restarted successfully");
334
335                                // Esperar a que el servidor inicie
336                                info!("Watchdog: Waiting {} seconds for server to start...", RESTART_DELAY_SECS);
337                                sleep(Duration::from_secs(RESTART_DELAY_SECS)).await;
338
339                                // Reconectar WebSocket
340                                info!("Watchdog: Attempting to reconnect WebSocket...");
341                                match Self::reconnect_websocket_static(ws_writer.clone(), response_sender.clone()).await {
342                                    Ok(_) => {
343                                        running.store(true, Ordering::Relaxed);
344                                        info!("Watchdog: ✓ WebSocket reconnected successfully");
345                                        info!("Watchdog: ✓ Full recovery completed on attempt #{}", attempt);
346                                    }
347                                    Err(e) => {
348                                        error!("Watchdog: ✗ Failed to reconnect WebSocket: {}", e);
349                                    }
350                                }
351                            }
352                            Err(e) => {
353                                error!("Watchdog: ✗ Failed to restart process: {:?}", e);
354                            }
355                        }
356                    }
357
358                    // En macOS solo intentar reconectar
359                    #[cfg(target_os = "macos")]
360                    {
361                        sleep(Duration::from_secs(RESTART_DELAY_SECS)).await;
362                        info!("Watchdog: Attempting to reconnect WebSocket...");
363                        match Self::reconnect_websocket_static(ws_writer.clone(), response_sender.clone()).await {
364                            Ok(_) => {
365                                running.store(true, Ordering::Relaxed);
366                                info!("Watchdog: WebSocket reconnected successfully");
367                            }
368                            Err(e) => {
369                                error!("Watchdog: Failed to reconnect: {}", e);
370                            }
371                        }
372                    }
373                }
374            }
375
376            info!("Watchdog stopped");
377        });
378    }
379
380    /// Helper estático para reconectar WebSocket desde el watchdog
381    async fn reconnect_websocket_static(
382        ws_writer: Arc<Mutex<Option<WsWriter>>>,
383        response_sender: Arc<Mutex<Option<oneshot::Sender<ServerResponse>>>>
384    ) -> Result<(), String> {
385        let url = "ws://127.0.0.1:8765";
386        info!("Reconnecting to WebSocket at: {}", url);
387
388        let connect_future = connect_async(url);
389        let (ws_stream, _) = match timeout(
390            Duration::from_secs(WEBSOCKET_CONNECT_TIMEOUT_SECS),
391            connect_future
392        ).await {
393            Ok(Ok(result)) => result,
394            Ok(Err(e)) => return Err(format!("Failed to reconnect: {}", e)),
395            Err(_) => return Err("Reconnection timeout".to_string()),
396        };
397
398        let (write, read) = ws_stream.split();
399        *ws_writer.lock().await = Some(write);
400
401        // Restart the WebSocket response handler
402        tokio::spawn(async move {
403            Self::handle_websocket_responses(read, response_sender).await;
404        });
405
406        info!("WebSocket reconnection completed");
407        Ok(())
408    }
409
410    // ========================================================================
411    // SECCIÓN: DETENCIÓN DEL SERVICIO
412    // ========================================================================
413
414    /// Stops the MyPOS server (Windows)
415    #[cfg(target_os = "windows")]
416    async fn stop(&self) {
417        info!("Stopping MyPOS service...");
418
419        // Deshabilitar watchdog primero
420        self.watchdog_enabled.store(false, Ordering::Relaxed);
421
422        // Esperar un momento para que el watchdog termine
423        sleep(Duration::from_millis(500)).await;
424
425        // Close WebSocket connection
426        *self.ws_writer.lock().await = None;
427
428        // Stop process
429        let mut guard = self.child_process.lock().await;
430        if let Some(mut child) = guard.take() {
431            if let Err(e) = child.kill() {
432                error!("Error killing MyPOS server: {:?}", e);
433            } else {
434                kill_process_by_name(MYPOS_SERVER_FILE);
435                info!("MyPOS server stopped.");
436            }
437        }
438
439        self.running.store(false, Ordering::Relaxed);
440        *self.restart_count.lock().await = 0;
441    }
442
443    /// Stops the service (macOS - only closes WebSocket)
444    #[cfg(target_os = "macos")]
445    async fn stop(&self) {
446        info!("macOS: Closing WebSocket connection...");
447
448        // Deshabilitar watchdog
449        self.watchdog_enabled.store(false, Ordering::Relaxed);
450
451        // Esperar un momento
452        sleep(Duration::from_millis(500)).await;
453
454        // Close WebSocket connection
455        *self.ws_writer.lock().await = None;
456
457        self.running.store(false, Ordering::Relaxed);
458        *self.restart_count.lock().await = 0;
459        info!("MyPOS WebSocket connection closed.");
460    }
461
462    // ========================================================================
463    // SECCIÓN: COMUNICACIÓN CON EL SERVIDOR
464    // ========================================================================
465
466    /// Sends a message to the WebSocket server and waits for response
467    async fn send_websocket_message(&self, message: Value) -> Result<ServerResponse, String> {
468        // Create a channel for the response
469        let (sender, receiver) = oneshot::channel();
470
471        // Store the sender for this transaction
472        {
473            let mut response_sender = self.response_sender.lock().await;
474            *response_sender = Some(sender);
475        }
476
477        // Send the message
478        {
479            let mut ws_writer = self.ws_writer.lock().await;
480            if let Some(writer) = ws_writer.as_mut() {
481                let msg = Message::Text(message.to_string());
482                if let Err(e) = writer.send(msg).await {
483                    return Err(format!("Failed to send WebSocket message: {}", e));
484                }
485            } else {
486                return Err("WebSocket not connected".to_string());
487            }
488        }
489
490        // Wait for response with timeout
491        match timeout(Duration::from_secs(60), receiver).await {
492            Ok(Ok(response)) => Ok(response),
493            Ok(Err(_)) => Err("Response channel closed".to_string()),
494            Err(_) => Err("Transaction timeout".to_string()),
495        }
496    }
497
498    // ========================================================================
499    // SECCIÓN: REINTENTOS AUTOMÁTICOS
500    // ========================================================================
501
502    /// Attempts a transaction with automatic retries
503    async fn attempt_transaction_with_retries(
504        &self,
505        action: &str,
506        amount: f64,
507    ) -> Result<ServerResponse, String> {
508        for attempt in 1..=MAX_RETRIES {
509            info!("Attempt {}/{} for {} of {:.2} EUR", attempt, MAX_RETRIES, action, amount);
510
511            let message = json!({
512                "action": action,
513                "amount": amount
514            });
515
516            match self.send_websocket_message(message).await {
517                Ok(response) => {
518                    let status_lower = response.status.to_lowercase();
519
520                    // Si es exitoso, retornar inmediatamente
521                    if status_lower == "success" {
522                        info!("Transaction successful on attempt {}", attempt);
523                        return Ok(response);
524                    }
525
526                    // Si fue cancelado por el usuario, no reintentar
527                    if status_lower == "usercancel" {
528                        warn!("Transaction cancelled by user, no retries");
529                        return Ok(response);
530                    }
531
532                    // Para otros errores, continuar con reintentos si quedan intentos
533                    if attempt < MAX_RETRIES {
534                        warn!("Attempt {} failed with status: {}. Retrying in {} seconds...",
535                              attempt, response.status, RETRY_DELAY_SECS);
536                        sleep(Duration::from_secs(RETRY_DELAY_SECS)).await;
537                    } else {
538                        error!("All {} attempts failed. Last status: {}", MAX_RETRIES, response.status);
539                        return Ok(response);
540                    }
541                }
542                Err(e) => {
543                    if attempt < MAX_RETRIES {
544                        warn!("Attempt {} failed with error: {}. Retrying in {} seconds...",
545                              attempt, e, RETRY_DELAY_SECS);
546                        sleep(Duration::from_secs(RETRY_DELAY_SECS)).await;
547                    } else {
548                        error!("All {} attempts failed. Last error: {}", MAX_RETRIES, e);
549                        return Err(e);
550                    }
551                }
552            }
553        }
554
555        Err("Maximum retries exceeded".to_string())
556    }
557
558    // ========================================================================
559    // SECCIÓN: MANEJO DE COMPRAS Y DEVOLUCIONES
560    // ========================================================================
561
562    /// Handles a purchase with automatic retries
563    async fn handle_purchase(&self, amount: f64, write: WebSocketWrite) -> (i32, String) {
564        info!("Handling purchase of: {:.2} EUR (with up to {} retries)", amount, MAX_RETRIES);
565
566        match self.attempt_transaction_with_retries("purchase", amount).await {
567            Ok(response) => {
568                let success = response.status.to_lowercase() == "success";
569                let message = response.message.unwrap_or_else(||
570                    if success {
571                        format!("Purchase of {:.2} EUR completed successfully", amount)
572                    } else {
573                        format!("Purchase of {:.2} EUR failed after {} attempts: {}",
574                                amount, MAX_RETRIES, response.status)
575                    }
576                );
577
578                // Send response to the main WebSocket client if available
579                if let Some(ws_lock) = &write {
580                    let response_msg = json!({
581                        "type": "purchase_result",
582                        "success": success,
583                        "amount": amount,
584                        "message": message,
585                        "status": response.status
586                    });
587
588                    let mut ws = ws_lock.write().await;
589                    if let Err(e) = ws.send(Message::Text(response_msg.to_string())).await {
590                        error!("Failed to send purchase result: {:?}", e);
591                    }
592                }
593
594                if success { (0, message) } else { (1, message) }
595            }
596            Err(e) => {
597                let error_msg = format!("Purchase failed after {} attempts: {}", MAX_RETRIES, e);
598                error!("{}", error_msg);
599                (1, error_msg)
600            }
601        }
602    }
603
604    /// Handles a refund with automatic retries
605    async fn handle_refund(&self, amount: f64, write: WebSocketWrite) -> (i32, String) {
606        info!("Handling refund of: {:.2} EUR (with up to {} retries)", amount, MAX_RETRIES);
607
608        match self.attempt_transaction_with_retries("refund", amount).await {
609            Ok(response) => {
610                let success = response.status.to_lowercase() == "success";
611                let message = response.message.unwrap_or_else(||
612                    if success {
613                        format!("Refund of {:.2} EUR completed successfully", amount)
614                    } else {
615                        format!("Refund of {:.2} EUR failed after {} attempts: {}",
616                                amount, MAX_RETRIES, response.status)
617                    }
618                );
619
620                // Send response to the main WebSocket client if available
621                if let Some(ws_lock) = &write {
622                    let response_msg = json!({
623                        "type": "refund_result",
624                        "success": success,
625                        "amount": amount,
626                        "message": message,
627                        "status": response.status
628                    });
629
630                    let mut ws = ws_lock.write().await;
631                    if let Err(e) = ws.send(Message::Text(response_msg.to_string())).await {
632                        error!("Failed to send refund result: {:?}", e);
633                    }
634                }
635
636                if success { (0, message) } else { (1, message) }
637            }
638            Err(e) => {
639                let error_msg = format!("Refund failed after {} attempts: {}", MAX_RETRIES, e);
640                error!("{}", error_msg);
641                (1, error_msg)
642            }
643        }
644    }
645}
646
647// ========================================================================
648// IMPLEMENTACIÓN DEL TRAIT SERVICE
649// ========================================================================
650
651#[async_trait]
652impl Service for MyPOSService {
653    /// Handles incoming service actions (START, STOP, PURCHASE, REFUND)
654    async fn run(&self, action: Value, write: WebSocketWrite) -> (i32, String) {
655        info!("MyPOSService: Running action: {:?}", action);
656
657        // Parse ACTION similar to ScaleService
658        let parsed_action = if let Some(action_str) = action.get("ACTION").and_then(|a| a.as_str()) {
659            // If ACTION is a string, parse it as JSON
660            serde_json::from_str::<Value>(action_str).unwrap_or_else(|_| {
661                // If parsing fails, treat the string as a simple command
662                json!({"command": action_str})
663            })
664        } else if let Some(action_obj) = action.get("ACTION") {
665            // If ACTION is already an object, use it directly
666            action_obj.clone()
667        } else {
668            // Fallback: use the action itself
669            action
670        };
671
672        let command = parsed_action
673            .get("command")
674            .and_then(|c| c.as_str())
675            .unwrap_or("UNKNOWN");
676
677        info!("MyPOS Command: {}", command);
678
679        match command {
680            "START" => {
681                self.launch_and_monitor().await;
682
683                #[cfg(target_os = "windows")]
684                let msg = "MyPOS server started with watchdog monitoring enabled.";
685
686                #[cfg(target_os = "macos")]
687                let msg = "MyPOS WebSocket connection established with watchdog monitoring (no process launched on macOS).";
688
689                (0, msg.to_string())
690            }
691            "STOP" => {
692                self.stop().await;
693                (0, "MyPOS stopped.".to_string())
694            }
695            "PURCHASE" => {
696                let amount = parsed_action.get("amount").and_then(|a| a.as_f64()).unwrap_or(0.0);
697                if amount > 0.0 {
698                    self.handle_purchase(amount, write).await
699                } else {
700                    (1, "Invalid purchase amount.".to_string())
701                }
702            }
703            "REFUND" => {
704                let amount = parsed_action.get("amount").and_then(|a| a.as_f64()).unwrap_or(0.0);
705                if amount > 0.0 {
706                    self.handle_refund(amount, write).await
707                } else {
708                    (1, "Invalid refund amount.".to_string())
709                }
710            }
711            _ => (1, format!("Unknown command: {}", command)),
712        }
713    }
714
715    /// Converts the service instance into a `dyn Any` reference
716    fn as_any(&self) -> &dyn std::any::Any {
717        self
718    }
719
720    /// Stops the service and cleans up any running processes
721    fn stop_service(&self) {
722        let running = self.running.clone();
723        let watchdog_enabled = self.watchdog_enabled.clone();
724
725        #[cfg(target_os = "windows")]
726        let child_process = self.child_process.clone();
727
728        let ws_writer = self.ws_writer.clone();
729
730        tokio::spawn(async move {
731            if running.load(Ordering::Relaxed) {
732                // Deshabilitar watchdog
733                watchdog_enabled.store(false, Ordering::Relaxed);
734                sleep(Duration::from_millis(500)).await;
735
736                // Close WebSocket
737                *ws_writer.lock().await = None;
738
739                // Stop process (solo Windows)
740                #[cfg(target_os = "windows")]
741                {
742                    if let Some(mut child) = child_process.lock().await.take() {
743                        if let Err(e) = child.kill() {
744                            error!("Error stopping MyPOS server: {:?}", e);
745                        } else {
746                            info!("MyPOS server stopped cleanly.");
747                        }
748                    }
749                }
750
751                #[cfg(target_os = "macos")]
752                info!("MyPOS WebSocket closed (no process to stop on macOS).");
753
754                running.store(false, Ordering::Relaxed);
755            }
756        });
757    }
758
759    /// Returns the version of the MyPOS service
760    fn get_version(&self) -> String {
761        MYPOS_VERSION.to_string()
762    }
763}