sylvia_iot_data/libs/mq/
mod.rs1use std::{
2 collections::HashMap,
3 sync::{Arc, Mutex},
4};
5
6use url::Url;
7
8use general_mq::{
9 connection::GmqConnection,
10 queue::{EventHandler, GmqQueue, MessageHandler},
11 AmqpConnection, AmqpConnectionOptions, AmqpQueueOptions, MqttConnection, MqttConnectionOptions,
12 MqttQueueOptions, Queue, QueueOptions,
13};
14
15pub mod broker;
16pub mod coremgr;
17
18use super::config::DataData as DataMqConfig;
19
/// A message-queue connection paired with a shared counter.
///
/// Each variant wraps one protocol-specific connection together with an `Arc<Mutex<isize>>`.
/// Cloning a `Connection` clones the `Arc`, so every clone observes the same counter value.
/// `new_data_queue` adds one to the counter when it sets up a queue on the connection.
// NOTE(review): the counter presumably lets the owner decide when a pooled connection is no
// longer used and can be closed; confirm against the code that decrements it.
#[derive(Clone)]
pub enum Connection {
    /// An AMQP connection and its counter.
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
    /// An MQTT connection and its counter.
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
}
26
/// Prefetch value applied to AMQP queues when the configuration does not provide one.
const DEF_PREFETCH: u16 = 100;
29
30fn new_data_queue(
32 conn_pool: &mut HashMap<String, Connection>,
33 config: &DataMqConfig,
34 queue_name: &str,
35 handler: Arc<dyn EventHandler>,
36 msg_handler: Arc<dyn MessageHandler>,
37) -> Result<Queue, String> {
38 let host_uri = match config.url.as_ref() {
39 None => return Err("host_uri empty".to_string()),
40 Some(host_uri) => match Url::parse(host_uri) {
41 Err(e) => return Err(format!("host_uri error: {}", e)),
42 Ok(uri) => uri,
43 },
44 };
45 let conn = get_connection(conn_pool, &host_uri)?;
46 let mut queue = match conn {
47 Connection::Amqp(conn, counter) => {
48 let opts = QueueOptions::Amqp(
49 AmqpQueueOptions {
50 name: queue_name.to_string(),
51 is_recv: true,
52 reliable: true,
53 broadcast: false,
54 prefetch: match config.prefetch {
55 None => DEF_PREFETCH,
56 Some(prefetch) => prefetch,
57 },
58 ..Default::default()
59 },
60 &conn,
61 );
62 {
63 *counter.lock().unwrap() += 1;
64 }
65 Queue::new(opts)?
66 }
67 Connection::Mqtt(conn, counter) => {
68 let opts = QueueOptions::Mqtt(
69 MqttQueueOptions {
70 name: queue_name.to_string(),
71 is_recv: true,
72 reliable: true,
73 broadcast: false,
74 shared_prefix: config.shared_prefix.clone(),
75 ..Default::default()
76 },
77 &conn,
78 );
79 {
80 *counter.lock().unwrap() += 1;
81 }
82 Queue::new(opts)?
83 }
84 };
85 queue.set_handler(handler);
86 queue.set_msg_handler(msg_handler);
87 if let Err(e) = queue.connect() {
88 return Err(e.to_string());
89 }
90 Ok(queue)
91}
92
93fn get_connection(
96 conn_pool: &mut HashMap<String, Connection>,
97 host_uri: &Url,
98) -> Result<Connection, String> {
99 let uri = host_uri.to_string();
100 if let Some(conn) = conn_pool.get(&uri) {
101 return Ok(conn.clone());
102 }
103
104 match host_uri.scheme() {
105 "amqp" | "amqps" => {
106 let opts = AmqpConnectionOptions {
107 uri: host_uri.to_string(),
108 ..Default::default()
109 };
110 let mut conn = AmqpConnection::new(opts)?;
111 let _ = conn.connect();
112 let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
113 conn_pool.insert(uri, conn.clone());
114 Ok(conn)
115 }
116 "mqtt" | "mqtts" => {
117 let opts = MqttConnectionOptions {
118 uri: host_uri.to_string(),
119 ..Default::default()
120 };
121 let mut conn = MqttConnection::new(opts)?;
122 let _ = conn.connect();
123 let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
124 conn_pool.insert(uri, conn.clone());
125 Ok(conn)
126 }
127 s => Err(format!("unsupport scheme {}", s)),
128 }
129}