1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use url::Url;

use general_mq::{
    connection::GmqConnection,
    queue::{EventHandler, GmqQueue},
    AmqpConnection, AmqpConnectionOptions, AmqpQueueOptions, MqttConnection, MqttConnectionOptions,
    MqttQueueOptions, Queue, QueueOptions,
};

pub mod broker;
pub mod coremgr;

use super::config::DataData as DataMqConfig;

/// The general connection type with reference counter for upper layer maintenance.
///
/// Each variant pairs a protocol-specific connection with a shared counter. The counter starts
/// at `0` when `get_connection` creates the connection and is increased by one by
/// `new_data_queue` for a queue it creates on that connection. Because the counter lives in an
/// `Arc`, every clone of a `Connection` value refers to the same counter.
#[derive(Clone)]
pub enum Connection {
    /// An AMQP connection and its shared counter.
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
    /// An MQTT connection and its shared counter.
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
}

/// The default prefetch value for AMQP.
///
/// Used by `new_data_queue` for AMQP queues when the configuration leaves `prefetch` unset.
const DEF_PREFETCH: u16 = 100;

/// To create a reliable unicast queue to receive data messages.
fn new_data_queue(
    conn_pool: &mut HashMap<String, Connection>,
    config: &DataMqConfig,
    queue_name: &str,
    handler: Arc<dyn EventHandler>,
) -> Result<Queue, String> {
    let host_uri = match config.url.as_ref() {
        None => return Err("host_uri empty".to_string()),
        Some(host_uri) => match Url::parse(host_uri) {
            Err(e) => return Err(format!("host_uri error: {}", e)),
            Ok(uri) => uri,
        },
    };
    let conn = get_connection(conn_pool, &host_uri)?;
    let mut queue = match conn {
        Connection::Amqp(conn, counter) => {
            let opts = QueueOptions::Amqp(
                AmqpQueueOptions {
                    name: queue_name.to_string(),
                    is_recv: true,
                    reliable: true,
                    broadcast: false,
                    prefetch: match config.prefetch {
                        None => DEF_PREFETCH,
                        Some(prefetch) => prefetch,
                    },
                    ..Default::default()
                },
                &conn,
            );
            {
                *counter.lock().unwrap() += 1;
            }
            Queue::new(opts)?
        }
        Connection::Mqtt(conn, counter) => {
            let opts = QueueOptions::Mqtt(
                MqttQueueOptions {
                    name: queue_name.to_string(),
                    is_recv: true,
                    reliable: true,
                    broadcast: false,
                    shared_prefix: config.shared_prefix.clone(),
                    ..Default::default()
                },
                &conn,
            );
            {
                *counter.lock().unwrap() += 1;
            }
            Queue::new(opts)?
        }
    };
    queue.set_handler(handler);
    if let Err(e) = queue.connect() {
        return Err(e.to_string());
    }
    Ok(queue)
}

/// Utility function to get the message queue connection instance. A new connection will be created
/// if the host does not exist.
///
/// The pool is keyed by the string form of `host_uri`. On a hit, a clone of the pooled
/// `Connection` is returned (the clone shares the pooled entry's counter). On a miss, a
/// connection matching the URI scheme (`amqp`/`amqps` or `mqtt`/`mqtts`) is created with default
/// options, `connect()` is called on it, and it is stored in the pool with a counter of `0`.
///
/// # Errors
///
/// Returns a message string when the scheme is not one of the supported ones, when the connection
/// object cannot be constructed, or when `connect()` reports an error. In all of these cases
/// nothing is inserted into `conn_pool`.
fn get_connection(
    conn_pool: &mut HashMap<String, Connection>,
    host_uri: &Url,
) -> Result<Connection, String> {
    let uri = host_uri.to_string();
    if let Some(conn) = conn_pool.get(&uri) {
        return Ok(conn.clone());
    }

    let conn = match host_uri.scheme() {
        "amqp" | "amqps" => {
            let opts = AmqpConnectionOptions {
                uri: uri.clone(),
                ..Default::default()
            };
            let mut conn = AmqpConnection::new(opts)?;
            // Surface connect errors instead of caching a connection that failed to start.
            conn.connect().map_err(|e| e.to_string())?;
            Connection::Amqp(conn, Arc::new(Mutex::new(0)))
        }
        "mqtt" | "mqtts" => {
            let opts = MqttConnectionOptions {
                uri: uri.clone(),
                ..Default::default()
            };
            let mut conn = MqttConnection::new(opts)?;
            // Surface connect errors instead of caching a connection that failed to start.
            conn.connect().map_err(|e| e.to_string())?;
            Connection::Mqtt(conn, Arc::new(Mutex::new(0)))
        }
        s => return Err(format!("unsupport scheme {}", s)),
    };

    // Only successfully created and connected connections reach the pool.
    conn_pool.insert(uri, conn.clone());
    Ok(conn)
}