1use std::process::{Child, Command};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use serde_json::{json, Value};
5use std::sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc,
8};
9use tokio::sync::{Mutex, oneshot};
10use tokio::time::{Duration, timeout, sleep};
11use crate::service::{Service, WebSocketWrite};
12use easy_trace::prelude::{info, error, warn};
13use tokio_tungstenite::{connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream};
14use futures_util::{stream::SplitSink, SinkExt, StreamExt, stream::SplitStream};
15use tokio::net::TcpStream;
16
17#[cfg(target_os = "windows")]
19use {sharing::MYPOS_SERVER_FILE, sharing::proc::kill_process_by_name};
20
/// Version string reported by `MyPOSService::get_version`.
pub const MYPOS_VERSION: &str = "1.1.0";

/// Maximum number of attempts made for a single purchase/refund transaction.
const MAX_RETRIES: u32 = 3;

/// Pause, in seconds, between two attempts of the same transaction.
const RETRY_DELAY_SECS: u64 = 2;

/// Interval, in seconds, between two watchdog health checks.
const WATCHDOG_CHECK_INTERVAL_SECS: u64 = 5;

/// Delay, in seconds, used around server (re)starts: before spawning a
/// replacement process and before (re)connecting the WebSocket.
const RESTART_DELAY_SECS: u64 = 3;

/// Upper bound, in seconds, on establishing the WebSocket connection.
const WEBSOCKET_CONNECT_TIMEOUT_SECS: u64 = 10;
38
/// JSON reply received from the MyPOS WebSocket server.
#[derive(Debug, Serialize, Deserialize)]
struct ServerResponse {
    /// Outcome reported by the server. The service compares it
    /// case-insensitively against `"success"` and `"usercancel"`.
    status: String,
    /// Optional human-readable detail; when absent the service builds its own message.
    message: Option<String>,
}
44
/// Write half of the split WebSocket connection to the MyPOS server.
type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
/// Read half of the split WebSocket connection to the MyPOS server.
type WsReader = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
47
/// Service that drives a MyPOS server over a local WebSocket connection.
///
/// On Windows the service also spawns and supervises the server process; on
/// macOS it only connects to a server that is expected to be running already.
pub struct MyPOSService {
    /// Handle of the server process spawned by this service (Windows only).
    #[cfg(target_os = "windows")]
    child_process: Arc<Mutex<Option<Child>>>,

    /// Set while the service considers itself started; cleared on stop and
    /// while the watchdog is recovering from a failure.
    running: Arc<AtomicBool>,

    /// The watchdog task keeps looping only while this flag is set.
    watchdog_enabled: Arc<AtomicBool>,

    /// Write half of the current WebSocket connection, `None` when disconnected.
    ws_writer: Arc<Mutex<Option<WsWriter>>>,

    /// One-shot channel of the transaction currently waiting for a server
    /// reply; the reader task takes it when a reply has been parsed.
    response_sender: Arc<Mutex<Option<oneshot::Sender<ServerResponse>>>>,

