sylvia_iot_data/libs/mq/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use url::Url;

use general_mq::{
    connection::GmqConnection,
    queue::{EventHandler, GmqQueue, MessageHandler},
    AmqpConnection, AmqpConnectionOptions, AmqpQueueOptions, MqttConnection, MqttConnectionOptions,
    MqttQueueOptions, Queue, QueueOptions,
};

pub mod broker;
pub mod coremgr;

use super::config::DataData as DataMqConfig;

/// The general connection type with reference counter for upper layer maintenance.
///
/// Each variant pairs a protocol-specific connection with a shared
/// `Arc<Mutex<isize>>` counter. Cloning a `Connection` clones the `Arc`, so all
/// clones of one pooled connection observe the same counter value.
/// Within this module the counter starts at `0` when the connection is created
/// and is incremented by `new_data_queue` for queues it creates on the
/// connection. Decrementing is not done in this module.
#[derive(Clone)]
pub enum Connection {
    /// An AMQP connection and its shared reference counter.
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
    /// An MQTT connection and its shared reference counter.
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
}

/// The default prefetch value for AMQP.
///
/// Used by `new_data_queue` for AMQP queues when the configuration does not
/// provide a `prefetch` value.
const DEF_PREFETCH: u16 = 100;

/// To create a reliable unicast queue to receive data messages.
fn new_data_queue(
    conn_pool: &mut HashMap<String, Connection>,
    config: &DataMqConfig,
    queue_name: &str,
    handler: Arc<dyn EventHandler>,
    msg_handler: Arc<dyn MessageHandler>,
) -> Result<Queue, String> {
    let host_uri = match config.url.as_ref() {
        None => return Err("host_uri empty".to_string()),
        Some(host_uri) => match Url::parse(host_uri) {
            Err(e) => return Err(format!("host_uri error: {}", e)),
            Ok(uri) => uri,
        },
    };
    let conn = get_connection(conn_pool, &host_uri)?;
    let mut queue = match conn {
        Connection::Amqp(conn, counter) => {
            let opts = QueueOptions::Amqp(
                AmqpQueueOptions {
                    name: queue_name.to_string(),
                    is_recv: true,
                    reliable: true,
                    broadcast: false,
                    prefetch: match config.prefetch {
                        None => DEF_PREFETCH,
                        Some(prefetch) => prefetch,
                    },
                    ..Default::default()
                },
                &conn,
            );
            {
                *counter.lock().unwrap() += 1;
            }
            Queue::new(opts)?
        }
        Connection::Mqtt(conn, counter) => {
            let opts = QueueOptions::Mqtt(
                MqttQueueOptions {
                    name: queue_name.to_string(),
                    is_recv: true,
                    reliable: true,
                    broadcast: false,
                    shared_prefix: config.shared_prefix.clone(),
                    ..Default::default()
                },
                &conn,
            );
            {
                *counter.lock().unwrap() += 1;
            }
            Queue::new(opts)?
        }
    };
    queue.set_handler(handler);
    queue.set_msg_handler(msg_handler);
    if let Err(e) = queue.connect() {
        return Err(e.to_string());
    }
    Ok(queue)
}

/// Looks up the message queue connection for `host_uri` in `conn_pool`,
/// creating and pooling a new one when the host has no entry yet.
///
/// The pool key is the string form of `host_uri`. On a pool hit a clone of the
/// stored `Connection` is returned. Otherwise a connection matching the URI
/// scheme (`amqp`/`amqps` or `mqtt`/`mqtts`) is constructed with default
/// options, asked to connect, stored in the pool with a counter of `0`, and a
/// clone is returned.
///
/// # Errors
///
/// Returns an error string when the connection cannot be constructed or when
/// the scheme is not one of the supported ones. Nothing is inserted into the
/// pool in that case.
fn get_connection(
    conn_pool: &mut HashMap<String, Connection>,
    host_uri: &Url,
) -> Result<Connection, String> {
    let key = host_uri.to_string();
    if let Some(pooled) = conn_pool.get(&key) {
        return Ok(pooled.clone());
    }

    let created = match host_uri.scheme() {
        "amqp" | "amqps" => {
            let mut amqp = AmqpConnection::new(AmqpConnectionOptions {
                uri: key.clone(),
                ..Default::default()
            })?;
            // The result of `connect()` is intentionally discarded here.
            let _ = amqp.connect();
            Connection::Amqp(amqp, Arc::new(Mutex::new(0)))
        }
        "mqtt" | "mqtts" => {
            let mut mqtt = MqttConnection::new(MqttConnectionOptions {
                uri: key.clone(),
                ..Default::default()
            })?;
            // The result of `connect()` is intentionally discarded here.
            let _ = mqtt.connect();
            Connection::Mqtt(mqtt, Arc::new(Mutex::new(0)))
        }
        other => return Err(format!("unsupport scheme {}", other)),
    };

    conn_pool.insert(key, created.clone());
    Ok(created)
}