pispas_modules/
service.rs

1use async_trait::async_trait;
2use serde_json::Value;
3use std::any::Any;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
8use tokio_tungstenite::tungstenite::protocol::Message;
9use tokio_tungstenite::WebSocketStream;
10use futures_util::stream::SplitSink;
11use tokio::sync::RwLock;
12
13/// Supertrait that bundles the async IO bounds we need for an erased stream.
14///
15/// Rust trait objects only allow **one** non-auto trait, so we cannot write
16/// `Box<dyn AsyncRead + AsyncWrite + …>`. This supertrait collapses the
17/// bounds into a single name that is trait-object-compatible.
18///
19/// The blanket impl covers every concrete stream type we care about
20/// (`TcpStream`, `TlsStream<TcpStream>`), so wrapping is a zero-cost
21/// `Box::new(stream)`.
22pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send {}
23impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsyncStream for T {}
24
25/// Type-erased async stream used throughout the WebSocket layer.
26///
27/// The local WebSocket listener in `pispas-modules` operates in dual mode:
28/// it accepts plain TCP (`ws://`) **and** TLS (`wss://`) on the same port.
29/// The two concrete stream types are different (`TcpStream` vs
30/// `TlsStream<TcpStream>`), but every service in the workspace needs to
31/// store a write-half of the socket in a single concrete type.
32///
33/// `BoxedStream` wraps the stream in `Box<dyn AsyncStream>` and forwards
34/// `poll_read`, `poll_write`, `poll_flush`, `poll_shutdown` to the inner
35/// stream, so downstream code is transport-agnostic.
36///
37/// See `docs/DESIGN.md §2` for the reasoning and `docs/ARCHITECTURE.md §2`
38/// for the WebSocket topology.
39pub struct BoxedStream(pub Box<dyn AsyncStream>);
40
41impl AsyncRead for BoxedStream {
42    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
43        Pin::new(&mut *self.0).poll_read(cx, buf)
44    }
45}
46
47impl AsyncWrite for BoxedStream {
48    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
49        Pin::new(&mut *self.0).poll_write(cx, buf)
50    }
51    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
52        Pin::new(&mut *self.0).poll_flush(cx)
53    }
54    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
55        Pin::new(&mut *self.0).poll_shutdown(cx)
56    }
57}
58
59impl Unpin for BoxedStream {}
60
61/// Handle used by services to push WebSocket messages back to a client.
62///
63/// * `Some(lock)` — the service was invoked from the **local** WebSocket,
64///   which supports async back-channel notifications. Services can acquire
65///   the write lock and push arbitrary `Message::Text(_)` frames.
66/// * `None` — the service was invoked from the **remote** WebSocket
67///   (outbound connection to `wss.unpispas.es`). There is no streaming
68///   back-channel; the service must respond only through the return value
69///   of `run()`.
70///
71/// Use `send_message` from the `pispas_modules::utils` module to write
72/// through the lock safely.
73pub type WebSocketWrite = Option<Arc<RwLock<SplitSink<WebSocketStream<BoxedStream>, Message>>>>;
74
75/// Common interface every business module in `pispas-modules` implements.
76///
77/// Incoming WebSocket messages are dispatched to an implementation by the
78/// `TARGET` field of the JSON payload. The trait stays deliberately small
79/// so adding a new module is cheap: implement [`Service::run`], return the
80/// static version via [`Service::get_version`], hook it up in
81/// `load_services`, and document it in `docs/MODULES.md`.
82///
83/// ## Example skeleton
84///
85/// ```ignore
86/// use async_trait::async_trait;
87/// use serde_json::Value;
88/// use pispas_modules::prelude::{Service, WebSocketWrite};
89///
90/// pub struct MyService;
91///
92/// #[async_trait]
93/// impl Service for MyService {
94///     async fn run(&self, action: Value, _write: WebSocketWrite) -> (i32, String) {
95///         match action.get("ACTION").and_then(|v| v.as_str()) {
96///             Some("PING") => (0, "pong".into()),
97///             _            => (1, "unknown action".into()),
98///         }
99///     }
100///
101///     fn as_any(&self) -> &dyn std::any::Any { self }
102///     fn stop_service(&self) {}
103///     fn get_version(&self) -> String { env!("CARGO_PKG_VERSION").into() }
104/// }
105/// ```
106#[async_trait]
107pub trait Service: Send + Sync {
108    /// Executes an action for the service.
109    ///
110    /// # Arguments
111    ///
112    /// * `action` - A `Value` object representing the action to be performed.
113    ///   The structure of the action depends on the service implementation.
114    /// * `write` - A `WebSocketWrite` object for sending responses or notifications
115    ///   via a WebSocket connection.
116    ///
117    /// # Returns
118    ///
119    /// A tuple containing:
120    /// * `i32` - Status code (e.g., 0 for success, 1 for error).
121    /// * `String` - A descriptive message or response related to the action.
122    ///
123    /// # Notes
124    ///
125    /// This method must be implemented asynchronously by the service.
126    async fn run(
127        &self,
128        action: Value,
129        write: WebSocketWrite,
130    ) -> (i32, String);
131
132    /// Allows dynamic casting of the service to a specific type.
133    ///
134    /// # Returns
135    ///
136    /// A reference to `dyn Any`, which can be used for type-safe downcasting.
137    #[allow(dead_code)]
138    fn as_any(&self) -> &dyn Any;
139
140    /// Stops the service and cleans up resources.
141    ///
142    /// # Notes
143    ///
144    /// This method should ensure that all tasks and processes related to the
145    /// service are properly terminated. It can also reset any internal states.
146    #[allow(dead_code)]
147    fn stop_service(&self);
148
149    /// Retrieves the version of the service.
150    ///
151    /// # Returns
152    ///
153    /// A `String` containing the version information of the service.
154    fn get_version(&self) -> String;
155}
156
157