sylvia_iot_broker/libs/mq/
control.rs

1use std::{
2    collections::HashMap,
3    sync::{Arc, Mutex},
4};
5
6use url::Url;
7
8use general_mq::{
9    queue::{EventHandler, GmqQueue, MessageHandler},
10    AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
11};
12
13use super::{get_connection, Connection};
14
15const QUEUE_PREFIX: &'static str = "broker.ctrl";
16
17/// The default prefetch value for AMQP.
18const DEF_PREFETCH: u16 = 100;
19
20/// To create a broadcast queue for a function to send or receive control messages.
21pub fn new(
22    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
23    host_uri: &Url,
24    prefetch: Option<u16>,
25    func_name: &str,
26    is_recv: bool,
27    handler: Arc<dyn EventHandler>,
28    msg_handler: Arc<dyn MessageHandler>,
29) -> Result<Queue, String> {
30    if func_name.len() == 0 {
31        return Err("`func_name` cannot be empty for control queue".to_string());
32    }
33
34    let conn = get_connection(&conn_pool, host_uri)?;
35    let mut queue = match conn {
36        Connection::Amqp(conn, counter) => {
37            let prefetch = match prefetch {
38                None => DEF_PREFETCH,
39                Some(prefetch) => match prefetch {
40                    0 => DEF_PREFETCH,
41                    _ => prefetch,
42                },
43            };
44            let opts = QueueOptions::Amqp(
45                AmqpQueueOptions {
46                    name: format!("{}.{}", QUEUE_PREFIX, func_name),
47                    is_recv,
48                    reliable: true,
49                    broadcast: true,
50                    prefetch,
51                    ..Default::default()
52                },
53                &conn,
54            );
55            {
56                *counter.lock().unwrap() += 1;
57            }
58            Queue::new(opts)?
59        }
60        Connection::Mqtt(conn, counter) => {
61            let opts = QueueOptions::Mqtt(
62                MqttQueueOptions {
63                    name: format!("{}.{}", QUEUE_PREFIX, func_name),
64                    is_recv,
65                    reliable: true,
66                    broadcast: true,
67                    ..Default::default()
68                },
69                &conn,
70            );
71            {
72                *counter.lock().unwrap() += 1;
73            }
74            Queue::new(opts)?
75        }
76    };
77    queue.set_handler(handler);
78    if is_recv {
79        queue.set_msg_handler(msg_handler);
80    }
81    if let Err(e) = queue.connect() {
82        return Err(e.to_string());
83    }
84    Ok(queue)
85}