sylvia_iot_broker/libs/mq/
data.rs

1use std::{
2    collections::HashMap,
3    sync::{Arc, Mutex},
4};
5
6use url::Url;
7
8use general_mq::{
9    queue::{EventHandler, GmqQueue},
10    AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
11};
12
13use super::{get_connection, Connection};
14
/// Name of the queue used for sending data messages.
const QUEUE_NAME: &str = "broker.data";
16
17/// To create a reliable unicast queue to send data messages.
18pub fn new(
19    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
20    host_uri: &Url,
21    persistent: bool,
22    handler: Arc<dyn EventHandler>,
23) -> Result<Queue, String> {
24    let conn = get_connection(&conn_pool, host_uri)?;
25    let mut queue = match conn {
26        Connection::Amqp(conn, counter) => {
27            let opts = QueueOptions::Amqp(
28                AmqpQueueOptions {
29                    name: QUEUE_NAME.to_string(),
30                    is_recv: false,
31                    reliable: true,
32                    persistent,
33                    broadcast: false,
34                    ..Default::default()
35                },
36                &conn,
37            );
38            {
39                *counter.lock().unwrap() += 1;
40            }
41            Queue::new(opts)?
42        }
43        Connection::Mqtt(conn, counter) => {
44            let opts = QueueOptions::Mqtt(
45                MqttQueueOptions {
46                    name: QUEUE_NAME.to_string(),
47                    is_recv: false,
48                    reliable: true,
49                    broadcast: false,
50                    ..Default::default()
51                },
52                &conn,
53            );
54            {
55                *counter.lock().unwrap() += 1;
56            }
57            Queue::new(opts)?
58        }
59    };
60    queue.set_handler(handler);
61    if let Err(e) = queue.connect() {
62        return Err(e.to_string());
63    }
64    Ok(queue)
65}