sylvia_iot_broker/libs/mq/
data.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use url::Url;

use general_mq::{
    queue::{EventHandler, GmqQueue},
    AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
};

use super::{get_connection, Connection};

const QUEUE_NAME: &'static str = "broker.data";

/// To create a reliable unicast queue to send data messages.
pub fn new(
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
    host_uri: &Url,
    persistent: bool,
    handler: Arc<dyn EventHandler>,
) -> Result<Queue, String> {
    let conn = get_connection(&conn_pool, host_uri)?;
    let mut queue = match conn {
        Connection::Amqp(conn, counter) => {
            let opts = QueueOptions::Amqp(
                AmqpQueueOptions {
                    name: QUEUE_NAME.to_string(),
                    is_recv: false,
                    reliable: true,
                    persistent,
                    broadcast: false,
                    ..Default::default()
                },
                &conn,
            );
            {
                *counter.lock().unwrap() += 1;
            }
            Queue::new(opts)?
        }
        Connection::Mqtt(conn, counter) => {
            let opts = QueueOptions::Mqtt(
                MqttQueueOptions {
                    name: QUEUE_NAME.to_string(),
                    is_recv: false,
                    reliable: true,
                    broadcast: false,
                    ..Default::default()
                },
                &conn,
            );
            {
                *counter.lock().unwrap() += 1;
            }
            Queue::new(opts)?
        }
    };
    queue.set_handler(handler);
    if let Err(e) = queue.connect() {
        return Err(e.to_string());
    }
    Ok(queue)
}