    /// Number of restart attempts made by the watchdog; reset to zero on stop.
    restart_count: Arc<Mutex<u32>>,
}
69
70impl MyPOSService {
71 pub fn new() -> Self {
73 MyPOSService {
74 #[cfg(target_os = "windows")]
75 child_process: Arc::new(Mutex::new(None)),
76 running: Arc::new(AtomicBool::new(false)),
77 watchdog_enabled: Arc::new(AtomicBool::new(false)),
78 ws_writer: Arc::new(Mutex::new(None)),
79 response_sender: Arc::new(Mutex::new(None)),
80 restart_count: Arc::new(Mutex::new(0)),
81 }
82 }
83
84 #[cfg(target_os = "windows")]
90 async fn launch_process(&self) -> Result<(), String> {
91 let exe_path = sharing::get_path_mypos_server();
92 info!("Launching MyPOS server at: {}", exe_path);
93
94 let mut guard = self.child_process.lock().await;
95
96 match Command::new(exe_path).spawn() {
97 Ok(child) => {
98 *guard = Some(child);
99 info!("MyPOS server process started successfully.");
100 Ok(())
101 }
102 Err(e) => {
103 error!("Failed to start MyPOS server: {:?}", e);
104 Err(format!("Failed to start process: {}", e))
105 }
106 }
107 }
108
109 async fn connect_websocket(&self) -> Result<(), String> {
111 let url = "ws://127.0.0.1:8765";
112 info!("Connecting to WebSocket at: {}", url);
113
114 let connect_future = connect_async(url);
115
116 let (ws_stream, _) = match timeout(
117 Duration::from_secs(WEBSOCKET_CONNECT_TIMEOUT_SECS),
118 connect_future
119 ).await {
120 Ok(Ok(result)) => result,
121 Ok(Err(e)) => return Err(format!("Failed to connect to WebSocket: {}", e)),
122 Err(_) => return Err("WebSocket connection timeout".to_string()),
123 };
124
125 let (write, read) = ws_stream.split();
126
127 *self.ws_writer.lock().await = Some(write);
129
130 let response_sender = self.response_sender.clone();
132 tokio::spawn(async move {
133 Self::handle_websocket_responses(read, response_sender).await;
134 });
135
136 info!("Connected to WebSocket successfully");
137 Ok(())
138 }
139
140 async fn handle_websocket_responses(
142 mut read: WsReader,
143 response_sender: Arc<Mutex<Option<oneshot::Sender<ServerResponse>>>>
144 ) {
145 while let Some(msg_result) = read.next().await {
146 match msg_result {
147 Ok(Message::Text(response)) => {
148 if let Ok(server_response) = serde_json::from_str::<ServerResponse>(&response) {
149 info!("Received response: {:?}", server_response);
150
151 let mut sender_guard = response_sender.lock().await;
153 if let Some(sender) = sender_guard.take() {
154 if sender.send(server_response).is_err() {
155 warn!("Failed to send response to waiting transaction");
156 }
157 }
158 } else {
159 warn!("Received unexpected response: {}", response);
160 }
161 }
162 Ok(Message::Close(_)) => {
163 warn!("WebSocket connection closed");
164 break;
165 }
166 Err(e) => {
167 error!("WebSocket error: {:?}", e);
168 break;
169 }
170 _ => {}
171 }
172 }
173 info!("WebSocket response handler terminated");
174 }
175
176 #[cfg(target_os = "windows")]
182 async fn launch_and_monitor(&self) {
183 self.watchdog_enabled.store(true, Ordering::Relaxed);
185
186 if let Err(e) = self.launch_process().await {
188 error!("Failed to launch process: {}", e);
189 return;
190 }
191
192 self.running.store(true, Ordering::Relaxed);
193
194 sleep(Duration::from_secs(RESTART_DELAY_SECS)).await;
196
197 if let Err(e) = self.connect_websocket().await {
199 error!("Failed to connect to WebSocket: {:?}", e);
200 return;
201 }
202
203 self.start_watchdog().await;
205 }
206
207 #[cfg(target_os = "macos")]
209 async fn launch_and_monitor(&self) {
210 info!("macOS: Skipping process launch, connecting to existing WebSocket server...");
211
212 self.running.store(true, Ordering::Relaxed);
213 self.watchdog_enabled.store(true, Ordering::Relaxed);
214
215 if let Err(e) = self.connect_websocket().await {
217 error!("Failed to connect to WebSocket: {:?}", e);
218 self.running.store(false, Ordering::Relaxed);
219 return;
220 }
221
222 info!("macOS: Connected to existing WebSocket server");
223
224 self.start_watchdog().await;
226 }
227
228 async fn start_watchdog(&self) {
230 let running = self.running.clone();
231 let watchdog_enabled = self.watchdog_enabled.clone();
232 let ws_writer = self.ws_writer.clone();
233 let restart_count = self.restart_count.clone();
234 let response_sender = self.response_sender.clone();
235
236 #[cfg(target_os = "windows")]
237 let child_process = self.child_process.clone();
238
239 tokio::spawn(async move {
240 info!("Watchdog started");
241
242 while watchdog_enabled.load(Ordering::Relaxed) {
243 sleep(Duration::from_secs(WATCHDOG_CHECK_INTERVAL_SECS)).await;
244
245 if !watchdog_enabled.load(Ordering::Relaxed) {
246 info!("Watchdog disabled, stopping...");
247 break;
248 }
249
250 let mut needs_restart = false;
251 let mut process_died = false;
252
253 #[cfg(target_os = "windows")]
255 {
256 let mut child_guard = child_process.lock().await;
257
258 if let Some(child) = child_guard.as_mut() {
259 match child.try_wait() {
260 Ok(Some(status)) => {
261 error!("MyPOS server process exited with status: {:?}", status);
263 *child_guard = None;
264 drop(child_guard);
265 needs_restart = true;
266 process_died = true;
267 }
268 Ok(None) => {
269 drop(child_guard);
271 }
272 Err(e) => {
273 error!("Error checking process status: {:?}", e);
275 *child_guard = None;
276 drop(child_guard);
277 needs_restart = true;
278 process_died = true;
279 }
280 }
281 } else {
282 drop(child_guard);
284 if running.load(Ordering::Relaxed) {
285 warn!("Process handle lost, attempting restart");
286 needs_restart = true;
287 process_died = true;
288 }
289 }
290 }
291
292 if ws_writer.lock().await.is_none() && running.load(Ordering::Relaxed) {
294 warn!("WebSocket connection lost");
295 needs_restart = true;
296 }
297
298 if needs_restart {
300 let mut count = restart_count.lock().await;
301 *count += 1;
302 let attempt = *count;
303 drop(count);
304
305 warn!("🔄 Watchdog detected failure. Restart attempt #{}", attempt);
306
307 *ws_writer.lock().await = None;
309 running.store(false, Ordering::Relaxed);
310
311 #[cfg(target_os = "windows")]
313 {
314 info!("Watchdog: Attempting to restart MyPOS server...");
315
316 if process_died {
318 info!("Watchdog: Cleaning up residual processes...");
319 kill_process_by_name(MYPOS_SERVER_FILE);
320 sleep(Duration::from_millis(500)).await;
321 }
322
323 sleep(Duration::from_secs(RESTART_DELAY_SECS)).await;
325
326 let exe_path = sharing::get_path_mypos_server();
328 info!("Watchdog: Spawning new process at: {}", exe_path);
329
330 match Command::new(&exe_path).spawn() {
331 Ok(child) => {
332 *child_process.lock().await = Some(child);
333 info!("Watchdog: ✓ Process restarted successfully");
334
335 info!("Watchdog: Waiting {} seconds for server to start...", RESTART_DELAY_SECS);
337 sleep(Duration::from_secs(RESTART_DELAY_SECS)).await;
338
339 info!("Watchdog: Attempting to reconnect WebSocket...");
341 match Self::reconnect_websocket_static(ws_writer.clone(), response_sender.clone()).await {
342 Ok(_) => {
343 running.store(true, Ordering::Relaxed);
344 info!("Watchdog: ✓ WebSocket reconnected successfully");
345 info!("Watchdog: ✓ Full recovery completed on attempt #{}", attempt);
346 }
347 Err(e) => {
348 error!("Watchdog: ✗ Failed to reconnect WebSocket: {}", e);
349 }
350 }
351 }
352 Err(e) => {
353 error!("Watchdog: ✗ Failed to restart process: {:?}", e);
354 }
355 }
356 }
357
358 #[cfg(target_os = "macos")]
360 {
361 sleep(Duration::from_secs(RESTART_DELAY_SECS)).await;
362 info!("Watchdog: Attempting to reconnect WebSocket...");
363 match Self::reconnect_websocket_static(ws_writer.clone(), response_sender.clone()).await {
364 Ok(_) => {
365 running.store(true, Ordering::Relaxed);
366 info!("Watchdog: WebSocket reconnected successfully");
367 }
368 Err(e) => {
369 error!("Watchdog: Failed to reconnect: {}", e);
370 }
371 }
372 }
373 }
374 }
375
376 info!("Watchdog stopped");
377 });
378 }
379
380 async fn reconnect_websocket_static(
382 ws_writer: Arc<Mutex<Option<WsWriter>>>,
383 response_sender: Arc<Mutex<Option<oneshot::Sender<ServerResponse>>>>
384 ) -> Result<(), String> {
385 let url = "ws://127.0.0.1:8765";
386 info!("Reconnecting to WebSocket at: {}", url);
387
388 let connect_future = connect_async(url);
389 let (ws_stream, _) = match timeout(
390 Duration::from_secs(WEBSOCKET_CONNECT_TIMEOUT_SECS),
391 connect_future
392 ).await {
393 Ok(Ok(result)) => result,
394 Ok(Err(e)) => return Err(format!("Failed to reconnect: {}", e)),
395 Err(_) => return Err("Reconnection timeout".to_string()),
396 };
397
398 let (write, read) = ws_stream.split();
399 *ws_writer.lock().await = Some(write);
400
401 tokio::spawn(async move {
403 Self::handle_websocket_responses(read, response_sender).await;
404 });
405
406 info!("WebSocket reconnection completed");
407 Ok(())
408 }
409
410 #[cfg(target_os = "windows")]
416 async fn stop(&self) {
417 info!("Stopping MyPOS service...");
418
419 self.watchdog_enabled.store(false, Ordering::Relaxed);
421
422 sleep(Duration::from_millis(500)).await;
424
425 *self.ws_writer.lock().await = None;
427
428 let mut guard = self.child_process.lock().await;
430 if let Some(mut child) = guard.take() {
431 if let Err(e) = child.kill() {
432 error!("Error killing MyPOS server: {:?}", e);
433 } else {
434 kill_process_by_name(MYPOS_SERVER_FILE);
435 info!("MyPOS server stopped.");
436 }
437 }
438
439 self.running.store(false, Ordering::Relaxed);
440 *self.restart_count.lock().await = 0;
441 }
442
443 #[cfg(target_os = "macos")]
445 async fn stop(&self) {
446 info!("macOS: Closing WebSocket connection...");
447
448 self.watchdog_enabled.store(false, Ordering::Relaxed);
450
451 sleep(Duration::from_millis(500)).await;
453
454 *self.ws_writer.lock().await = None;
456
457 self.running.store(false, Ordering::Relaxed);
458 *self.restart_count.lock().await = 0;
459 info!("MyPOS WebSocket connection closed.");
460 }
461
462 async fn send_websocket_message(&self, message: Value) -> Result<ServerResponse, String> {
468 let (sender, receiver) = oneshot::channel();
470
471 {
473 let mut response_sender = self.response_sender.lock().await;
474 *response_sender = Some(sender);
475 }
476
477 {
479 let mut ws_writer = self.ws_writer.lock().await;
480 if let Some(writer) = ws_writer.as_mut() {
481 let msg = Message::Text(message.to_string());
482 if let Err(e) = writer.send(msg).await {
483 return Err(format!("Failed to send WebSocket message: {}", e));
484 }
485 } else {
486 return Err("WebSocket not connected".to_string());
487 }
488 }
489
490 match timeout(Duration::from_secs(60), receiver).await {
492 Ok(Ok(response)) => Ok(response),
493 Ok(Err(_)) => Err("Response channel closed".to_string()),
494 Err(_) => Err("Transaction timeout".to_string()),
495 }
496 }
497
498 async fn attempt_transaction_with_retries(
504 &self,
505 action: &str,
506 amount: f64,
507 ) -> Result<ServerResponse, String> {
508 for attempt in 1..=MAX_RETRIES {
509 info!("Attempt {}/{} for {} of {:.2} EUR", attempt, MAX_RETRIES, action, amount);
510
511 let message = json!({
512 "action": action,
513 "amount": amount
514 });
515
516 match self.send_websocket_message(message).await {
517 Ok(response) => {
518 let status_lower = response.status.to_lowercase();
519
520 if status_lower == "success" {
522 info!("Transaction successful on attempt {}", attempt);
523 return Ok(response);
524 }
525
526 if status_lower == "usercancel" {
528 warn!("Transaction cancelled by user, no retries");
529 return Ok(response);
530 }
531
532 if attempt < MAX_RETRIES {
534 warn!("Attempt {} failed with status: {}. Retrying in {} seconds...",
535 attempt, response.status, RETRY_DELAY_SECS);
536 sleep(Duration::from_secs(RETRY_DELAY_SECS)).await;
537 } else {
538 error!("All {} attempts failed. Last status: {}", MAX_RETRIES, response.status);
539 return Ok(response);
540 }
541 }
542 Err(e) => {
543 if attempt < MAX_RETRIES {
544 warn!("Attempt {} failed with error: {}. Retrying in {} seconds...",
545 attempt, e, RETRY_DELAY_SECS);
546 sleep(Duration::from_secs(RETRY_DELAY_SECS)).await;
547 } else {
548 error!("All {} attempts failed. Last error: {}", MAX_RETRIES, e);
549 return Err(e);
550 }
551 }
552 }
553 }
554
555 Err("Maximum retries exceeded".to_string())
556 }
557
558 async fn handle_purchase(&self, amount: f64, write: WebSocketWrite) -> (i32, String) {
564 info!("Handling purchase of: {:.2} EUR (with up to {} retries)", amount, MAX_RETRIES);
565
566 match self.attempt_transaction_with_retries("purchase", amount).await {
567 Ok(response) => {
568 let success = response.status.to_lowercase() == "success";
569 let message = response.message.unwrap_or_else(||
570 if success {
571 format!("Purchase of {:.2} EUR completed successfully", amount)
572 } else {
573 format!("Purchase of {:.2} EUR failed after {} attempts: {}",
574 amount, MAX_RETRIES, response.status)
575 }
576 );
577
578 if let Some(ws_lock) = &write {
580 let response_msg = json!({
581 "type": "purchase_result",
582 "success": success,
583 "amount": amount,
584 "message": message,
585 "status": response.status
586 });
587
588 let mut ws = ws_lock.write().await;
589 if let Err(e) = ws.send(Message::Text(response_msg.to_string())).await {
590 error!("Failed to send purchase result: {:?}", e);
591 }
592 }
593
594 if success { (0, message) } else { (1, message) }
595 }
596 Err(e) => {
597 let error_msg = format!("Purchase failed after {} attempts: {}", MAX_RETRIES, e);
598 error!("{}", error_msg);
599 (1, error_msg)
600 }
601 }
602 }
603
604 async fn handle_refund(&self, amount: f64, write: WebSocketWrite) -> (i32, String) {
606 info!("Handling refund of: {:.2} EUR (with up to {} retries)", amount, MAX_RETRIES);
607
608 match self.attempt_transaction_with_retries("refund", amount).await {
609 Ok(response) => {
610 let success = response.status.to_lowercase() == "success";
611 let message = response.message.unwrap_or_else(||
612 if success {
613 format!("Refund of {:.2} EUR completed successfully", amount)
614 } else {
615 format!("Refund of {:.2} EUR failed after {} attempts: {}",
616 amount, MAX_RETRIES, response.status)
617 }
618 );
619
620 if let Some(ws_lock) = &write {
622 let response_msg = json!({
623 "type": "refund_result",
624 "success": success,
625 "amount": amount,
626 "message": message,
627 "status": response.status
628 });
629
630 let mut ws = ws_lock.write().await;
631 if let Err(e) = ws.send(Message::Text(response_msg.to_string())).await {
632 error!("Failed to send refund result: {:?}", e);
633 }
634 }
635
636 if success { (0, message) } else { (1, message) }
637 }
638 Err(e) => {
639 let error_msg = format!("Refund failed after {} attempts: {}", MAX_RETRIES, e);
640 error!("{}", error_msg);
641 (1, error_msg)
642 }
643 }
644 }
645}
646
647#[async_trait]
652impl Service for MyPOSService {
653 async fn run(&self, action: Value, write: WebSocketWrite) -> (i32, String) {
655 info!("MyPOSService: Running action: {:?}", action);
656
657 let parsed_action = if let Some(action_str) = action.get("ACTION").and_then(|a| a.as_str()) {
659 serde_json::from_str::<Value>(action_str).unwrap_or_else(|_| {
661 json!({"command": action_str})
663 })
664 } else if let Some(action_obj) = action.get("ACTION") {
665 action_obj.clone()
667 } else {
668 action
670 };
671
672 let command = parsed_action
673 .get("command")
674 .and_then(|c| c.as_str())
675 .unwrap_or("UNKNOWN");
676
677 info!("MyPOS Command: {}", command);
678
679 match command {
680 "START" => {
681 self.launch_and_monitor().await;
682
683 #[cfg(target_os = "windows")]
684 let msg = "MyPOS server started with watchdog monitoring enabled.";
685
686 #[cfg(target_os = "macos")]
687 let msg = "MyPOS WebSocket connection established with watchdog monitoring (no process launched on macOS).";
688
689 (0, msg.to_string())
690 }
691 "STOP" => {
692 self.stop().await;
693 (0, "MyPOS stopped.".to_string())
694 }
695 "PURCHASE" => {
696 let amount = parsed_action.get("amount").and_then(|a| a.as_f64()).unwrap_or(0.0);
697 if amount > 0.0 {
698 self.handle_purchase(amount, write).await
699 } else {
700 (1, "Invalid purchase amount.".to_string())
701 }
702 }
703 "REFUND" => {
704 let amount = parsed_action.get("amount").and_then(|a| a.as_f64()).unwrap_or(0.0);
705 if amount > 0.0 {
706 self.handle_refund(amount, write).await
707 } else {
708 (1, "Invalid refund amount.".to_string())
709 }
710 }
711 _ => (1, format!("Unknown command: {}", command)),
712 }
713 }
714
715 fn as_any(&self) -> &dyn std::any::Any {
717 self
718 }
719
720 fn stop_service(&self) {
722 let running = self.running.clone();
723 let watchdog_enabled = self.watchdog_enabled.clone();
724
725 #[cfg(target_os = "windows")]
726 let child_process = self.child_process.clone();
727
728 let ws_writer = self.ws_writer.clone();
729
730 tokio::spawn(async move {
731 if running.load(Ordering::Relaxed) {
732 watchdog_enabled.store(false, Ordering::Relaxed);
734 sleep(Duration::from_millis(500)).await;
735
736 *ws_writer.lock().await = None;
738
739 #[cfg(target_os = "windows")]
741 {
742 if let Some(mut child) = child_process.lock().await.take() {
743 if let Err(e) = child.kill() {
744 error!("Error stopping MyPOS server: {:?}", e);
745 } else {
746 info!("MyPOS server stopped cleanly.");
747 }
748 }
749 }
750
751 #[cfg(target_os = "macos")]
752 info!("MyPOS WebSocket closed (no process to stop on macOS).");
753
754 running.store(false, Ordering::Relaxed);
755 }
756 });
757 }
758
759 fn get_version(&self) -> String {
761 MYPOS_VERSION.to_string()
762 }
763}