1use crate::service::{Service, WebSocketWrite};
2use async_trait::async_trait;
3use futures_util::SinkExt;
4use serde_json::{json, Value};
5use std::sync::Arc;
6use tokio::sync::Mutex;
7use tokio::time::{sleep, Duration};
8use tokio_tungstenite::tungstenite::protocol::Message;
9use easy_trace::prelude::{error, info};
10
/// Version string of this service; it is placed in the `SERVICE_VERS` field of
/// response envelopes and returned by `get_version`.
pub const CASH_GUARD_VERSION: &str = "1.0.0";
12
13pub struct CashGuardService {
15 base_url: String,
16 processing_ids: Arc<Mutex<Vec<String>>>, auth: (String, String), }
19
20impl Clone for CashGuardService {
21 fn clone(&self) -> Self {
22 Self {
23 base_url: self.base_url.clone(),
24 processing_ids: Arc::clone(&self.processing_ids),
25 auth: self.auth.clone(),
26 }
27 }
28}
29
30impl CashGuardService {
31 pub fn new(host: &str, port: u16, username: &str, password: &str) -> Self {
40 let base_url = format!("http://{}:{}/cashguard/posApi/4.0", host, port);
41 Self {
42 base_url,
43 processing_ids: Arc::new(Mutex::new(Vec::new())),
44 auth: (username.to_string(), password.to_string()),
45 }
46 }
47
48 async fn _send_response(&self, write: &WebSocketWrite, uuid: &str, message: Value) {
56 let full_message = json!({
57 "SERVICE_NAME": "CASHGUARD",
58 "SERVICE_VERS": CASH_GUARD_VERSION,
59 "MESSAGE_TYPE": "RESPONSE",
60 "MESSAGE_EXEC": "SUCCESS",
61 "MESSAGE_UUID": uuid,
62 "MESSAGE_DATA": message,
63 });
64
65 self.send_message(write, full_message).await;
66 }
67
68 async fn send_request(
80 &self,
81 method: &str,
82 endpoint: &str,
83 params: Option<Value>,
84 ) -> Result<Value, String> {
85 let url = format!("{}/{}", self.base_url, endpoint);
86 let client = reqwest::Client::new();
87 let (username, password) = &self.auth;
88 info!("Sending request to: {} with params: {:?}", url, params);
89
90 let query_params: Vec<(String, String)> = match params.as_ref().and_then(|value| value.as_object()) {
91 Some(map) => map.iter()
92 .map(|(k, v)| {
93 let value_as_string = if v.is_number() {
94 if let Some(num) = v.as_f64() {
95 format!("{}", num as i64) } else if let Some(num) = v.as_i64() {
97 format!("{}", num)
98 } else {
99 v.to_string()
100 }
101 } else {
102 v.to_string()
103 };
104 (k.clone(), value_as_string)
105 })
106 .collect(),
107 None => Vec::new(),
108 };
109
110 let response = match method.to_uppercase().as_str() {
111 "GET" => {
112 client
113 .get(&url)
114 .basic_auth(username, Some(password))
115 .header("accept", "*/*")
116 .query(&query_params) .send()
118 .await
119 }
120 "POST" => {
121 client
122 .post(&url)
123 .basic_auth(username, Some(password))
124 .header("accept", "*/*")
125 .json(¶ms.unwrap_or_else(|| json!({}))) .send()
127 .await
128 }
129 "PUT" => {
130 client
131 .put(&url)
132 .basic_auth(username, Some(password))
133 .header("accept", "*/*")
134 .query(&query_params)
135 .send()
136 .await
137 }
138 "DELETE" => {
139 client
140 .delete(&url)
141 .basic_auth(username, Some(password))
142 .header("accept", "*/*")
143 .query(&query_params)
144 .send()
145 .await
146 }
147 _ => {
148 return Err(format!("Unsupported HTTP method: {}", method));
149 }
150 };
151
152
153 match response {
154 Ok(res) if res.status().is_success() => {
155 let body = res.text().await.unwrap_or_else(|_| "{}".to_string());
156 Ok(serde_json::from_str(&body).unwrap_or(json!({})))
157 }
158 Ok(res) => Err(format!(
159 "HTTP Error: {} - {}",
160 res.status(),
161 res.text().await.unwrap_or_default()
162 )),
163 Err(err) => Err(format!("Request failed: {}", err)),
164 }
165 }
166
167
168 async fn status(&self) -> Result<Value, String> {
174 self.send_request("GET", "status", None).await
175 }
176
177 async fn login(&self) -> Result<(), String> {
183 self.send_request("PUT", "login", None).await.map(|_| ())
184 }
185
186 async fn logout(&self) -> Result<(), String> {
192 self.send_request("PUT", "logout", None).await.map(|_| ())
193 }
194
195 async fn set_amount_due(&self, transaction_id: &str, amount_due: f64) -> Result<(), String> {
206 let amount_due_int = amount_due as i64;
207
208 let endpoint = format!(
209 "transactions/{}?status=amountDue&amountDue={}",
210 transaction_id, amount_due_int
211 );
212
213 self.send_request("PUT", &endpoint, None).await.map(|_| ())
214 }
215
216
217 async fn enqueue_transaction(
228 &self,
229 transaction_id: &str,
230 amount_due: Option<f64>,
231 ) -> Result<(), String> {
232 let mut endpoint = format!("queue/{}", transaction_id);
233 if let Some(amount) = amount_due {
234 endpoint.push_str(&format!("?amountDue={}", amount));
235 }
236
237 self.send_request("PUT", &endpoint, None).await.map(|_| ())
238 }
239
240
241 async fn initiate_transaction(
252 &self,
253 transaction_id: &str,
254 amount_due: f64,
255 ) -> Result<(), String> {
256 let endpoint = format!(
258 "transactions/{}?status=initiated¤cy=EUR&amountDue={}",
259 transaction_id, amount_due
260 );
261
262 self.send_request("PUT", &endpoint, None).await.map(|_| ())
264 }
265
266
267 async fn pre_start_transaction(
279 &self,
280 transaction_id: &str,
281 polling_interval: Duration,
282 write: WebSocketWrite,
283 ) -> Result<(), String> {
284 loop {
285 let status = self.status().await?;
286 let system_status = status.get("status").and_then(|s| s.as_str()).unwrap_or("");
287
288 match system_status {
289 "Logged Off" => {
290 info!("System is logged off, logging in...");
291 self.login().await?;
292 }
293 "Busy" => {
294 info!("System is busy, waiting...");
295 sleep(polling_interval).await;
296 }
297 _ => break,
298 }
299
300 if self
301 .processing_ids
302 .lock()
303 .await
304 .contains(&transaction_id.to_string())
305 {
306 if let Some(ws_lock) = &write {
307 let mut ws = ws_lock.write().await;
308 ws.send(Message::Text(
309 json!({
310 "status": "error",
311 "message": "Transaction no longer in queue.",
312 })
313 .to_string(),
314 ))
315 .await
316 .unwrap_or_default();
317 }
318 }
319 }
320 Ok(())
321 }
322
323 async fn final_response(&self, write: &WebSocketWrite, response: &str) {
330 if let Some(write) = &write {
331 let mut ws = write.write().await;
332 ws.send(Message::Text(response.to_string()))
333 .await
334 .unwrap_or_default();
335 }
336 }
337
338 async fn process_transaction(
350 &self,
351 transaction_id: &str,
352 amount_due: f64,
353 write: WebSocketWrite,
354 ) -> Result<(), String> {
355 let mut processing_ids = self.processing_ids.lock().await;
356
357 if processing_ids.contains(&transaction_id.to_string()) {
358 return Err(format!(
359 "Transaction {} is already being processed.",
360 transaction_id
361 ));
362 } else {
363 info!("Transaction {} is not being processed.", transaction_id);
364 processing_ids.push(transaction_id.to_string());
365 }
366
367 drop(processing_ids); info!("Processing transaction: {} pre-start", transaction_id);
371 self.pre_start_transaction(transaction_id, Duration::from_secs(2), write.clone())
372 .await?;
373
374 info!("Processing transaction: {} enqueue", transaction_id);
376 self.enqueue_transaction(transaction_id, Some(amount_due))
377 .await?;
378
379 info!("Processing transaction: {} initiate", transaction_id);
381 self.initiate_transaction(transaction_id, amount_due)
382 .await?;
383
384 let mut current_amount = 0.0;
385 let mut last_reported_amount = 0.0;
386
387 while current_amount < amount_due {
389 tokio::time::sleep(Duration::from_secs(2)).await;
390
391 let status = self.status().await?;
392 current_amount = status.get("amount").and_then(|a| a.as_f64()).unwrap_or(0.0);
393
394 if current_amount != last_reported_amount {
395 last_reported_amount = current_amount;
396 let response = json!({
397 "status": "progress",
398 "transactionId": transaction_id,
399 "currentAmount": current_amount,
400 "amountDue": amount_due,
401 });
402
403 self.final_response(&write, &response.to_string()).await;
404 }
405
406 let processing_ids = self.processing_ids.lock().await;
408 if !processing_ids.contains(&transaction_id.to_string()) {
409 drop(processing_ids); let error_response = json!({
411 "status": "error",
412 "message": "Transaction no longer in queue.",
413 "transactionId": transaction_id,
414 });
415 self.final_response(&write, &error_response.to_string()).await;
416
417 return Err(format!(
418 "Transaction {} is no longer in the queue. Aborting.",
419 transaction_id
420 ));
421 }
422 }
423
424 info!("Confirming amount received: {}", amount_due);
426 self.set_amount_due(transaction_id, amount_due).await?;
427
428 let refund = if current_amount > amount_due {
430 let refund_amount = current_amount - amount_due;
431 info!("Refund issued: {}", refund_amount);
432 refund_amount
433 } else {
434 info!("No refund necessary.");
435 0.0
436 };
437
438 let final_response = json!({
440 "status": "success",
441 "transactionId": transaction_id,
442 "amountDue": amount_due,
443 "amountRefund": refund,
444 });
445
446 self.final_response(&write, &final_response.to_string()).await;
447
448 let mut processing_ids = self.processing_ids.lock().await;
449 processing_ids.retain(|id| id != transaction_id);
450 drop(processing_ids); Ok(())
453 }
454
455
456 async fn cash_out_transaction(
468 &self,
469 transaction_id: &str,
470 amount: f64,
471 write: WebSocketWrite,
472 ) -> Result<(), String> {
473 let negative_amount = -amount.abs();
474 self.pre_start_transaction(transaction_id, Duration::from_secs(2), write.clone())
475 .await?;
476 self.enqueue_transaction(transaction_id, Some(negative_amount))
477 .await?;
478 self.initiate_transaction(transaction_id, negative_amount)
479 .await?;
480 self.set_amount_due(transaction_id, negative_amount).await?;
481
482 let response = json!({
483 "status": "success",
484 "message": "Cash out completed.",
485 "transactionId": transaction_id,
486 });
487
488 self.final_response(&write, &response.to_string()).await;
489
490 Ok(())
491 }
492
493 async fn send_message(&self, write: &WebSocketWrite, message: Value) {
500 if let Some(ws_lock) = write {
501 let serialized_message = Message::Text(message.to_string());
502 let mut ws = ws_lock.write().await;
503 if let Err(e) = ws.send(serialized_message).await {
504 error!("Failed to send WebSocket message: {}", e);
505 }
506 }
507 }
508
509 async fn cancel_transaction(&self, transaction_id: &str, status: &str) -> Result<(), String> {
520 if status == "PendingActivation" {
521 self.set_amount_due(transaction_id, 0.0).await?;
522 self.send_request(
523 "PUT",
524 &format!("transactions/{}?status=released", transaction_id),
525 None,
526 )
527 .await?;
528 } else {
529 self.set_amount_due(transaction_id, 0.0).await?;
530 }
531 Ok(())
532 }
533
534 async fn reset(&self) -> Result<(), String> {
540 self.send_request("PUT", "Reset", None).await.map(|_| ())
541 }
542
543 async fn _transaction(
558 &self,
559 transaction_id: &str,
560 status: &str,
561 currency: Option<&str>,
562 amount_due: Option<f64>,
563 in_amount: Option<f64>,
564 out_amount: Option<f64>,
565 ) -> Result<(), String> {
566 let mut endpoint = format!("transactions/{}?status={}", transaction_id, status);
567 if let Some(curr) = currency {
568 endpoint.push_str(&format!("¤cy={}", curr));
569 }
570 if let Some(amount) = amount_due {
571 endpoint.push_str(&format!("&amountDue={}", amount));
572 }
573 if let Some(in_amt) = in_amount {
574 endpoint.push_str(&format!("&in={}", in_amt));
575 }
576 if let Some(out_amt) = out_amount {
577 endpoint.push_str(&format!("&out={}", out_amt));
578 }
579 self.send_request("PUT", &endpoint, None).await.map(|_| ())
580 }
581
582 async fn _repay_transaction(
594 &self,
595 transaction_id: &str,
596 in_amount: f64,
597 out_amount: f64,
598 ) -> Result<(), String> {
599 self._transaction(transaction_id, "repaid", Some("EUR"), None, Some(in_amount), Some(out_amount))
600 .await
601 }
602
603 async fn force_release_transaction(&self, transaction_id: &str) -> Result<(), String> {
605 info!("Force releasing transaction: {}", transaction_id);
606
607 info!("Attempting complete cancellation process for transaction {}", transaction_id);
609
610 let _ = self.set_amount_due(transaction_id, 0.0).await;
612 tokio::time::sleep(Duration::from_millis(300)).await;
613
614 let repay_result = self.send_request(
616 "PUT",
617 &format!("transactions/{}?status=repaid¤cy=EUR", transaction_id),
618 None,
619 ).await;
620
621 if repay_result.is_ok() {
622 info!("Repay successful for transaction {}, proceeding with release", transaction_id);
623 tokio::time::sleep(Duration::from_millis(1000)).await;
624
625 let release_result = self.send_request(
627 "PUT",
628 &format!("transactions/{}?status=released¤cy=EUR&in=0&out=0", transaction_id),
629 None,
630 ).await;
631
632 if release_result.is_ok() {
633 info!("Complete repay+release successful for transaction {}", transaction_id);
634 return Ok(());
635 }
636 }
637
638 info!("Trying direct release for transaction {}", transaction_id);
640 let result1 = self.send_request(
641 "PUT",
642 &format!("transactions/{}?status=released", transaction_id),
643 None,
644 ).await;
645
646 if result1.is_ok() {
647 return Ok(());
648 }
649
650 info!("Trying queue deletion for transaction {}", transaction_id);
652 let result2 = self.send_request("DELETE", &format!("queue/{}", transaction_id), None).await;
653
654 if result2.is_ok() {
655 return Ok(());
656 }
657
658 error!("Could not completely cancel transaction {}, but continuing", transaction_id);
660 Ok(()) }
662 async fn clear_queue_and_process_new_transaction(
665 &self,
666 new_transaction_id: &str,
667 amount_due: f64,
668 write: WebSocketWrite,
669 ) -> Result<(), String> {
670 info!("Starting queue cleanup before processing transaction: {}", new_transaction_id);
671
672 let queue = self.get_queue().await.unwrap_or(json!({}));
674
675 if let Some(queue_data) = queue.as_array() {
676 if !queue_data.is_empty() {
677 info!("Found {} transactions in queue, proceeding to cancel one by one", queue_data.len());
678
679 for item in queue_data {
681 let transaction_id = item
682 .get("transactionId")
683 .and_then(|t| t.as_str())
684 .unwrap_or("");
685 let status = item.get("status").and_then(|s| s.as_str()).unwrap_or("");
686 let being_handled = item.get("beingHandledBySystem").and_then(|b| b.as_bool()).unwrap_or(false);
687
688 if !transaction_id.is_empty() {
689 info!("Cancelling transaction: {} with status: {} (beingHandled: {})", transaction_id, status, being_handled);
690
691 match self.cancel_single_transaction(transaction_id, status, being_handled).await {
692 Ok(_) => {
693 info!("Transaction {} cancelled successfully", transaction_id);
694
695 let progress_response = json!({
697 "status": "clearing_queue",
698 "message": format!("Cancelled transaction: {}", transaction_id),
699 "transactionId": transaction_id,
700 });
701 self.final_response(&write, &progress_response.to_string()).await;
702
703 tokio::time::sleep(Duration::from_millis(1000)).await;
705 }
706 Err(e) => {
707 error!("Error cancelling transaction {}: {}", transaction_id, e);
708 }
710 }
711
712 let mut processing_ids = self.processing_ids.lock().await;
714 processing_ids.retain(|id| id != transaction_id);
715 drop(processing_ids);
716 }
717 }
718
719 for retry_attempt in 0..3 {
721 let final_queue = self.get_queue().await.unwrap_or(json!({}));
722 if let Some(final_queue_data) = final_queue.as_array() {
723 if !final_queue_data.is_empty() {
724 error!("Queue still has {} transactions after cleanup attempt {}", final_queue_data.len(), retry_attempt + 1);
725 for item in final_queue_data {
727 let transaction_id = item.get("transactionId").and_then(|t| t.as_str()).unwrap_or("");
728 let status = item.get("status").and_then(|s| s.as_str()).unwrap_or("");
729 let being_handled = item.get("beingHandledBySystem").and_then(|b| b.as_bool()).unwrap_or(false);
730 if !transaction_id.is_empty() {
731 info!("Retry cleanup: cancelling {} with status {} (beingHandled: {})", transaction_id, status, being_handled);
732 let _ = self.force_release_transaction(transaction_id).await;
733 tokio::time::sleep(Duration::from_millis(1000)).await;
734 }
735 }
736 } else {
737 info!("Queue cleaned successfully, it's empty");
738 break;
739 }
740 }
741 }
742 } else {
743 info!("Queue is already empty, no cleanup needed");
744 }
745 }
746
747 info!("Queue cleaned, starting new transaction: {}", new_transaction_id);
749
750 let success_response = json!({
751 "status": "queue_cleared",
752 "message": "Queue cleared, starting new transaction",
753 "newTransactionId": new_transaction_id,
754 });
755 self.final_response(&write, &success_response.to_string()).await;
756
757 self.process_transaction(new_transaction_id, amount_due, write).await
759 }
760
761 async fn _release_transaction(
776 &self,
777 transaction_id: &str,
778 in_amount: f64,
779 out_amount: f64,
780 ) -> Result<(), String> {
781 self._transaction(
782 transaction_id,
783 "released",
784 Some("EUR"),
785 None,
786 Some(in_amount),
787 Some(out_amount),
788 )
789 .await
790 }
791 async fn cancel_single_transaction(&self, transaction_id: &str, status: &str, being_handled: bool) -> Result<(), String> {
792 info!("Cancelling individual transaction: {} with status: {} (beingHandled: {})", transaction_id, status, being_handled);
793
794 match status {
796 "InQueue" => {
797 info!("Transaction {} is queued, removing directly", transaction_id);
799 self.send_request("DELETE", &format!("queue/{}", transaction_id), None).await.map(|_| ())
800 }
801 "PendingActivation" => {
802 info!("Transaction {} is pending, cancelling with amountDue=0", transaction_id);
804 self.set_amount_due(transaction_id, 0.0).await?;
805 tokio::time::sleep(Duration::from_millis(300)).await;
806 self.send_request(
807 "PUT",
808 &format!("transactions/{}?status=released", transaction_id),
809 None,
810 ).await.map(|_| ())
811 }
812 "MovedFromQueue" => {
813 info!("Transaction {} is moved from queue, following complete cancellation process", transaction_id);
816
817 if being_handled {
818 info!("Transaction {} is being handled by system, doing repay+release", transaction_id);
820
821 info!("Step 1: Repaying transaction {}", transaction_id);
823 let repay_result = self.send_request(
824 "PUT",
825 &format!("transactions/{}?status=repaid¤cy=EUR", transaction_id),
826 None,
827 ).await;
828
829 if let Err(e) = repay_result {
830 error!("Repay failed for transaction {}: {}, trying alternative", transaction_id, e);
831 let _ = self.set_amount_due(transaction_id, 0.0).await;
833 }
834
835 tokio::time::sleep(Duration::from_millis(1000)).await;
837
838 info!("Step 2: Releasing transaction {} with values at 0", transaction_id);
840 self.send_request(
841 "PUT",
842 &format!("transactions/{}?status=released¤cy=EUR&in=0&out=0", transaction_id),
843 None,
844 ).await.map(|_| ())
845 } else {
846 info!("Transaction {} moved but not being handled, direct release", transaction_id);
848 self.set_amount_due(transaction_id, 0.0).await?;
849 tokio::time::sleep(Duration::from_millis(300)).await;
850 self.send_request(
851 "PUT",
852 &format!("transactions/{}?status=released", transaction_id),
853 None,
854 ).await.map(|_| ())
855 }
856 }
857 "Initiated" => {
858 info!("Transaction {} is initiated, following repay+release process", transaction_id);
860
861 let _ = self.set_amount_due(transaction_id, 0.0).await;
863 tokio::time::sleep(Duration::from_millis(300)).await;
864
865 info!("Step 1: Repaying transaction {}", transaction_id);
867 let repay_result = self.send_request(
868 "PUT",
869 &format!("transactions/{}?status=repaid¤cy=EUR", transaction_id),
870 None,
871 ).await;
872
873 if let Err(e) = repay_result {
874 error!("Repay failed for transaction {}: {}, continuing with release", transaction_id, e);
875 }
876
877 tokio::time::sleep(Duration::from_millis(1000)).await;
879
880 info!("Step 2: Releasing transaction {}", transaction_id);
882 self.send_request(
883 "PUT",
884 &format!("transactions/{}?status=released¤cy=EUR&in=0&out=0", transaction_id),
885 None,
886 ).await.map(|_| ())
887 }
888 _ => {
889 info!("Transaction {} has unknown status '{}', trying generic cancellation", transaction_id, status);
891 self.force_release_transaction(transaction_id).await
892 }
893 }
894 }
895 async fn get_queue(&self) -> Result<Value, String> {
901 self.send_request("GET", "queue", None).await
902 }
903
904 async fn handle_command(&self, write: &WebSocketWrite, command: String, transaction_id: &String, amount_due: f64) -> Result<(i32, String), (i32, String)> {
917 Ok(match command.as_str() {
918 "PROCESS_TRANSACTION" => {
919 info!("Processing new transaction with automatic queue clearing: {}", transaction_id);
920
921 if let Err(e) = self
922 .clear_queue_and_process_new_transaction(&transaction_id, amount_due, write.clone())
923 .await
924 {
925 error!("Error processing transaction with queue clearing: {}", e);
926 return Err((1, e));
927 }
928
929 (0, json!({"status": "success", "message": "Transaction processed successfully after clearing queue"}).to_string())
930 }
931 "CASH_OUT" => {
932 if let Err(e) = self
933 .cash_out_transaction(&transaction_id, amount_due, write.clone())
934 .await
935 {
936 error!("Failed to process cash out: {}", e);
937 return Err((1, e));
938 }
939 (0, "Cash out completed successfully".to_string())
940 }
941 "RESET" => {
942 let queue = self
944 .send_request("GET", "queue", None)
945 .await
946 .unwrap_or(json!({}));
947 if let Some(queue_data) = queue.as_array() {
948 for item in queue_data {
949 let transaction_id = item
950 .get("transactionId")
951 .and_then(|t| t.as_str())
952 .unwrap_or("");
953 let status = item.get("status").and_then(|s| s.as_str()).unwrap_or("");
954 match self.cancel_transaction(transaction_id, status).await {
955 Ok(_) => {
956 info!("Transaction {} cancelled.", transaction_id);
957 }
958 Err(e) => {
959 error!("Failed to cancel transaction {}: {}", transaction_id, e);
960 }
961 }
962 }
963 }
964 match self.reset().await {
965 Ok(_) => {
966 info!("CashGuard device reset.");
967 }
968 Err(e) => {
969 error!("Failed to reset CashGuard device: {}", e);
970 return Err((1, e));
971 }
972 }
973
974 let response = json!({
975 "status": "success",
976 "message": "CashGuard device reset.",
977 });
978 self.send_message(&write, response).await;
979 (0, "Device reset successfully".to_string())
980 }
981 "CONTENTS" => {
982 let contents = self
983 .send_request("GET", "contents", None)
984 .await
985 .unwrap_or(json!({}));
986 let response = json!({
987 "status": "success",
988 "contents": contents,
989 "command": command,
990 });
991 self.send_message(&write, response).await;
992 (0, "Contents retrieved successfully".to_string())
993 }
994 "STATUS" => {
995 let status = self.status().await.unwrap_or(json!({}));
996 let response = json!({
997 "status": "success",
998 "system_status": status,
999 "command": command,
1000 });
1001 self.send_message(&write, response).await;
1002 (0, "Status retrieved successfully".to_string())
1003 }
1004 "INITIATE_TRANSACTION" => {
1005 if let Err(e) = self.initiate_transaction(&transaction_id, amount_due).await {
1006 error!("Failed to initiate transaction: {}", e);
1007 return Err((1, e));
1008 }
1009 let response = json!({
1010 "status": "success",
1011 "message": "Transaction initiated.",
1012 });
1013 self.send_message(&write, response).await;
1014 (0, "Transaction initiated successfully".to_string())
1015 },
1016 "SET_AMOUNT_DUE" => {
1017 if let Err(e) = self.set_amount_due(&transaction_id, amount_due).await {
1018 error!("Failed to set amount due: {}", e);
1019 return Err((1, e));
1020 }
1021 let response = json!({
1022 "status": "success",
1023 "message": "Amount due set.",
1024 });
1025 self.send_message(&write, response).await;
1026 (0, "Amount due set successfully".to_string())
1027 },
1028 "CANCEL_TRANSACTION" => {
1029 if let Err(e) = self.set_amount_due(&transaction_id, 0.0).await {
1030 error!(
1031 "Failed to cancel transaction {}, trying to release instead: {}",
1032 transaction_id, e
1033 );
1034 self.send_request(
1035 "PUT",
1036 &format!("transactions/{}?status=released", transaction_id),
1037 None,
1038 )
1039 .await
1040 .unwrap_or_default();
1041 }
1042 let response = json!({
1043 "status": "cancelled",
1044 "message": "Transaction cancelled.",
1045 });
1046 self.send_message(&write, response).await;
1047 let mut processing_ids = self.processing_ids.lock().await;
1049 processing_ids.retain(|id| id != transaction_id);
1050 drop(processing_ids); (0, "Transaction cancelled successfully".to_string())
1052 }
1053 "RELEASE_TRANSACTION" => {
1054 let queue = self
1055 .send_request("GET", "queue", None)
1056 .await
1057 .unwrap_or(json!({}));
1058 if let Some(queue_data) = queue.as_array() {
1059 for item in queue_data {
1060 let t_id = item
1061 .get("transactionId")
1062 .and_then(|t| t.as_str())
1063 .unwrap_or("");
1064 if t_id == transaction_id {
1065 let status = item.get("status").and_then(|s| s.as_str()).unwrap_or("");
1066 match self.cancel_transaction(t_id, status).await {
1067 Ok(_) => {
1068 let response = json!({
1069 "status": "success",
1070 "message": "Transaction released.",
1071 });
1072 self.send_message(&write, response).await;
1073 return Err((0, "Transaction released successfully".to_string()));
1074 }
1075 Err(e) => {
1076 error!("Failed to release transaction: {}", e);
1077 return Err((1, e));
1078 }
1079 }
1080 }
1081 }
1082 }
1083 (0, "Transaction released successfully".to_string())
1084 }
1085 "GET_QUEUE" => {
1086 match self.get_queue().await {
1087 Ok(queue) => {
1088 let response = json!({
1089 "status": "success",
1090 "queue": queue,
1091 "command": command,
1092 });
1093 self.send_message(&write, response).await;
1094 (0, "Queue retrieved successfully".to_string())
1095 }
1096 Err(e) => {
1097 error!("Failed to get queue: {}", e);
1098 (1, e)
1099 }
1100 }
1101 }
1102 "LOGOUT" => {
1103 if let Err(e) = self.logout().await {
1104 error!("Failed to log out: {}", e);
1105 return Err((1, e));
1106 }
1107 let response = json!({
1108 "status": "success",
1109 "message": "Logged out.",
1110 });
1111 self.send_message(&write, response).await;
1112 (0, "Logged out successfully".to_string())
1113 }
1114 "LOGIN" => {
1115 if let Err(e) = self.login().await {
1116 error!("Failed to log in: {}", e);
1117 return Err((1, e));
1118 }
1119 let response = json!({
1120 "status": "success",
1121 "message": "Logged in.",
1122 });
1123 self.send_message(&write, response).await;
1124 (0, "Logged in successfully".to_string())
1125 }
1126 "CLEAR_TRANSACTION" =>{
1127 info!("Manually clearing queue");
1129 let queue = self.get_queue().await.unwrap_or(json!({}));
1130
1131 if let Some(queue_data) = queue.as_array() {
1132 for item in queue_data {
1133 let transaction_id = item.get("transactionId").and_then(|t| t.as_str()).unwrap_or("");
1134 let status = item.get("status").and_then(|s| s.as_str()).unwrap_or("");
1135 let being_handled = item.get("beingHandledBySystem").and_then(|b| b.as_bool()).unwrap_or(false);
1136 if !transaction_id.is_empty() {
1137 let _ = self.cancel_single_transaction(transaction_id, status, being_handled).await;
1138 tokio::time::sleep(Duration::from_millis(1000)).await;
1139 }
1140 }
1141 }
1142
1143 let response = json!({
1144 "status": "success",
1145 "message": "Queue cleared manually.",
1146 });
1147 self.send_message(&write, response).await;
1148 (0, json!({"status": "success", "message": "Queue cleared successfully"}).to_string())
1149
1150 }
1151 _ => {
1152 error!("Unknown command: {}", command);
1153 (1, format!("Unknown command: {}", command))
1154 }
1155 })
1156 }
1157}
1158
1159#[async_trait]
1160impl Service for CashGuardService {
1161 async fn run(&self, action: Value, write: WebSocketWrite) -> (i32, String) {
1174 let action_data = action.get("ACTION").and_then(|a| a.as_object());
1175 if action_data.is_none() {
1176 error!("Invalid action format: missing 'ACTION'");
1177 return (1, "Invalid action format: missing 'ACTION'".to_string());
1178 }
1179 let action_data = action_data.unwrap();
1180
1181 let command = action_data
1182 .get("command")
1183 .and_then(|c| c.as_str())
1184 .unwrap_or("")
1185 .to_string();
1186 let transaction_id = match action_data.get("transaction_id") {
1187 Some(Value::String(s)) => s.clone(),
1188 Some(Value::Number(n)) => n.to_string(),
1189 _ => "".to_string(),
1190 };
1191
1192 let amount_due = action_data
1193 .get("amount_due")
1194 .and_then(|a| a.as_f64())
1195 .unwrap_or(0.0);
1196
1197
1198 match self.handle_command(&write, command, &transaction_id, amount_due).await {
1199 Ok(value) => value,
1200 Err(value) => return value,
1201 }
1202 }
1203
1204 fn as_any(&self) -> &dyn std::any::Any {
1210 self
1211 }
1212
1213 fn stop_service(&self) {
1215 info!("CashGuard service stopped.");
1216 }
1217
1218 fn get_version(&self) -> String {
1224 CASH_GUARD_VERSION.to_string()
1225 }
1226}