sylvia_iot_coremgr/libs/mq/
rumqttd.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::{
    collections::HashMap,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    thread::{self, JoinHandle as ThreadHandle},
};

use rumqttd::{
    Broker, Config, ConnectionSettings, ConsoleSettings, RouterConfig, ServerSettings, TlsConfig,
};

use sylvia_iot_corelib::server_config::Config as SylviaServerConfig;

use super::super::config::{
    Rumqttd, DEF_RUMQTTD_CONSOLE_PORT, DEF_RUMQTTD_MQTTS_PORT, DEF_RUMQTTD_MQTT_PORT,
};

/// To start a rumqttd broker.
pub fn start_rumqttd(
    server_conf: &SylviaServerConfig,
    rumqttd_conf: &Rumqttd,
) -> (ThreadHandle<()>, ThreadHandle<()>) {
    let mut console_setting = ConsoleSettings::default();
    console_setting.listen = match rumqttd_conf.console_port {
        None => format!("0.0.0.0:{}", DEF_RUMQTTD_CONSOLE_PORT),
        Some(port) => format!("0.0.0.0:{}", port),
    };
    let mut config = Config {
        router: RouterConfig {
            max_connections: 10000,
            max_outgoing_packet_count: 200,
            max_segment_size: 104857600,
            max_segment_count: 10,
            ..Default::default()
        },
        v4: Some(HashMap::new()),
        console: Some(console_setting),
        ..Default::default()
    };
    {
        if let Some(v4) = config.v4.as_mut() {
            v4.insert(
                "mqtt".to_string(),
                ServerSettings {
                    name: "mqtt".to_string(),
                    listen: match rumqttd_conf.mqtt_port {
                        None => SocketAddr::new(
                            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
                            DEF_RUMQTTD_MQTT_PORT,
                        ),
                        Some(port) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port),
                    },
                    tls: None,
                    next_connection_delay_ms: 1,
                    connections: ConnectionSettings {
                        connection_timeout_ms: 5000,
                        max_payload_size: 1 * 1024 * 1024,
                        max_inflight_count: 200,
                        auth: None,
                        external_auth: None,
                        dynamic_filters: true,
                    },
                },
            );
        }
    }
    if let Some(cacert_file) = server_conf.cacert_file.as_ref() {
        if let Some(cert_file) = server_conf.cert_file.as_ref() {
            if let Some(key_file) = server_conf.key_file.as_ref() {
                if let Some(v4) = config.v4.as_mut() {
                    v4.insert(
                        "mqtts".to_string(),
                        ServerSettings {
                            name: "mqtts".to_string(),
                            listen: match rumqttd_conf.mqtt_port {
                                None => SocketAddr::new(
                                    IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
                                    DEF_RUMQTTD_MQTTS_PORT,
                                ),
                                Some(port) => {
                                    SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port)
                                }
                            },
                            tls: Some(TlsConfig::Rustls {
                                capath: Some(cacert_file.clone()),
                                certpath: cert_file.clone(),
                                keypath: key_file.clone(),
                            }),
                            next_connection_delay_ms: 1,
                            connections: ConnectionSettings {
                                connection_timeout_ms: 5000,
                                max_payload_size: 1 * 1024 * 1024,
                                max_inflight_count: 200,
                                auth: None,
                                external_auth: None,
                                dynamic_filters: true,
                            },
                        },
                    );
                }
            }
        }
    }

    let mut broker = Broker::new(config);
    let (mut link_tx, mut link_rx) = broker.link("sylvia-iot-core").unwrap();
    let router_handle = thread::spawn(move || {
        let _ = broker.start();
    });
    let _ = link_tx.subscribe("#");
    let rx_handle = thread::spawn(move || loop {
        let _ = link_rx.id(); // XXX: add this line to prevent not ACK notifications.
        let _ = link_rx.recv().unwrap();
    });

    (router_handle, rx_handle)
}