// sylvia_iot_broker/libs/mq/control.rs

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use url::Url;

use general_mq::{
    queue::{EventHandler, GmqQueue, MessageHandler},
    AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
};

use super::{get_connection, Connection};

/// Prefix of every control queue name; the full name is `<prefix>.<func_name>`.
const QUEUE_PREFIX: &str = "broker.ctrl";

/// The default prefetch value for AMQP.
const DEF_PREFETCH: u16 = 100;

/// Creates a broadcast queue that a function uses to send or receive control messages.
///
/// The queue is named `<QUEUE_PREFIX>.<func_name>` and is always created with `reliable` and
/// `broadcast` enabled, on the connection that `get_connection` returns for `host_uri`.
///
/// # Arguments
///
/// * `conn_pool` - The shared connection pool passed to `get_connection`.
/// * `host_uri` - The message broker URI used to look up the connection.
/// * `prefetch` - AMQP prefetch. `None` or `Some(0)` falls back to `DEF_PREFETCH`. Not used for
///   MQTT connections.
/// * `func_name` - The function name appended to the queue prefix. Must not be empty.
/// * `is_recv` - `true` to create a receiver queue, `false` to create a sender queue.
/// * `handler` - The queue event handler.
/// * `msg_handler` - The message handler. It is only installed when `is_recv` is `true`.
///
/// # Errors
///
/// Returns an error string when `func_name` is empty, when the connection cannot be obtained,
/// when the queue cannot be created, or when `connect()` fails.
///
/// # Panics
///
/// Panics if the connection counter mutex is poisoned.
pub fn new(
    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
    host_uri: &Url,
    prefetch: Option<u16>,
    func_name: &str,
    is_recv: bool,
    handler: Arc<dyn EventHandler>,
    msg_handler: Arc<dyn MessageHandler>,
) -> Result<Queue, String> {
    if func_name.is_empty() {
        return Err("`func_name` cannot be empty for control queue".to_string());
    }

    let name = format!("{}.{}", QUEUE_PREFIX, func_name);
    let queue = match get_connection(&conn_pool, host_uri)? {
        Connection::Amqp(conn, counter) => {
            let prefetch = match prefetch {
                None | Some(0) => DEF_PREFETCH,
                Some(value) => value,
            };
            let opts = QueueOptions::Amqp(
                AmqpQueueOptions {
                    name,
                    is_recv,
                    reliable: true,
                    broadcast: true,
                    prefetch,
                    ..Default::default()
                },
                &conn,
            );
            let queue = start_queue(Queue::new(opts)?, is_recv, handler, msg_handler)?;
            // Count the queue only once it is fully set up, so a failure above does not leave
            // the counter incremented for a queue that the caller never receives.
            // NOTE(review): the counter presumably tracks queues using this connection — confirm.
            *counter.lock().unwrap() += 1;
            queue
        }
        Connection::Mqtt(conn, counter) => {
            let opts = QueueOptions::Mqtt(
                MqttQueueOptions {
                    name,
                    is_recv,
                    reliable: true,
                    broadcast: true,
                    ..Default::default()
                },
                &conn,
            );
            let queue = start_queue(Queue::new(opts)?, is_recv, handler, msg_handler)?;
            // Same ordering as the AMQP branch: increment only after success.
            *counter.lock().unwrap() += 1;
            queue
        }
    };
    Ok(queue)
}

/// Installs the handlers on a newly created queue and starts connecting it.
fn start_queue(
    mut queue: Queue,
    is_recv: bool,
    handler: Arc<dyn EventHandler>,
    msg_handler: Arc<dyn MessageHandler>,
) -> Result<Queue, String> {
    queue.set_handler(handler);
    // Only receiver queues get a message handler.
    if is_recv {
        queue.set_msg_handler(msg_handler);
    }
    queue.connect().map_err(|e| e.to_string())?;
    Ok(queue)
}