pispas_modules/
scale.rs

1use tokio::io::AsyncWriteExt;
2use async_trait::async_trait;
3use serde_json::{json, Value};
4
5use std::sync::{
6    atomic::{AtomicBool, Ordering},
7    Arc,
8};
9use tokio::{
10    io::AsyncReadExt,
11    time::sleep,
12    sync::RwLock,
13    task,
14    time::Duration,
15};
16use tokio_serial::SerialPortBuilderExt;
17use easy_trace::prelude::{error, info};
18use crate::service::{Service, WebSocketWrite};
19use regex::Regex;
20use crate::send_message;
21
/// Version string of the scale service.
///
/// Logged when a `ScaleService` is created and returned by its
/// `get_version` implementation.
pub const SCALE_VERSION: &str = "1.0.0";
23
/// `ScaleService` struct to manage scale operations, including capturing data
/// and interacting with WebSocket connections.
///
/// Both fields are wrapped in `Arc` so that the background tasks spawned by
/// the capture methods can share them with the service itself.
pub struct ScaleService {
    /// `true` while a capture should keep running. The spawned capture loops
    /// poll this flag and exit once it is cleared.
    capturing: Arc<AtomicBool>,
    /// Last value forwarded to the client, kept so that consecutive duplicate
    /// readings are not sent again. Reset whenever a serial capture starts.
    last_line: Arc<RwLock<Option<String>>>,
}
30
31impl ScaleService {
32    /// Creates a new `ScaleService` instance.
33    pub fn new() -> Self {
34        info!("ScaleService initialized (version: {})", SCALE_VERSION);
35        ScaleService {
36            capturing: Arc::new(AtomicBool::new(false)),
37            last_line: Arc::new(RwLock::new(None)),
38        }
39    }
40
41    /// Stops the current data capturing process.
42    pub fn stop(&self) {
43        self.capturing.store(false, Ordering::Relaxed);
44        info!("ScaleService capturing stopped.");
45    }
46
47    /// Reads data from a serial port and sends parsed values to a WebSocket.
48    ///
49    /// # Arguments
50    /// - `port`: The name of the serial port to read from.
51    /// - `write`: The WebSocket connection for sending messages.
52    /// - `uuid`: A unique identifier for the capture session.
53    pub async fn read_serial(&self, port: String, write: WebSocketWrite, uuid: String) {
54        let capturing = self.capturing.clone();
55        let last_line = self.last_line.clone();
56
57        // Reset last_line to a default value
58        {
59            let mut last_line_guard = self.last_line.write().await;
60            *last_line_guard = Some("-1".to_string());
61            info!("Reset last line to {:?}", last_line_guard);
62        }
63
64        // Start reading data in a separate task
65        task::spawn(async move {
66            match tokio_serial::new(&port, 9600)
67                .data_bits(tokio_serial::DataBits::Eight)
68                .parity(tokio_serial::Parity::None)
69                .stop_bits(tokio_serial::StopBits::One)
70                .timeout(Duration::from_secs(2))
71                .open_native_async()
72            {
73                Ok(mut serial) => {
74                    info!("Opened serial port: {}", port);
75
76                    let regex = Regex::new(r"-?\d+(\.\d+)?").unwrap();
77
78                    while capturing.load(Ordering::Relaxed) {
79                        if let Err(e) = serial.write_all(b"$").await {
80                            error!("Error writing '$' to serial port: {:?}", e);                            
81                        }
82                        let mut line = vec![];
83                        let mut buf = [0; 1];
84
85                        // Read one line of data from the serial port
86                        loop {
87                            match serial.read_exact(&mut buf).await {
88                                Ok(_) => {
89                                    if buf[0] == b'\r' {
90                                        break;
91                                    }
92                                    line.push(buf[0]);
93                                }
94                                Err(e) => {
95                                    error!("Error reading from serial port: {:?}", e);
96                                    break;
97                                }
98                            }
99                        }
100
101                        if let Ok(line_str) = String::from_utf8(line.clone()) {
102                            // Parse and scale the value if it's numeric
103                            if let Some(value) = regex
104                                .captures(&line_str)
105                                .and_then(|caps| caps.get(0))
106                                .map(|m| m.as_str())
107                                .and_then(|num| num.parse::<f64>().ok())
108                            {
109                                let scaled_value = format!("{:.2}", value * 1000.0);
110
111                                // Prevent duplicate values from being sent
112                                let mut last_line_guard = last_line.write().await;
113                                if last_line_guard.as_ref() != Some(&scaled_value) {
114                                    *last_line_guard = Some(scaled_value.clone());
115
116                                    // Create and send the message
117                                    let message_data = json!({
118                                        "SERVICE_NAME": "SCALE",
119                                        "SERVICE_VERS": "1.0.0",
120                                        "MESSAGE_TYPE": "RESPONSE",
121                                        "MESSAGE_EXEC": "SUCCESS",
122                                        "MESSAGE_UUID": uuid,
123                                        "MESSAGE_DATA": scaled_value,
124                                    })
125                                        .to_string();
126
127                                    send_message(&write, message_data).await;
128                                }
129                            }
130                        }
131
132                        // Add a small delay to prevent high CPU usage
133                        sleep(Duration::from_millis(10)).await;
134                    }
135
136                    info!("Serial port {} closed.", port);
137                }
138                Err(e) => {
139                    error!("Failed to open serial port \"{}\": {:?}", port, e);
140                }
141            }
142        });
143    }
144
145    /// Simulates incremental weight generation for testing purposes.
146    ///
147    /// # Arguments
148    /// - `write`: The WebSocket connection for sending messages.
149    /// - `uuid`: A unique identifier for the session.
150    pub async fn generate_incremental_weight(&self, write: WebSocketWrite, uuid: String) {
151        let capturing = self.capturing.clone();
152
153        task::spawn(async move {
154            let mut weight = 0.0;
155
156            while capturing.load(Ordering::Relaxed) {
157                info!("Generated incremental weight: {:.0}", weight);
158
159                // Create and send the message
160                let message_data = json!({
161                    "SERVICE_NAME": "SCALE",
162                    "SERVICE_VERS": "1.0",
163                    "MESSAGE_TYPE": "RESPONSE",
164                    "MESSAGE_EXEC": "SUCCESS",
165                    "MESSAGE_UUID": uuid,
166                    "MESSAGE_DATA": format!("{:.0}", weight),
167                })
168                    .to_string();
169
170                send_message(&write, message_data).await;
171
172                // Increment weight and loop
173                weight = (weight + 5.0) % 1000.0;
174                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
175            }
176
177            info!("Stopped generating incremental weight.");
178        });
179    }
180
181
182}
183
184#[async_trait]
185impl Service for ScaleService {
186    /// Executes an action for the scale service.
187    ///
188    /// Supported actions:
189    /// - "START": Starts data capturing from the scale. Can handle a test mode or real serial input.
190    /// - "STOP": Stops data capturing.
191    ///
192    /// # Arguments
193    ///
194    /// * `action` - A JSON value containing details of the action. Includes `command`, `device`, `port`, and `UUIDV4`.
195    /// * `write` - A `WebSocketWrite` object to communicate results back via WebSocket.
196    ///
197    /// # Returns
198    ///
199    /// A tuple containing:
200    /// * `i32` - Status code (0 for success, 1 for failure).
201    /// * `String` - A descriptive message or UUID related to the action result.
202    async fn run(&self, action: Value, write: WebSocketWrite) -> (i32, String) {
203        info!("ScaleService: Running action: {:?}", action);
204
205        let action_object = match action.get("ACTION") {
206            Some(Value::Object(obj)) => obj,
207            _ => {
208                error!("Invalid ACTION format: {:?}", action);
209                return (1, "Invalid ACTION format".to_string());
210            }
211        };
212
213        let command = action_object
214            .get("command")
215            .and_then(|c| c.as_str())
216            .unwrap_or("UNKNOWN");
217
218        let device = action_object
219            .get("device")
220            .and_then(|d| d.as_str())
221            .unwrap_or("UNKNOWN");
222
223        let port = action_object
224            .get("port")
225            .and_then(|p| p.as_str())
226            .unwrap_or("COM1");
227
228        let uuid = action
229            .get("UUIDV4")
230            .and_then(|u| u.as_str())
231            .unwrap_or("unknown-uuid")
232            .to_string();
233
234        info!("Command: {}, Device: {}, Port: {}", command, device, port);
235
236        match command {
237            "START" => {
238                if self.capturing.load(Ordering::Relaxed) {
239                    info!("Already capturing, stopping current capture.");
240                    self.stop();
241                }
242                tokio::time::sleep(Duration::from_millis(20)).await;
243
244                self.capturing.store(true, Ordering::Relaxed);
245
246                if device == "TEST" {
247                    info!("Started generating incremental weight.");
248                    self.generate_incremental_weight(write, uuid.clone()).await;
249                } else {
250                    self.read_serial(port.to_string(), write, uuid.clone()).await;
251                }
252
253                (0, uuid)
254            }
255            "STOP" => {
256                self.stop();
257                (0, "Stopped capturing data from scale".to_string())
258            }
259            _ => {
260                error!("Unknown command: {}", command);
261                (1, "Unknown command".to_string())
262            }
263        }
264    }
265
266    /// Converts the service instance into a `dyn Any` reference.
267    ///
268    /// # Returns
269    ///
270    /// A reference to `dyn Any` for dynamic type checks.
271    fn as_any(&self) -> &dyn std::any::Any {
272        self
273    }
274
275    /// Stops the scale service and halts data capturing.
276    ///
277    /// Ensures the following:
278    /// * Stops any ongoing capturing tasks.
279    /// * Resets internal states as needed.
280    fn stop_service(&self) {
281        self.stop();
282    }
283
284    /// Retrieves the current version of the scale service.
285    ///
286    /// # Returns
287    ///
288    /// A `String` containing the version of the scale service.
289    fn get_version(&self) -> String {
290        SCALE_VERSION.to_string()
291    }
292}