1use std::{
2 error::Error as StdError,
3 sync::{Arc, Mutex},
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use rumqttc::{ClientError as RumqttError, Publish, QoS};
9use tokio::{
10 task::{self, JoinHandle},
11 time,
12};
13
14use super::connection::{MqttConnection, PacketHandler};
15use crate::{
16 connection::{GmqConnection, Status as ConnStatus},
17 queue::{
18 name_validate, EventHandler, GmqQueue, Message, MessageHandler, Status, QUEUE_NAME_PATTERN,
19 },
20 Error,
21};
22
23#[derive(Clone)]
25pub struct MqttQueue {
26 opts: MqttQueueOptions,
28 conn: Arc<Mutex<MqttConnection>>,
30 status: Arc<Mutex<Status>>,
32 handler: Arc<Mutex<Option<Arc<dyn EventHandler>>>>,
34 msg_handler: Arc<Mutex<Option<Arc<dyn MessageHandler>>>>,
36 ev_loop: Arc<Mutex<Option<JoinHandle<()>>>>,
38}
39
40#[derive(Clone)]
42pub struct MqttQueueOptions {
43 pub name: String,
47 pub is_recv: bool,
49 pub reliable: bool,
51 pub broadcast: bool,
55 pub reconnect_millis: u64,
59 pub shared_prefix: Option<String>,
61}
62
63pub struct MqttMessage {
65 packet: Publish,
67}
68
69const DEF_RECONN_TIME_MS: u64 = 1000;
71
72impl MqttQueue {
73 pub fn new(opts: MqttQueueOptions, conn: &MqttConnection) -> Result<MqttQueue, String> {
75 let name = opts.name.as_str();
76 if name.len() == 0 {
77 return Err("queue name cannot be empty".to_string());
78 } else if !name_validate(name) {
79 return Err(format!(
80 "queue name {} is not match {}",
81 name, QUEUE_NAME_PATTERN
82 ));
83 }
84 let mut opts = opts;
85 if opts.reconnect_millis == 0 {
86 opts.reconnect_millis = DEF_RECONN_TIME_MS;
87 }
88
89 Ok(MqttQueue {
90 opts,
91 conn: Arc::new(Mutex::new(conn.clone())),
92 status: Arc::new(Mutex::new(Status::Closed)),
93 handler: Arc::new(Mutex::new(None)),
94 msg_handler: Arc::new(Mutex::new(None)),
95 ev_loop: Arc::new(Mutex::new(None)),
96 })
97 }
98
99 fn conn_status(&self) -> ConnStatus {
101 self.conn.lock().unwrap().status()
102 }
103
104 fn handler(&self) -> Option<Arc<dyn EventHandler>> {
106 self.handler.lock().unwrap().clone()
107 }
108
109 fn msg_handler(&self) -> Option<Arc<dyn MessageHandler>> {
111 self.msg_handler.lock().unwrap().clone()
112 }
113
114 fn on_error(&self, err: Box<dyn StdError + Send + Sync>) {
116 let handler = { (*self.handler.lock().unwrap()).clone() };
117 if let Some(handler) = handler {
118 let q = Arc::new(self.clone());
119 task::spawn(async move {
120 handler.on_error(q, err).await;
121 });
122 }
123 }
124
125 fn topic(&self) -> String {
127 if self.opts.is_recv && !self.opts.broadcast {
128 if let Some(prefix) = self.opts.shared_prefix.as_ref() {
129 return format!("{}{}", prefix.as_str(), self.opts.name.as_str());
130 }
131 }
132 self.opts.name.clone()
133 }
134
135 fn qos(&self) -> QoS {
137 match self.opts.reliable {
138 false => QoS::AtMostOnce,
139 true => QoS::AtLeastOnce,
140 }
141 }
142}
143
144#[async_trait]
145impl GmqQueue for MqttQueue {
146 fn name(&self) -> &str {
147 self.opts.name.as_str()
148 }
149
150 fn is_recv(&self) -> bool {
151 self.opts.is_recv
152 }
153
154 fn status(&self) -> Status {
155 *self.status.lock().unwrap()
156 }
157
158 fn set_handler(&mut self, handler: Arc<dyn EventHandler>) {
159 *self.handler.lock().unwrap() = Some(handler);
160 }
161
162 fn clear_handler(&mut self) {
163 let _ = (*self.handler.lock().unwrap()).take();
164 }
165
166 fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>) {
167 *self.msg_handler.lock().unwrap() = Some(handler);
168 }
169
170 fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
171 if self.opts.is_recv && self.msg_handler().is_none() {
172 return Err(Box::new(Error::NoMsgHandler));
173 }
174
175 {
176 let mut task_handle_mutex = self.ev_loop.lock().unwrap();
177 if (*task_handle_mutex).is_some() {
178 return Ok(());
179 }
180 *self.status.lock().unwrap() = Status::Connecting;
181 *task_handle_mutex = Some(create_event_loop(self));
182 }
183 Ok(())
184 }
185
186 async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
187 match { self.ev_loop.lock().unwrap().take() } {
188 None => return Ok(()),
189 Some(handle) => handle.abort(),
190 }
191 {
192 *self.status.lock().unwrap() = Status::Closing;
193 }
194
195 let raw_conn;
196 {
197 let mut conn = self.conn.lock().unwrap();
198 conn.remove_packet_handler(self.opts.name.as_str());
199 raw_conn = conn.get_raw_connection();
200 }
201
202 let mut result: Result<(), RumqttError> = Ok(());
203 if let Some(raw_conn) = raw_conn {
204 result = raw_conn.unsubscribe(self.topic()).await;
205 }
206
207 {
208 *self.status.lock().unwrap() = Status::Closed;
209 }
210 if let Some(handler) = { (*self.handler.lock().unwrap()).clone() } {
211 let queue = Arc::new(self.clone());
212 task::spawn(async move {
213 handler.on_status(queue, Status::Closed).await;
214 });
215 }
216
217 result?;
218 Ok(())
219 }
220
221 async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
222 if self.opts.is_recv {
223 return Err(Box::new(Error::QueueIsReceiver));
224 } else if self.status() != Status::Connected {
225 return Err(Box::new(Error::NotConnected));
226 }
227
228 let raw_conn = {
229 match self.conn.lock().unwrap().get_raw_connection() {
230 None => return Err(Box::new(Error::NotConnected)),
231 Some(raw_conn) => raw_conn,
232 }
233 };
234
235 raw_conn
236 .publish(self.topic(), self.qos(), false, payload)
237 .await?;
238 Ok(())
239 }
240}
241
242impl PacketHandler for MqttQueue {
243 fn on_publish(&self, packet: Publish) {
244 if let Some(handler) = self.msg_handler() {
245 let this = Arc::new(self.clone());
246 task::spawn(async move {
247 handler
248 .on_message(this, Box::new(MqttMessage::new(packet)))
249 .await;
250 });
251 }
252 }
253}
254
255impl Default for MqttQueueOptions {
256 fn default() -> Self {
257 MqttQueueOptions {
258 name: "".to_string(),
259 is_recv: false,
260 reliable: false,
261 broadcast: false,
262 reconnect_millis: DEF_RECONN_TIME_MS,
263 shared_prefix: None,
264 }
265 }
266}
267
268impl MqttMessage {
269 pub fn new(packet: Publish) -> Self {
271 MqttMessage { packet }
272 }
273}
274
275#[async_trait]
276impl Message for MqttMessage {
277 fn payload(&self) -> &[u8] {
278 self.packet.payload.as_ref()
279 }
280
281 async fn ack(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
282 Ok(())
283 }
284
285 async fn nack(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
286 Ok(())
287 }
288}
289
290fn create_event_loop(queue: &MqttQueue) -> JoinHandle<()> {
292 let this = Arc::new(queue.clone());
293 task::spawn(async move {
294 loop {
295 match this.status() {
296 Status::Closing | Status::Closed => task::yield_now().await,
297 Status::Connecting => {
298 if this.conn_status() != ConnStatus::Connected {
299 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
300 continue;
301 }
302
303 if this.opts.is_recv {
304 let raw_conn;
305 {
306 let mut conn = this.conn.lock().unwrap();
307 conn.add_packet_handler(this.opts.name.as_str(), this.clone());
308 raw_conn = conn.get_raw_connection();
309 }
310 if let Some(raw_conn) = raw_conn {
311 if let Err(e) = raw_conn.subscribe(this.topic(), this.qos()).await {
312 this.on_error(Box::new(e));
313 time::sleep(Duration::from_millis(this.opts.reconnect_millis))
314 .await;
315 continue;
316 }
317 } else {
318 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
319 continue;
320 }
321 }
322
323 {
324 *this.status.lock().unwrap() = Status::Connected;
325 }
326 if let Some(handler) = this.handler() {
327 let queue = this.clone();
328 task::spawn(async move {
329 handler.on_status(queue, Status::Connected).await;
330 });
331 }
332 }
333 Status::Connected => {
334 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
335 if this.conn_status() != ConnStatus::Connected {
336 if let Some(handler) = this.handler() {
337 let queue = this.clone();
338 task::spawn(async move {
339 handler.on_status(queue, Status::Connecting).await;
340 });
341 }
342 *this.status.lock().unwrap() = Status::Connecting;
343 }
344 }
345 Status::Disconnected => {
346 *this.status.lock().unwrap() = Status::Connecting;
347 }
348 }
349 }
350 })
351}