sylvia_iot_broker/libs/mq/
network.rs

1use std::{
2    cmp::Ordering,
3    collections::HashMap,
4    error::Error as StdError,
5    sync::{Arc, Mutex},
6};
7
8use async_trait::async_trait;
9use chrono::DateTime;
10use hex;
11use log::{error, warn};
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value};
14use tokio::task;
15use url::Url;
16
17use general_mq::{
18    queue::{
19        EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus,
20    },
21    Queue,
22};
23use sylvia_iot_corelib::strings;
24
25use super::{
26    get_connection, new_ctrl_queues, new_data_queues, remove_connection, Connection, MgrMqStatus,
27    MgrStatus, Options,
28};
29
30/// Uplink data from network to broker.
31#[derive(Deserialize)]
32pub struct UlData {
33    pub time: String,
34    #[serde(rename = "networkAddr")]
35    pub network_addr: String,
36    pub data: String,
37    pub extension: Option<Map<String, Value>>,
38}
39
40/// Downlink data from broker to network.
41#[derive(Serialize)]
42pub struct DlData {
43    #[serde(rename = "dataId")]
44    pub data_id: String,
45    #[serde(rename = "pub")]
46    pub publish: String,
47    #[serde(rename = "expiresIn")]
48    pub expires_in: i64,
49    #[serde(rename = "networkAddr")]
50    pub network_addr: String,
51    pub data: String,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub extension: Option<Map<String, Value>>,
54}
55
56/// Downlink data result when processing or completing data transfer to the device.
57#[derive(Deserialize)]
58pub struct DlDataResult {
59    #[serde(rename = "dataId")]
60    pub data_id: String,
61    pub status: i32,
62    pub message: Option<String>,
63}
64
65/// The manager for network queues.
66#[derive(Clone)]
67pub struct NetworkMgr {
68    opts: Arc<Options>,
69
70    // Information for delete connection automatically.
71    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
72    host_uri: String,
73
74    uldata: Arc<Mutex<Queue>>,
75    dldata: Arc<Mutex<Queue>>,
76    dldata_result: Arc<Mutex<Queue>>,
77    ctrl: Arc<Mutex<Queue>>,
78
79    status: Arc<Mutex<MgrStatus>>,
80    handler: Arc<Mutex<Arc<dyn EventHandler>>>,
81}
82
83/// Event handler trait for the [`NetworkMgr`].
84#[async_trait]
85pub trait EventHandler: Send + Sync {
86    async fn on_status_change(&self, mgr: &NetworkMgr, status: MgrStatus);
87
88    async fn on_uldata(&self, mgr: &NetworkMgr, data: Box<UlData>) -> Result<(), ()>;
89    async fn on_dldata_result(&self, mgr: &NetworkMgr, data: Box<DlDataResult>) -> Result<(), ()>;
90}
91
92/// The event handler for [`general_mq::queue::GmqQueue`].
93struct MgrMqEventHandler {
94    mgr: NetworkMgr,
95}
96
97const QUEUE_PREFIX: &'static str = "broker.network";
98
99impl NetworkMgr {
100    /// To create a manager instance.
101    pub fn new(
102        conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
103        host_uri: &Url,
104        opts: Options,
105        handler: Arc<dyn EventHandler>,
106    ) -> Result<Self, String> {
107        let conn = get_connection(&conn_pool, host_uri)?;
108
109        let (uldata, dldata, _, dldata_result) = new_data_queues(&conn, &opts, QUEUE_PREFIX, true)?;
110        let ctrl = new_ctrl_queues(&conn, &opts, QUEUE_PREFIX)?;
111
112        let mgr = NetworkMgr {
113            opts: Arc::new(opts),
114            conn_pool,
115            host_uri: host_uri.to_string(),
116            uldata,
117            dldata,
118            dldata_result,
119            ctrl,
120            status: Arc::new(Mutex::new(MgrStatus::NotReady)),
121            handler: Arc::new(Mutex::new(handler)),
122        };
123        let mq_handler = Arc::new(MgrMqEventHandler { mgr: mgr.clone() });
124        let mut q = { mgr.uldata.lock().unwrap().clone() };
125        q.set_handler(mq_handler.clone());
126        q.set_msg_handler(mq_handler.clone());
127        if let Err(e) = q.connect() {
128            return Err(e.to_string());
129        }
130        let mut q = { mgr.dldata.lock().unwrap().clone() };
131        q.set_handler(mq_handler.clone());
132        if let Err(e) = q.connect() {
133            return Err(e.to_string());
134        }
135        let mut q = { mgr.dldata_result.lock().unwrap().clone() };
136        q.set_handler(mq_handler.clone());
137        q.set_msg_handler(mq_handler.clone());
138        if let Err(e) = q.connect() {
139            return Err(e.to_string());
140        }
141        let mut q = { mgr.ctrl.lock().unwrap().clone() };
142        q.set_handler(mq_handler.clone());
143        if let Err(e) = q.connect() {
144            return Err(e.to_string());
145        }
146        match conn {
147            Connection::Amqp(_, counter) => {
148                *counter.lock().unwrap() += 4;
149            }
150            Connection::Mqtt(_, counter) => {
151                *counter.lock().unwrap() += 4;
152            }
153        }
154        Ok(mgr)
155    }
156
157    /// The associated unit ID of the network.
158    pub fn unit_id(&self) -> &str {
159        self.opts.unit_id.as_str()
160    }
161
162    /// The associated unit code of the network.
163    pub fn unit_code(&self) -> &str {
164        self.opts.unit_code.as_str()
165    }
166
167    /// The network ID.
168    pub fn id(&self) -> &str {
169        self.opts.id.as_str()
170    }
171
172    /// The network code.
173    pub fn name(&self) -> &str {
174        self.opts.name.as_str()
175    }
176
177    /// Manager status.
178    pub fn status(&self) -> MgrStatus {
179        *self.status.lock().unwrap()
180    }
181
182    /// Detail status of each message queue. Please ignore `dldata_resp`.
183    pub fn mq_status(&self) -> MgrMqStatus {
184        MgrMqStatus {
185            uldata: { self.uldata.lock().unwrap().status() },
186            dldata: { self.dldata.lock().unwrap().status() },
187            dldata_resp: QueueStatus::Closed,
188            dldata_result: { self.dldata_result.lock().unwrap().status() },
189            ctrl: { self.ctrl.lock().unwrap().status() },
190        }
191    }
192
193    /// To close the manager queues.
194    pub async fn close(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
195        let mut q = { self.uldata.lock().unwrap().clone() };
196        q.close().await?;
197        let mut q = { self.dldata.lock().unwrap().clone() };
198        q.close().await?;
199        let mut q = { self.dldata_result.lock().unwrap().clone() };
200        q.close().await?;
201        let mut q = { self.ctrl.lock().unwrap().clone() };
202        q.close().await?;
203
204        remove_connection(&self.conn_pool, &self.host_uri, 4).await
205    }
206
207    /// Send downlink data to the network.
208    pub fn send_dldata(&self, data: &DlData) -> Result<(), Box<dyn StdError>> {
209        let payload = serde_json::to_vec(data)?;
210        let queue = { (*self.dldata.lock().unwrap()).clone() };
211        task::spawn(async move {
212            let _ = queue.send_msg(payload).await;
213        });
214        Ok(())
215    }
216
217    /// Send control data to the network.
218    pub async fn send_ctrl(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
219        let queue = { (*self.ctrl.lock().unwrap()).clone() };
220        queue.send_msg(payload).await
221    }
222}
223
224#[async_trait]
225impl QueueEventHandler for MgrMqEventHandler {
226    async fn on_error(&self, _queue: Arc<dyn GmqQueue>, _err: Box<dyn StdError + Send + Sync>) {}
227
228    async fn on_status(&self, _queue: Arc<dyn GmqQueue>, _status: QueueStatus) {
229        let uldata_status = { self.mgr.uldata.lock().unwrap().status() };
230        let dldata_status = { self.mgr.dldata.lock().unwrap().status() };
231        let dldata_result_status = { self.mgr.dldata_result.lock().unwrap().status() };
232        let ctrl_status = { self.mgr.ctrl.lock().unwrap().status() };
233
234        let status = match uldata_status == QueueStatus::Connected
235            && dldata_status == QueueStatus::Connected
236            && dldata_result_status == QueueStatus::Connected
237            && ctrl_status == QueueStatus::Connected
238        {
239            false => MgrStatus::NotReady,
240            true => MgrStatus::Ready,
241        };
242
243        let mut changed = false;
244        {
245            let mut mutex = self.mgr.status.lock().unwrap();
246            if *mutex != status {
247                *mutex = status;
248                changed = true;
249            }
250        }
251        if changed {
252            let handler = { self.mgr.handler.lock().unwrap().clone() };
253            handler.on_status_change(&self.mgr, status).await;
254        }
255    }
256}
257
258#[async_trait]
259impl MessageHandler for MgrMqEventHandler {
260    // Validate and decode data.
261    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
262        const FN_NAME: &'static str = "NetworkMgr.on_message";
263
264        let queue_name = queue.name();
265        if queue_name.cmp(self.mgr.uldata.lock().unwrap().name()) == Ordering::Equal {
266            let data = match serde_json::from_slice::<UlData>(msg.payload()) {
267                Err(_) => {
268                    warn!("[{}] invalid format from {}", FN_NAME, queue_name);
269                    if let Err(e) = msg.ack().await {
270                        error!("[{}] ACK message error: {}", FN_NAME, e);
271                    }
272                    return;
273                }
274                Ok(mut data) => {
275                    let time = match DateTime::parse_from_rfc3339(data.time.as_str()) {
276                        Err(e) => {
277                            warn!(
278                                "[{}] invalid time format from {}: {}",
279                                FN_NAME, queue_name, e
280                            );
281                            if let Err(e) = msg.ack().await {
282                                error!("[{}] ACK message error: {}", FN_NAME, e);
283                            }
284                            return;
285                        }
286                        Ok(time) => time.into(),
287                    };
288                    data.time = strings::time_str(&time);
289                    if data.network_addr.len() == 0 {
290                        warn!(
291                            "[{}] invalid network_addr format from {}",
292                            FN_NAME, queue_name,
293                        );
294                        if let Err(e) = msg.ack().await {
295                            error!("[{}] ACK message error: {}", FN_NAME, e);
296                        }
297                        return;
298                    }
299                    data.network_addr = data.network_addr.to_lowercase();
300                    if data.data.len() > 0 {
301                        if let Err(_) = hex::decode(data.data.as_str()) {
302                            warn!("[{}] invalid data format from {}", FN_NAME, queue_name);
303                            if let Err(e) = msg.ack().await {
304                                error!("[{}] ACK message error: {}", FN_NAME, e);
305                            }
306                            return;
307                        }
308                        data.data = data.data.to_lowercase();
309                    }
310                    data
311                }
312            };
313            let handler = { self.mgr.handler.lock().unwrap().clone() };
314            match handler.on_uldata(&self.mgr, Box::new(data)).await {
315                Err(_) => {
316                    if let Err(e) = msg.nack().await {
317                        error!("[{}] NACK message error: {}", FN_NAME, e);
318                    }
319                }
320                Ok(_) => {
321                    if let Err(e) = msg.ack().await {
322                        error!("[{}] ACK message error: {}", FN_NAME, e);
323                    }
324                }
325            }
326        } else if queue_name.cmp(self.mgr.dldata_result.lock().unwrap().name()) == Ordering::Equal {
327            let data = match serde_json::from_slice::<DlDataResult>(msg.payload()) {
328                Err(_) => {
329                    warn!("[{}] invalid format from {}", FN_NAME, queue_name);
330                    if let Err(e) = msg.ack().await {
331                        error!("[{}] ACK message error: {}", FN_NAME, e);
332                    }
333                    return;
334                }
335                Ok(data) => {
336                    if data.data_id.len() == 0 {
337                        warn!("[{}] invalid data_id format from {}", FN_NAME, queue_name);
338                        if let Err(e) = msg.ack().await {
339                            error!("[{}] ACK message error: {}", FN_NAME, e);
340                        }
341                        return;
342                    }
343                    if let Some(message) = data.message.as_ref() {
344                        if message.len() == 0 {
345                            warn!("[{}] invalid message format from {}", FN_NAME, queue_name);
346                            if let Err(e) = msg.ack().await {
347                                error!("[{}] ACK message error: {}", FN_NAME, e);
348                            }
349                            return;
350                        }
351                    }
352                    data
353                }
354            };
355            let handler = { self.mgr.handler.lock().unwrap().clone() };
356            match handler.on_dldata_result(&self.mgr, Box::new(data)).await {
357                Err(_) => {
358                    if let Err(e) = msg.nack().await {
359                        error!("[{}] NACK message error: {}", FN_NAME, e);
360                    }
361                }
362                Ok(_) => {
363                    if let Err(e) = msg.ack().await {
364                        error!("[{}] ACK message error: {}", FN_NAME, e);
365                    }
366                }
367            }
368        }
369    }
370}