1use tokio::io::AsyncWriteExt;
2use async_trait::async_trait;
3use serde_json::{json, Value};
4
5use std::sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc,
8};
9use tokio::{
10 io::AsyncReadExt,
11 time::sleep,
12 sync::RwLock,
13 task,
14 time::Duration,
15};
16use tokio_serial::SerialPortBuilderExt;
17use easy_trace::prelude::{error, info};
18use crate::service::{Service, WebSocketWrite};
19use regex::Regex;
20use crate::send_message;
21
22pub const SCALE_VERSION: &str = "1.0.0";
23
24pub struct ScaleService {
27 capturing: Arc<AtomicBool>, last_line: Arc<RwLock<Option<String>>>, }
30
31impl ScaleService {
32 pub fn new() -> Self {
34 info!("ScaleService initialized (version: {})", SCALE_VERSION);
35 ScaleService {
36 capturing: Arc::new(AtomicBool::new(false)),
37 last_line: Arc::new(RwLock::new(None)),
38 }
39 }
40
41 pub fn stop(&self) {
43 self.capturing.store(false, Ordering::Relaxed);
44 info!("ScaleService capturing stopped.");
45 }
46
47 pub async fn read_serial(&self, port: String, write: WebSocketWrite, uuid: String) {
54 let capturing = self.capturing.clone();
55 let last_line = self.last_line.clone();
56
57 {
59 let mut last_line_guard = self.last_line.write().await;
60 *last_line_guard = Some("-1".to_string());
61 info!("Reset last line to {:?}", last_line_guard);
62 }
63
64 task::spawn(async move {
66 match tokio_serial::new(&port, 9600)
67 .data_bits(tokio_serial::DataBits::Eight)
68 .parity(tokio_serial::Parity::None)
69 .stop_bits(tokio_serial::StopBits::One)
70 .timeout(Duration::from_secs(2))
71 .open_native_async()
72 {
73 Ok(mut serial) => {
74 info!("Opened serial port: {}", port);
75
76 let regex = Regex::new(r"-?\d+(\.\d+)?").unwrap();
77
78 while capturing.load(Ordering::Relaxed) {
79 if let Err(e) = serial.write_all(b"$").await {
80 error!("Error writing '$' to serial port: {:?}", e);
81 }
82 let mut line = vec![];
83 let mut buf = [0; 1];
84
85 loop {
87 match serial.read_exact(&mut buf).await {
88 Ok(_) => {
89 if buf[0] == b'\r' {
90 break;
91 }
92 line.push(buf[0]);
93 }
94 Err(e) => {
95 error!("Error reading from serial port: {:?}", e);
96 break;
97 }
98 }
99 }
100
101 if let Ok(line_str) = String::from_utf8(line.clone()) {
102 if let Some(value) = regex
104 .captures(&line_str)
105 .and_then(|caps| caps.get(0))
106 .map(|m| m.as_str())
107 .and_then(|num| num.parse::<f64>().ok())
108 {
109 let scaled_value = format!("{:.2}", value * 1000.0);
110
111 let mut last_line_guard = last_line.write().await;
113 if last_line_guard.as_ref() != Some(&scaled_value) {
114 *last_line_guard = Some(scaled_value.clone());
115
116 let message_data = json!({
118 "SERVICE_NAME": "SCALE",
119 "SERVICE_VERS": "1.0.0",
120 "MESSAGE_TYPE": "RESPONSE",
121 "MESSAGE_EXEC": "SUCCESS",
122 "MESSAGE_UUID": uuid,
123 "MESSAGE_DATA": scaled_value,
124 })
125 .to_string();
126
127 send_message(&write, message_data).await;
128 }
129 }
130 }
131
132 sleep(Duration::from_millis(10)).await;
134 }
135
136 info!("Serial port {} closed.", port);
137 }
138 Err(e) => {
139 error!("Failed to open serial port \"{}\": {:?}", port, e);
140 }
141 }
142 });
143 }
144
145 pub async fn generate_incremental_weight(&self, write: WebSocketWrite, uuid: String) {
151 let capturing = self.capturing.clone();
152
153 task::spawn(async move {
154 let mut weight = 0.0;
155
156 while capturing.load(Ordering::Relaxed) {
157 info!("Generated incremental weight: {:.0}", weight);
158
159 let message_data = json!({
161 "SERVICE_NAME": "SCALE",
162 "SERVICE_VERS": "1.0",
163 "MESSAGE_TYPE": "RESPONSE",
164 "MESSAGE_EXEC": "SUCCESS",
165 "MESSAGE_UUID": uuid,
166 "MESSAGE_DATA": format!("{:.0}", weight),
167 })
168 .to_string();
169
170 send_message(&write, message_data).await;
171
172 weight = (weight + 5.0) % 1000.0;
174 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
175 }
176
177 info!("Stopped generating incremental weight.");
178 });
179 }
180
181
182}
183
184#[async_trait]
185impl Service for ScaleService {
186 async fn run(&self, action: Value, write: WebSocketWrite) -> (i32, String) {
203 info!("ScaleService: Running action: {:?}", action);
204
205 let action_object = match action.get("ACTION") {
206 Some(Value::Object(obj)) => obj,
207 _ => {
208 error!("Invalid ACTION format: {:?}", action);
209 return (1, "Invalid ACTION format".to_string());
210 }
211 };
212
213 let command = action_object
214 .get("command")
215 .and_then(|c| c.as_str())
216 .unwrap_or("UNKNOWN");
217
218 let device = action_object
219 .get("device")
220 .and_then(|d| d.as_str())
221 .unwrap_or("UNKNOWN");
222
223 let port = action_object
224 .get("port")
225 .and_then(|p| p.as_str())
226 .unwrap_or("COM1");
227
228 let uuid = action
229 .get("UUIDV4")
230 .and_then(|u| u.as_str())
231 .unwrap_or("unknown-uuid")
232 .to_string();
233
234 info!("Command: {}, Device: {}, Port: {}", command, device, port);
235
236 match command {
237 "START" => {
238 if self.capturing.load(Ordering::Relaxed) {
239 info!("Already capturing, stopping current capture.");
240 self.stop();
241 }
242 tokio::time::sleep(Duration::from_millis(20)).await;
243
244 self.capturing.store(true, Ordering::Relaxed);
245
246 if device == "TEST" {
247 info!("Started generating incremental weight.");
248 self.generate_incremental_weight(write, uuid.clone()).await;
249 } else {
250 self.read_serial(port.to_string(), write, uuid.clone()).await;
251 }
252
253 (0, uuid)
254 }
255 "STOP" => {
256 self.stop();
257 (0, "Stopped capturing data from scale".to_string())
258 }
259 _ => {
260 error!("Unknown command: {}", command);
261 (1, "Unknown command".to_string())
262 }
263 }
264 }
265
266 fn as_any(&self) -> &dyn std::any::Any {
272 self
273 }
274
275 fn stop_service(&self) {
281 self.stop();
282 }
283
284 fn get_version(&self) -> String {
290 SCALE_VERSION.to_string()
291 }
292}