sylvia_iot_broker/libs/mq/
control.rs1use std::{
2 collections::HashMap,
3 sync::{Arc, Mutex},
4};
5
6use url::Url;
7
8use general_mq::{
9 queue::{EventHandler, GmqQueue, MessageHandler},
10 AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
11};
12
13use super::{get_connection, Connection};
14
/// Prefix of every control queue name. The full name is built as
/// `<QUEUE_PREFIX>.<func_name>`.
const QUEUE_PREFIX: &str = "broker.ctrl";

/// Prefetch value used for AMQP queues when the caller supplies `None` or `0`.
const DEF_PREFETCH: u16 = 100;
19
20pub fn new(
22 conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
23 host_uri: &Url,
24 prefetch: Option<u16>,
25 func_name: &str,
26 is_recv: bool,
27 handler: Arc<dyn EventHandler>,
28 msg_handler: Arc<dyn MessageHandler>,
29) -> Result<Queue, String> {
30 if func_name.len() == 0 {
31 return Err("`func_name` cannot be empty for control queue".to_string());
32 }
33
34 let conn = get_connection(&conn_pool, host_uri)?;
35 let mut queue = match conn {
36 Connection::Amqp(conn, counter) => {
37 let prefetch = match prefetch {
38 None => DEF_PREFETCH,
39 Some(prefetch) => match prefetch {
40 0 => DEF_PREFETCH,
41 _ => prefetch,
42 },
43 };
44 let opts = QueueOptions::Amqp(
45 AmqpQueueOptions {
46 name: format!("{}.{}", QUEUE_PREFIX, func_name),
47 is_recv,
48 reliable: true,
49 broadcast: true,
50 prefetch,
51 ..Default::default()
52 },
53 &conn,
54 );
55 {
56 *counter.lock().unwrap() += 1;
57 }
58 Queue::new(opts)?
59 }
60 Connection::Mqtt(conn, counter) => {
61 let opts = QueueOptions::Mqtt(
62 MqttQueueOptions {
63 name: format!("{}.{}", QUEUE_PREFIX, func_name),
64 is_recv,
65 reliable: true,
66 broadcast: true,
67 ..Default::default()
68 },
69 &conn,
70 );
71 {
72 *counter.lock().unwrap() += 1;
73 }
74 Queue::new(opts)?
75 }
76 };
77 queue.set_handler(handler);
78 if is_recv {
79 queue.set_msg_handler(msg_handler);
80 }
81 if let Err(e) = queue.connect() {
82 return Err(e.to_string());
83 }
84 Ok(queue)
85}