pispas_modules/service.rs
1use async_trait::async_trait;
2use serde_json::Value;
3use std::any::Any;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
8use tokio_tungstenite::tungstenite::protocol::Message;
9use tokio_tungstenite::WebSocketStream;
10use futures_util::stream::SplitSink;
11use tokio::sync::RwLock;
12
13/// Supertrait that bundles the async IO bounds we need for an erased stream.
14///
15/// Rust trait objects only allow **one** non-auto trait, so we cannot write
16/// `Box<dyn AsyncRead + AsyncWrite + …>`. This supertrait collapses the
17/// bounds into a single name that is trait-object-compatible.
18///
19/// The blanket impl covers every concrete stream type we care about
20/// (`TcpStream`, `TlsStream<TcpStream>`), so wrapping is a zero-cost
21/// `Box::new(stream)`.
22pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send {}
23impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsyncStream for T {}
24
25/// Type-erased async stream used throughout the WebSocket layer.
26///
27/// The local WebSocket listener in `pispas-modules` operates in dual mode:
28/// it accepts plain TCP (`ws://`) **and** TLS (`wss://`) on the same port.
29/// The two concrete stream types are different (`TcpStream` vs
30/// `TlsStream<TcpStream>`), but every service in the workspace needs to
31/// store a write-half of the socket in a single concrete type.
32///
33/// `BoxedStream` wraps the stream in `Box<dyn AsyncStream>` and forwards
34/// `poll_read`, `poll_write`, `poll_flush`, `poll_shutdown` to the inner
35/// stream, so downstream code is transport-agnostic.
36///
37/// See `docs/DESIGN.md §2` for the reasoning and `docs/ARCHITECTURE.md §2`
38/// for the WebSocket topology.
39pub struct BoxedStream(pub Box<dyn AsyncStream>);
40
41impl AsyncRead for BoxedStream {
42 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
43 Pin::new(&mut *self.0).poll_read(cx, buf)
44 }
45}
46
47impl AsyncWrite for BoxedStream {
48 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
49 Pin::new(&mut *self.0).poll_write(cx, buf)
50 }
51 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
52 Pin::new(&mut *self.0).poll_flush(cx)
53 }
54 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
55 Pin::new(&mut *self.0).poll_shutdown(cx)
56 }
57}
58
59impl Unpin for BoxedStream {}
60
61/// Handle used by services to push WebSocket messages back to a client.
62///
63/// * `Some(lock)` — the service was invoked from the **local** WebSocket,
64/// which supports async back-channel notifications. Services can acquire
65/// the write lock and push arbitrary `Message::Text(_)` frames.
66/// * `None` — the service was invoked from the **remote** WebSocket
67/// (outbound connection to `wss.unpispas.es`). There is no streaming
68/// back-channel; the service must respond only through the return value
69/// of `run()`.
70///
71/// Use `send_message` from the `pispas_modules::utils` module to write
72/// through the lock safely.
73pub type WebSocketWrite = Option<Arc<RwLock<SplitSink<WebSocketStream<BoxedStream>, Message>>>>;
74
75/// Common interface every business module in `pispas-modules` implements.
76///
77/// Incoming WebSocket messages are dispatched to an implementation by the
78/// `TARGET` field of the JSON payload. The trait stays deliberately small
79/// so adding a new module is cheap: implement [`Service::run`], return the
80/// static version via [`Service::get_version`], hook it up in
81/// `load_services`, and document it in `docs/MODULES.md`.
82///
83/// ## Example skeleton
84///
85/// ```ignore
86/// use async_trait::async_trait;
87/// use serde_json::Value;
88/// use pispas_modules::prelude::{Service, WebSocketWrite};
89///
90/// pub struct MyService;
91///
92/// #[async_trait]
93/// impl Service for MyService {
94/// async fn run(&self, action: Value, _write: WebSocketWrite) -> (i32, String) {
95/// match action.get("ACTION").and_then(|v| v.as_str()) {
96/// Some("PING") => (0, "pong".into()),
97/// _ => (1, "unknown action".into()),
98/// }
99/// }
100///
101/// fn as_any(&self) -> &dyn std::any::Any { self }
102/// fn stop_service(&self) {}
103/// fn get_version(&self) -> String { env!("CARGO_PKG_VERSION").into() }
104/// }
105/// ```
106#[async_trait]
107pub trait Service: Send + Sync {
108 /// Executes an action for the service.
109 ///
110 /// # Arguments
111 ///
112 /// * `action` - A `Value` object representing the action to be performed.
113 /// The structure of the action depends on the service implementation.
114 /// * `write` - A `WebSocketWrite` object for sending responses or notifications
115 /// via a WebSocket connection.
116 ///
117 /// # Returns
118 ///
119 /// A tuple containing:
120 /// * `i32` - Status code (e.g., 0 for success, 1 for error).
121 /// * `String` - A descriptive message or response related to the action.
122 ///
123 /// # Notes
124 ///
125 /// This method must be implemented asynchronously by the service.
126 async fn run(
127 &self,
128 action: Value,
129 write: WebSocketWrite,
130 ) -> (i32, String);
131
132 /// Allows dynamic casting of the service to a specific type.
133 ///
134 /// # Returns
135 ///
136 /// A reference to `dyn Any`, which can be used for type-safe downcasting.
137 #[allow(dead_code)]
138 fn as_any(&self) -> &dyn Any;
139
140 /// Stops the service and cleans up resources.
141 ///
142 /// # Notes
143 ///
144 /// This method should ensure that all tasks and processes related to the
145 /// service are properly terminated. It can also reset any internal states.
146 #[allow(dead_code)]
147 fn stop_service(&self);
148
149 /// Retrieves the version of the service.
150 ///
151 /// # Returns
152 ///
153 /// A `String` containing the version information of the service.
154 fn get_version(&self) -> String;
155}
156
157