sylvia_iot_coremgr/libs/mq/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use std::{
    collections::HashMap,
    fmt,
    sync::{Arc, Mutex},
};

use url::Url;

use general_mq::{
    connection::GmqConnection, AmqpConnection, AmqpConnectionOptions, MqttConnection,
    MqttConnectionOptions,
};

pub mod data;
pub mod emqx;
pub mod rabbitmq;
pub mod rumqttd;

/// The general connection type with reference counter for upper layer maintenance.
#[derive(Clone)]
pub enum Connection {
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
}

/// Broker message queue type.
pub enum QueueType {
    Application,
    Network,
}

impl fmt::Display for QueueType {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            QueueType::Application => fmt.write_str("application"),
            QueueType::Network => fmt.write_str("network"),
        }
    }
}

impl Copy for QueueType {}

impl Clone for QueueType {
    fn clone(&self) -> QueueType {
        *self
    }
}

/// Transfer queue type, unit code, application/network code to AMQP virtual host name and queue
/// name.
pub fn to_username(q_type: QueueType, unit: &str, code: &str) -> String {
    format!("{}.{}.{}", q_type, unit_code(unit), code)
}

/// Unit code part for queue name.
fn unit_code(code: &str) -> &str {
    match code {
        "" => "_",
        _ => code,
    }
}

/// Utility function to get the message queue connection instance. A new connection will be created
/// if the host does not exist.
fn get_connection(
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
    host_uri: &Url,
) -> Result<Connection, String> {
    let uri = host_uri.to_string();
    {
        let mutex = conn_pool.lock().unwrap();
        if let Some(conn) = mutex.get(&uri) {
            return Ok(conn.clone());
        }
    }

    match host_uri.scheme() {
        "amqp" | "amqps" => {
            let opts = AmqpConnectionOptions {
                uri: host_uri.to_string(),
                ..Default::default()
            };
            let mut conn = AmqpConnection::new(opts)?;
            let _ = conn.connect();
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
            {
                conn_pool.lock().unwrap().insert(uri, conn.clone());
            }
            Ok(conn)
        }
        "mqtt" | "mqtts" => {
            let opts = MqttConnectionOptions {
                uri: host_uri.to_string(),
                ..Default::default()
            };
            let mut conn = MqttConnection::new(opts)?;
            let _ = conn.connect();
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
            {
                conn_pool.lock().unwrap().insert(uri, conn.clone());
            }
            Ok(conn)
        }
        s => Err(format!("unsupport scheme {}", s)),
    }
}