sylvia_iot_broker/libs/mq/
mod.rs

1//! To management queues for applications and networks.
2//!
3//! For applications, the [`application::ApplicationMgr`] manages the following kind of queues:
4//! - uldata: uplink data from the broker to the application.
5//! - dldata: downlink data from the application to the broker.
6//! - dldata-resp: the response of downlink data.
7//! - dldata-result: the data process result from the network.
8//!
9//! For networks, the [`network::NetworkMgr`] manages the following kind of queues:
10//! - uldata: device uplink data from the network to the broker.
11//! - dldata: downlink data from the broker to the network.
12//! - dldata-result: the data process result from the network.
13
14use std::{
15    collections::HashMap,
16    error::Error as StdError,
17    sync::{Arc, Mutex},
18};
19
20use serde::{Deserialize, Serialize};
21use url::Url;
22
23use general_mq::{
24    connection::GmqConnection, queue::Status, AmqpConnection, AmqpConnectionOptions,
25    AmqpQueueOptions, MqttConnection, MqttConnectionOptions, MqttQueueOptions, Queue, QueueOptions,
26};
27
28pub mod application;
29pub mod control;
30pub mod data;
31pub mod network;
32
33/// The general connection type with reference counter for upper layer maintenance.
34#[derive(Clone)]
35pub enum Connection {
36    Amqp(AmqpConnection, Arc<Mutex<isize>>),
37    Mqtt(MqttConnection, Arc<Mutex<isize>>),
38}
39
40/// Manager status.
41#[derive(PartialEq)]
42pub enum MgrStatus {
43    /// One or more queues are not connected.
44    NotReady,
45    /// All queues are connected.
46    Ready,
47}
48
49/// Detail queue connection status.
50pub struct MgrMqStatus {
51    /// For `uldata`.
52    pub uldata: Status,
53    /// For `dldata`.
54    pub dldata: Status,
55    /// For `dldata-resp`.
56    pub dldata_resp: Status,
57    /// For `dldata-result`.
58    pub dldata_result: Status,
59    /// For `ctrl`.
60    pub ctrl: Status,
61}
62
63/// The options of the application/network manager.
64#[derive(Default, Deserialize, Serialize)]
65pub struct Options {
66    /// The associated unit ID of the application/network. Empty for public network.
67    #[serde(rename = "unitId")]
68    pub unit_id: String,
69    /// The associated unit code of the application/network. Empty for public network.
70    #[serde(rename = "unitCode")]
71    pub unit_code: String,
72    /// The associated application/network ID.
73    pub id: String,
74    /// The associated application/network code.
75    pub name: String,
76    /// AMQP prefetch option.
77    #[serde(skip_serializing_if = "Option::is_none")]
78    pub prefetch: Option<u16>,
79    pub persistent: bool,
80    /// MQTT shared queue prefix option.
81    #[serde(rename = "sharedPrefix", skip_serializing_if = "Option::is_none")]
82    pub shared_prefix: Option<String>,
83}
84
85/// Support application/network host schemes.
86pub const SUPPORT_SCHEMES: &'static [&'static str] = &["amqp", "amqps", "mqtt", "mqtts"];
87
88/// The default prefetch value for AMQP.
89const DEF_PREFETCH: u16 = 100;
90
91impl Copy for MgrStatus {}
92
93impl Clone for MgrStatus {
94    fn clone(&self) -> MgrStatus {
95        *self
96    }
97}
98
99/// Utility function to get the message queue connection instance. A new connection will be created
100/// if the host does not exist.
101fn get_connection(
102    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
103    host_uri: &Url,
104) -> Result<Connection, String> {
105    let uri = host_uri.to_string();
106    let mut mutex = conn_pool.lock().unwrap();
107    if let Some(conn) = mutex.get(&uri) {
108        return Ok(conn.clone());
109    }
110
111    match host_uri.scheme() {
112        "amqp" | "amqps" => {
113            let opts = AmqpConnectionOptions {
114                uri: host_uri.to_string(),
115                ..Default::default()
116            };
117            let mut conn = AmqpConnection::new(opts)?;
118            let _ = conn.connect();
119            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
120            mutex.insert(uri, conn.clone());
121            Ok(conn)
122        }
123        "mqtt" | "mqtts" => {
124            let opts = MqttConnectionOptions {
125                uri: host_uri.to_string(),
126                ..Default::default()
127            };
128            let mut conn = MqttConnection::new(opts)?;
129            let _ = conn.connect();
130            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
131            mutex.insert(uri, conn.clone());
132            Ok(conn)
133        }
134        s => Err(format!("unsupport scheme {}", s)),
135    }
136}
137
138/// Utility function to remove connection from the pool if the reference count meet zero.
139async fn remove_connection(
140    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
141    host_uri: &String,
142    count: isize,
143) -> Result<(), Box<dyn StdError + Send + Sync>> {
144    let conn = {
145        let mut mutex = conn_pool.lock().unwrap();
146        match mutex.get(host_uri) {
147            None => return Ok(()),
148            Some(conn) => match conn {
149                Connection::Amqp(_, counter) => {
150                    let mut mutex = counter.lock().unwrap();
151                    *mutex -= count;
152                    if *mutex > 0 {
153                        return Ok(());
154                    }
155                }
156                Connection::Mqtt(_, counter) => {
157                    let mut mutex = counter.lock().unwrap();
158                    *mutex -= count;
159                    if *mutex > 0 {
160                        return Ok(());
161                    }
162                }
163            },
164        }
165        mutex.remove(host_uri)
166    };
167    if let Some(conn) = conn {
168        match conn {
169            Connection::Amqp(mut conn, _) => {
170                conn.close().await?;
171            }
172            Connection::Mqtt(mut conn, _) => {
173                conn.close().await?;
174            }
175        }
176    }
177    Ok(())
178}
179
180/// The utility function for creating application/network control queue with the following name:
181/// - `[prefix].[unit].[code].ctrl`
182fn new_ctrl_queues(
183    conn: &Connection,
184    opts: &Options,
185    prefix: &str,
186) -> Result<Arc<Mutex<Queue>>, String> {
187    let ctrl: Arc<Mutex<Queue>>;
188
189    if opts.unit_id.len() == 0 {
190        if opts.unit_code.len() != 0 {
191            return Err("unit_id and unit_code must both empty or non-empty".to_string());
192        }
193    } else {
194        if opts.unit_code.len() == 0 {
195            return Err("unit_id and unit_code must both empty or non-empty".to_string());
196        }
197    }
198    if opts.id.len() == 0 {
199        return Err("`id` cannot be empty".to_string());
200    }
201    if opts.name.len() == 0 {
202        return Err("`name` cannot be empty".to_string());
203    }
204
205    let unit = match opts.unit_code.len() {
206        0 => "_",
207        _ => opts.unit_code.as_str(),
208    };
209
210    match conn {
211        Connection::Amqp(conn, _) => {
212            let prefetch = match opts.prefetch {
213                None => DEF_PREFETCH,
214                Some(prefetch) => match prefetch {
215                    0 => DEF_PREFETCH,
216                    _ => prefetch,
217                },
218            };
219
220            let ctrl_opts = QueueOptions::Amqp(
221                AmqpQueueOptions {
222                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
223                    is_recv: false,
224                    reliable: true,
225                    broadcast: false,
226                    prefetch,
227                    ..Default::default()
228                },
229                conn,
230            );
231            ctrl = Arc::new(Mutex::new(Queue::new(ctrl_opts)?));
232        }
233        Connection::Mqtt(conn, _) => {
234            let ctrl_opts = QueueOptions::Mqtt(
235                MqttQueueOptions {
236                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
237                    is_recv: false,
238                    reliable: true,
239                    broadcast: false,
240                    shared_prefix: opts.shared_prefix.clone(),
241                    ..Default::default()
242                },
243                conn,
244            );
245            ctrl = Arc::new(Mutex::new(Queue::new(ctrl_opts)?));
246        }
247    }
248
249    Ok(ctrl)
250}
251
252/// The utility function for creating application/network data queues. The return tuple contains:
253/// - `[prefix].[unit].[code].uldata`
254/// - `[prefix].[unit].[code].dldata`
255/// - `[prefix].[unit].[code].dldata-resp`: `Some` for applications and `None` for networks.
256/// - `[prefix].[unit].[code].dldata-result`
257fn new_data_queues(
258    conn: &Connection,
259    opts: &Options,
260    prefix: &str,
261    is_network: bool,
262) -> Result<
263    (
264        Arc<Mutex<Queue>>,
265        Arc<Mutex<Queue>>,
266        Option<Arc<Mutex<Queue>>>,
267        Arc<Mutex<Queue>>,
268    ),
269    String,
270> {
271    let uldata: Arc<Mutex<Queue>>;
272    let dldata: Arc<Mutex<Queue>>;
273    let dldata_resp: Option<Arc<Mutex<Queue>>>;
274    let dldata_result: Arc<Mutex<Queue>>;
275
276    if opts.unit_id.len() == 0 {
277        if opts.unit_code.len() != 0 {
278            return Err("unit_id and unit_code must both empty or non-empty".to_string());
279        }
280    } else {
281        if opts.unit_code.len() == 0 {
282            return Err("unit_id and unit_code must both empty or non-empty".to_string());
283        }
284    }
285    if opts.id.len() == 0 {
286        return Err("`id` cannot be empty".to_string());
287    }
288    if opts.name.len() == 0 {
289        return Err("`name` cannot be empty".to_string());
290    }
291
292    let unit = match opts.unit_code.len() {
293        0 => "_",
294        _ => opts.unit_code.as_str(),
295    };
296
297    match conn {
298        Connection::Amqp(conn, _) => {
299            let prefetch = match opts.prefetch {
300                None => DEF_PREFETCH,
301                Some(prefetch) => match prefetch {
302                    0 => DEF_PREFETCH,
303                    _ => prefetch,
304                },
305            };
306
307            let uldata_opts = QueueOptions::Amqp(
308                AmqpQueueOptions {
309                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
310                    is_recv: is_network,
311                    reliable: true,
312                    persistent: opts.persistent,
313                    broadcast: false,
314                    prefetch,
315                    ..Default::default()
316                },
317                conn,
318            );
319            let dldata_opts = QueueOptions::Amqp(
320                AmqpQueueOptions {
321                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
322                    is_recv: !is_network,
323                    reliable: true,
324                    persistent: opts.persistent,
325                    broadcast: false,
326                    prefetch,
327                    ..Default::default()
328                },
329                conn,
330            );
331            let dldata_resp_opts = QueueOptions::Amqp(
332                AmqpQueueOptions {
333                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
334                    is_recv: is_network,
335                    reliable: true,
336                    persistent: opts.persistent,
337                    broadcast: false,
338                    prefetch,
339                    ..Default::default()
340                },
341                conn,
342            );
343            let dldata_result_opts = QueueOptions::Amqp(
344                AmqpQueueOptions {
345                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
346                    is_recv: is_network,
347                    reliable: true,
348                    persistent: opts.persistent,
349                    broadcast: false,
350                    prefetch,
351                    ..Default::default()
352                },
353                conn,
354            );
355            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
356            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
357            dldata_resp = match is_network {
358                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
359                true => None,
360            };
361            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
362        }
363        Connection::Mqtt(conn, _) => {
364            let uldata_opts = QueueOptions::Mqtt(
365                MqttQueueOptions {
366                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
367                    is_recv: is_network,
368                    reliable: true,
369                    broadcast: false,
370                    shared_prefix: opts.shared_prefix.clone(),
371                    ..Default::default()
372                },
373                conn,
374            );
375            let dldata_opts = QueueOptions::Mqtt(
376                MqttQueueOptions {
377                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
378                    is_recv: !is_network,
379                    reliable: true,
380                    broadcast: false,
381                    shared_prefix: opts.shared_prefix.clone(),
382                    ..Default::default()
383                },
384                conn,
385            );
386            let dldata_resp_opts = QueueOptions::Mqtt(
387                MqttQueueOptions {
388                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
389                    is_recv: is_network,
390                    reliable: true,
391                    broadcast: false,
392                    shared_prefix: opts.shared_prefix.clone(),
393                    ..Default::default()
394                },
395                conn,
396            );
397            let dldata_result_opts = QueueOptions::Mqtt(
398                MqttQueueOptions {
399                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
400                    is_recv: is_network,
401                    reliable: true,
402                    broadcast: false,
403                    shared_prefix: opts.shared_prefix.clone(),
404                    ..Default::default()
405                },
406                conn,
407            );
408            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
409            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
410            dldata_resp = match is_network {
411                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
412                true => None,
413            };
414            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
415        }
416    }
417
418    Ok((uldata, dldata, dldata_resp, dldata_result))
419}