sylvia_iot_data/libs/mq/
mod.rs

1use std::{
2    collections::HashMap,
3    sync::{Arc, Mutex},
4};
5
6use url::Url;
7
8use general_mq::{
9    connection::GmqConnection,
10    queue::{EventHandler, GmqQueue, MessageHandler},
11    AmqpConnection, AmqpConnectionOptions, AmqpQueueOptions, MqttConnection, MqttConnectionOptions,
12    MqttQueueOptions, Queue, QueueOptions,
13};
14
15pub mod broker;
16pub mod coremgr;
17
18use super::config::DataData as DataMqConfig;
19
20/// The general connection type with reference counter for upper layer maintenance.
21#[derive(Clone)]
22pub enum Connection {
23    Amqp(AmqpConnection, Arc<Mutex<isize>>),
24    Mqtt(MqttConnection, Arc<Mutex<isize>>),
25}
26
27/// The default prefetch value for AMQP.
28const DEF_PREFETCH: u16 = 100;
29
30/// To create a reliable unicast queue to receive data messages.
31fn new_data_queue(
32    conn_pool: &mut HashMap<String, Connection>,
33    config: &DataMqConfig,
34    queue_name: &str,
35    handler: Arc<dyn EventHandler>,
36    msg_handler: Arc<dyn MessageHandler>,
37) -> Result<Queue, String> {
38    let host_uri = match config.url.as_ref() {
39        None => return Err("host_uri empty".to_string()),
40        Some(host_uri) => match Url::parse(host_uri) {
41            Err(e) => return Err(format!("host_uri error: {}", e)),
42            Ok(uri) => uri,
43        },
44    };
45    let conn = get_connection(conn_pool, &host_uri)?;
46    let mut queue = match conn {
47        Connection::Amqp(conn, counter) => {
48            let opts = QueueOptions::Amqp(
49                AmqpQueueOptions {
50                    name: queue_name.to_string(),
51                    is_recv: true,
52                    reliable: true,
53                    broadcast: false,
54                    prefetch: match config.prefetch {
55                        None => DEF_PREFETCH,
56                        Some(prefetch) => prefetch,
57                    },
58                    ..Default::default()
59                },
60                &conn,
61            );
62            {
63                *counter.lock().unwrap() += 1;
64            }
65            Queue::new(opts)?
66        }
67        Connection::Mqtt(conn, counter) => {
68            let opts = QueueOptions::Mqtt(
69                MqttQueueOptions {
70                    name: queue_name.to_string(),
71                    is_recv: true,
72                    reliable: true,
73                    broadcast: false,
74                    shared_prefix: config.shared_prefix.clone(),
75                    ..Default::default()
76                },
77                &conn,
78            );
79            {
80                *counter.lock().unwrap() += 1;
81            }
82            Queue::new(opts)?
83        }
84    };
85    queue.set_handler(handler);
86    queue.set_msg_handler(msg_handler);
87    if let Err(e) = queue.connect() {
88        return Err(e.to_string());
89    }
90    Ok(queue)
91}
92
93/// Utility function to get the message queue connection instance. A new connection will be created
94/// if the host does not exist.
95fn get_connection(
96    conn_pool: &mut HashMap<String, Connection>,
97    host_uri: &Url,
98) -> Result<Connection, String> {
99    let uri = host_uri.to_string();
100    if let Some(conn) = conn_pool.get(&uri) {
101        return Ok(conn.clone());
102    }
103
104    match host_uri.scheme() {
105        "amqp" | "amqps" => {
106            let opts = AmqpConnectionOptions {
107                uri: host_uri.to_string(),
108                ..Default::default()
109            };
110            let mut conn = AmqpConnection::new(opts)?;
111            let _ = conn.connect();
112            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
113            conn_pool.insert(uri, conn.clone());
114            Ok(conn)
115        }
116        "mqtt" | "mqtts" => {
117            let opts = MqttConnectionOptions {
118                uri: host_uri.to_string(),
119                ..Default::default()
120            };
121            let mut conn = MqttConnection::new(opts)?;
122            let _ = conn.connect();
123            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
124            conn_pool.insert(uri, conn.clone());
125            Ok(conn)
126        }
127        s => Err(format!("unsupport scheme {}", s)),
128    }
129}