general_mq/mqtt/
queue.rs

1use std::{
2    error::Error as StdError,
3    sync::{Arc, Mutex},
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use rumqttc::{ClientError as RumqttError, Publish, QoS};
9use tokio::{
10    task::{self, JoinHandle},
11    time,
12};
13
14use super::connection::{MqttConnection, PacketHandler};
15use crate::{
16    connection::{GmqConnection, Status as ConnStatus},
17    queue::{
18        name_validate, EventHandler, GmqQueue, Message, MessageHandler, Status, QUEUE_NAME_PATTERN,
19    },
20    Error,
21};
22
23/// Manages a MQTT queue.
24#[derive(Clone)]
25pub struct MqttQueue {
26    /// Options of the queue.
27    opts: MqttQueueOptions,
28    /// The associated [`crate::MqttConnection`].
29    conn: Arc<Mutex<MqttConnection>>,
30    /// Queue status.
31    status: Arc<Mutex<Status>>,
32    /// The event handler.
33    handler: Arc<Mutex<Option<Arc<dyn EventHandler>>>>,
34    /// The message handler.
35    msg_handler: Arc<Mutex<Option<Arc<dyn MessageHandler>>>>,
36    /// The event loop to manage and monitor the connection.
37    ev_loop: Arc<Mutex<Option<JoinHandle<()>>>>,
38}
39
/// The options used to create a [`MqttQueue`].
///
/// The type is plain data, so it derives `Debug` in addition to `Clone` to make it usable in
/// logs and assertion messages.
#[derive(Clone, Debug)]
pub struct MqttQueueOptions {
    /// The queue name that is used to map a MQTT topic.
    ///
    /// The name must not be empty and must match [`QUEUE_NAME_PATTERN`].
    pub name: String,
    /// `true` for the receiver and `false` for the sender.
    pub is_recv: bool,
    /// `true` to use QoS 1 (at least once), `false` to use QoS 0 (at most once).
    pub reliable: bool,
    /// `true` for broadcast and `false` for unicast.
    ///
    /// **Note**: the unicast queue relies on **shared queue**. See the `shared_prefix` option.
    pub broadcast: bool,
    /// Time in milliseconds from disconnection to reconnection.
    ///
    /// Default or zero value is `1000`.
    pub reconnect_millis: u64,
    /// Used for `broadcast=false`.
    ///
    /// When set, a unicast receiver uses `<shared_prefix><name>` as its topic instead of the
    /// plain queue name. Senders and broadcast queues always use the plain name.
    pub shared_prefix: Option<String>,
}
62
63/// The MQTT [`Message`] implementation.
64pub struct MqttMessage {
65    /// Hold the [`rumqttc::Publish`] instance.
66    packet: Publish,
67}
68
/// The reconnect interval, in milliseconds, that is used when the options give none (zero).
const DEF_RECONN_TIME_MS: u64 = 1_000;
71
72impl MqttQueue {
73    /// Create a queue instance.
74    pub fn new(opts: MqttQueueOptions, conn: &MqttConnection) -> Result<MqttQueue, String> {
75        let name = opts.name.as_str();
76        if name.len() == 0 {
77            return Err("queue name cannot be empty".to_string());
78        } else if !name_validate(name) {
79            return Err(format!(
80                "queue name {} is not match {}",
81                name, QUEUE_NAME_PATTERN
82            ));
83        }
84        let mut opts = opts;
85        if opts.reconnect_millis == 0 {
86            opts.reconnect_millis = DEF_RECONN_TIME_MS;
87        }
88
89        Ok(MqttQueue {
90            opts,
91            conn: Arc::new(Mutex::new(conn.clone())),
92            status: Arc::new(Mutex::new(Status::Closed)),
93            handler: Arc::new(Mutex::new(None)),
94            msg_handler: Arc::new(Mutex::new(None)),
95            ev_loop: Arc::new(Mutex::new(None)),
96        })
97    }
98
99    /// To get the associated connection status.
100    fn conn_status(&self) -> ConnStatus {
101        self.conn.lock().unwrap().status()
102    }
103
104    /// To get the event handler.
105    fn handler(&self) -> Option<Arc<dyn EventHandler>> {
106        self.handler.lock().unwrap().clone()
107    }
108
109    /// To get the message handler.
110    fn msg_handler(&self) -> Option<Arc<dyn MessageHandler>> {
111        self.msg_handler.lock().unwrap().clone()
112    }
113
114    /// The error handling.
115    fn on_error(&self, err: Box<dyn StdError + Send + Sync>) {
116        let handler = { (*self.handler.lock().unwrap()).clone() };
117        if let Some(handler) = handler {
118            let q = Arc::new(self.clone());
119            task::spawn(async move {
120                handler.on_error(q, err).await;
121            });
122        }
123    }
124
125    /// To get the associated topic.
126    fn topic(&self) -> String {
127        if self.opts.is_recv && !self.opts.broadcast {
128            if let Some(prefix) = self.opts.shared_prefix.as_ref() {
129                return format!("{}{}", prefix.as_str(), self.opts.name.as_str());
130            }
131        }
132        self.opts.name.clone()
133    }
134
135    /// To get the associated QoS.
136    fn qos(&self) -> QoS {
137        match self.opts.reliable {
138            false => QoS::AtMostOnce,
139            true => QoS::AtLeastOnce,
140        }
141    }
142}
143
144#[async_trait]
145impl GmqQueue for MqttQueue {
146    fn name(&self) -> &str {
147        self.opts.name.as_str()
148    }
149
150    fn is_recv(&self) -> bool {
151        self.opts.is_recv
152    }
153
154    fn status(&self) -> Status {
155        *self.status.lock().unwrap()
156    }
157
158    fn set_handler(&mut self, handler: Arc<dyn EventHandler>) {
159        *self.handler.lock().unwrap() = Some(handler);
160    }
161
162    fn clear_handler(&mut self) {
163        let _ = (*self.handler.lock().unwrap()).take();
164    }
165
166    fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>) {
167        *self.msg_handler.lock().unwrap() = Some(handler);
168    }
169
170    fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
171        if self.opts.is_recv && self.msg_handler().is_none() {
172            return Err(Box::new(Error::NoMsgHandler));
173        }
174
175        {
176            let mut task_handle_mutex = self.ev_loop.lock().unwrap();
177            if (*task_handle_mutex).is_some() {
178                return Ok(());
179            }
180            *self.status.lock().unwrap() = Status::Connecting;
181            *task_handle_mutex = Some(create_event_loop(self));
182        }
183        Ok(())
184    }
185
186    async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
187        match { self.ev_loop.lock().unwrap().take() } {
188            None => return Ok(()),
189            Some(handle) => handle.abort(),
190        }
191        {
192            *self.status.lock().unwrap() = Status::Closing;
193        }
194
195        let raw_conn;
196        {
197            let mut conn = self.conn.lock().unwrap();
198            conn.remove_packet_handler(self.opts.name.as_str());
199            raw_conn = conn.get_raw_connection();
200        }
201
202        let mut result: Result<(), RumqttError> = Ok(());
203        if let Some(raw_conn) = raw_conn {
204            result = raw_conn.unsubscribe(self.topic()).await;
205        }
206
207        {
208            *self.status.lock().unwrap() = Status::Closed;
209        }
210        if let Some(handler) = { (*self.handler.lock().unwrap()).clone() } {
211            let queue = Arc::new(self.clone());
212            task::spawn(async move {
213                handler.on_status(queue, Status::Closed).await;
214            });
215        }
216
217        result?;
218        Ok(())
219    }
220
221    async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
222        if self.opts.is_recv {
223            return Err(Box::new(Error::QueueIsReceiver));
224        } else if self.status() != Status::Connected {
225            return Err(Box::new(Error::NotConnected));
226        }
227
228        let raw_conn = {
229            match self.conn.lock().unwrap().get_raw_connection() {
230                None => return Err(Box::new(Error::NotConnected)),
231                Some(raw_conn) => raw_conn,
232            }
233        };
234
235        raw_conn
236            .publish(self.topic(), self.qos(), false, payload)
237            .await?;
238        Ok(())
239    }
240}
241
242impl PacketHandler for MqttQueue {
243    fn on_publish(&self, packet: Publish) {
244        if let Some(handler) = self.msg_handler() {
245            let this = Arc::new(self.clone());
246            task::spawn(async move {
247                handler
248                    .on_message(this, Box::new(MqttMessage::new(packet)))
249                    .await;
250            });
251        }
252    }
253}
254
255impl Default for MqttQueueOptions {
256    fn default() -> Self {
257        MqttQueueOptions {
258            name: "".to_string(),
259            is_recv: false,
260            reliable: false,
261            broadcast: false,
262            reconnect_millis: DEF_RECONN_TIME_MS,
263            shared_prefix: None,
264        }
265    }
266}
267
268impl MqttMessage {
269    /// Create a message instance.
270    pub fn new(packet: Publish) -> Self {
271        MqttMessage { packet }
272    }
273}
274
275#[async_trait]
276impl Message for MqttMessage {
277    fn payload(&self) -> &[u8] {
278        self.packet.payload.as_ref()
279    }
280
281    async fn ack(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
282        Ok(())
283    }
284
285    async fn nack(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
286        Ok(())
287    }
288}
289
290/// To create an event loop runtime task.
291fn create_event_loop(queue: &MqttQueue) -> JoinHandle<()> {
292    let this = Arc::new(queue.clone());
293    task::spawn(async move {
294        loop {
295            match this.status() {
296                Status::Closing | Status::Closed => task::yield_now().await,
297                Status::Connecting => {
298                    if this.conn_status() != ConnStatus::Connected {
299                        time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
300                        continue;
301                    }
302
303                    if this.opts.is_recv {
304                        let raw_conn;
305                        {
306                            let mut conn = this.conn.lock().unwrap();
307                            conn.add_packet_handler(this.opts.name.as_str(), this.clone());
308                            raw_conn = conn.get_raw_connection();
309                        }
310                        if let Some(raw_conn) = raw_conn {
311                            if let Err(e) = raw_conn.subscribe(this.topic(), this.qos()).await {
312                                this.on_error(Box::new(e));
313                                time::sleep(Duration::from_millis(this.opts.reconnect_millis))
314                                    .await;
315                                continue;
316                            }
317                        } else {
318                            time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
319                            continue;
320                        }
321                    }
322
323                    {
324                        *this.status.lock().unwrap() = Status::Connected;
325                    }
326                    if let Some(handler) = this.handler() {
327                        let queue = this.clone();
328                        task::spawn(async move {
329                            handler.on_status(queue, Status::Connected).await;
330                        });
331                    }
332                }
333                Status::Connected => {
334                    time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
335                    if this.conn_status() != ConnStatus::Connected {
336                        if let Some(handler) = this.handler() {
337                            let queue = this.clone();
338                            task::spawn(async move {
339                                handler.on_status(queue, Status::Connecting).await;
340                            });
341                        }
342                        *this.status.lock().unwrap() = Status::Connecting;
343                    }
344                }
345                Status::Disconnected => {
346                    *this.status.lock().unwrap() = Status::Connecting;
347                }
348            }
349        }
350    })
351}