sylvia_iot_broker/libs/mq/
application.rs

1use std::{
2    cmp::Ordering,
3    collections::HashMap,
4    error::Error as StdError,
5    sync::{Arc, Mutex},
6};
7
8use async_trait::async_trait;
9use hex;
10use log::{error, warn};
11use serde::{Deserialize, Serialize};
12use serde_json::{self, Map, Value};
13use tokio::task;
14use url::Url;
15
16use general_mq::{
17    queue::{
18        EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus,
19    },
20    Queue,
21};
22use sylvia_iot_corelib::{err, strings};
23
24use super::{
25    get_connection, new_data_queues, remove_connection, Connection, MgrMqStatus, MgrStatus, Options,
26};
27
28/// Uplink data from broker to application.
29#[derive(Serialize)]
30pub struct UlData {
31    #[serde(rename = "dataId")]
32    pub data_id: String,
33    pub time: String,
34    #[serde(rename = "pub")]
35    pub publish: String,
36    #[serde(rename = "deviceId")]
37    pub device_id: String,
38    #[serde(rename = "networkId")]
39    pub network_id: String,
40    #[serde(rename = "networkCode")]
41    pub network_code: String,
42    #[serde(rename = "networkAddr")]
43    pub network_addr: String,
44    #[serde(rename = "isPublic")]
45    pub is_public: bool,
46    pub profile: String,
47    pub data: String,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub extension: Option<Map<String, Value>>,
50}
51
52/// Downlink data from application to broker.
53#[derive(Deserialize)]
54pub struct DlData {
55    #[serde(rename = "correlationId")]
56    pub correlation_id: String,
57    #[serde(rename = "deviceId")]
58    pub device_id: Option<String>,
59    #[serde(rename = "networkCode")]
60    pub network_code: Option<String>,
61    #[serde(rename = "networkAddr")]
62    pub network_addr: Option<String>,
63    pub data: String,
64    pub extension: Option<Map<String, Value>>,
65}
66
67/// Downlink data response for [`DlData`].
68#[derive(Default, Serialize)]
69pub struct DlDataResp {
70    #[serde(rename = "correlationId")]
71    pub correlation_id: String,
72    #[serde(rename = "dataId", skip_serializing_if = "Option::is_none")]
73    pub data_id: Option<String>,
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub error: Option<String>,
76    #[serde(skip_serializing_if = "Option::is_none")]
77    pub message: Option<String>,
78}
79
80/// Downlink data result when processing or completing data transfer to the device.
81#[derive(Serialize)]
82pub struct DlDataResult {
83    #[serde(rename = "dataId")]
84    pub data_id: String,
85    pub status: i32,
86    #[serde(skip_serializing_if = "Option::is_none")]
87    pub message: Option<String>,
88}
89
/// The manager for application queues.
///
/// The manager is `Clone`; the queues, the status and the handler are held behind `Arc`, so
/// clones refer to the same underlying objects.
#[derive(Clone)]
pub struct ApplicationMgr {
    // Options given to `new()`; the source of `unit_id()`, `unit_code()`, `id()` and `name()`.
    opts: Arc<Options>,

    // Information for releasing the connection automatically: both values are passed to
    // `remove_connection()` when `close()` is called.
    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
    host_uri: String,

    // Queue used by `send_uldata()`.
    uldata: Arc<Mutex<Queue>>,
    // Queue whose incoming messages are handled by `MgrMqEventHandler::on_message()`.
    dldata: Arc<Mutex<Queue>>,
    // Queue used by `send_dldata_resp()`.
    dldata_resp: Arc<Mutex<Queue>>,
    // Queue used by `send_dldata_result()`.
    dldata_result: Arc<Mutex<Queue>>,

    // Last manager status computed by `MgrMqEventHandler::on_status()`.
    status: Arc<Mutex<MgrStatus>>,
    // User-provided handler that receives status changes and downlink data.
    handler: Arc<Mutex<Arc<dyn EventHandler>>>,
}
107
/// Event handler trait for the [`ApplicationMgr`].
#[async_trait]
pub trait EventHandler: Send + Sync {
    /// Called when the manager status switches between `MgrStatus::NotReady` and
    /// `MgrStatus::Ready`. It is not called when a queue event leaves the status unchanged.
    async fn on_status_change(&self, mgr: &ApplicationMgr, status: MgrStatus);

    /// Called for each downlink message that passed format validation.
    ///
    /// - `Ok(resp)`: the message is ACKed and `resp` is sent with
    ///   [`ApplicationMgr::send_dldata_resp`].
    /// - `Err(())`: the message is NACKed and no response is sent.
    async fn on_dldata(
        &self,
        mgr: &ApplicationMgr,
        data: Box<DlData>,
    ) -> Result<Box<DlDataResp>, ()>;
}
119
/// The event handler for [`general_mq::queue::GmqQueue`].
///
/// One instance is shared by all queues of a manager (see `ApplicationMgr::new()`).
struct MgrMqEventHandler {
    // Clone of the owning manager, used to reach its queues, status and user handler.
    mgr: ApplicationMgr,
}
124
/// Prefix passed to `new_data_queues()` for the application queues.
const QUEUE_PREFIX: &str = "broker.application";
126
127impl ApplicationMgr {
128    /// To create a manager instance.
129    pub fn new(
130        conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
131        host_uri: &Url,
132        opts: Options,
133        handler: Arc<dyn EventHandler>,
134    ) -> Result<Self, String> {
135        if opts.unit_id.len() == 0 {
136            return Err("`unit_id` cannot be empty for application".to_string());
137        }
138
139        let conn = get_connection(&conn_pool, host_uri)?;
140
141        let (uldata, dldata, dldata_resp, dldata_result) =
142            new_data_queues(&conn, &opts, QUEUE_PREFIX, false)?;
143
144        let mgr = ApplicationMgr {
145            opts: Arc::new(opts),
146            conn_pool,
147            host_uri: host_uri.to_string(),
148            uldata,
149            dldata,
150            dldata_resp: dldata_resp.unwrap(),
151            dldata_result,
152            status: Arc::new(Mutex::new(MgrStatus::NotReady)),
153            handler: Arc::new(Mutex::new(handler)),
154        };
155        let mq_handler = Arc::new(MgrMqEventHandler { mgr: mgr.clone() });
156        let mut q = { mgr.uldata.lock().unwrap().clone() };
157        q.set_handler(mq_handler.clone());
158        if let Err(e) = q.connect() {
159            return Err(e.to_string());
160        }
161        let mut q = { mgr.dldata.lock().unwrap().clone() };
162        q.set_handler(mq_handler.clone());
163        q.set_msg_handler(mq_handler.clone());
164        if let Err(e) = q.connect() {
165            return Err(e.to_string());
166        }
167        let mut q = { mgr.dldata_resp.lock().unwrap().clone() };
168        q.set_handler(mq_handler.clone());
169        if let Err(e) = q.connect() {
170            return Err(e.to_string());
171        }
172        let mut q = { mgr.dldata_result.lock().unwrap().clone() };
173        q.set_handler(mq_handler.clone());
174        if let Err(e) = q.connect() {
175            return Err(e.to_string());
176        }
177        match conn {
178            Connection::Amqp(_, counter) => {
179                *counter.lock().unwrap() += 4;
180            }
181            Connection::Mqtt(_, counter) => {
182                *counter.lock().unwrap() += 4;
183            }
184        }
185        Ok(mgr)
186    }
187
188    /// The associated unit ID of the application.
189    pub fn unit_id(&self) -> &str {
190        self.opts.unit_id.as_str()
191    }
192
193    /// The associated unit code of the application.
194    pub fn unit_code(&self) -> &str {
195        self.opts.unit_code.as_str()
196    }
197
198    /// The application ID.
199    pub fn id(&self) -> &str {
200        self.opts.id.as_str()
201    }
202
203    /// The application code.
204    pub fn name(&self) -> &str {
205        self.opts.name.as_str()
206    }
207
208    /// Manager status.
209    pub fn status(&self) -> MgrStatus {
210        *self.status.lock().unwrap()
211    }
212
213    /// Detail status of each message queue.
214    pub fn mq_status(&self) -> MgrMqStatus {
215        MgrMqStatus {
216            uldata: { self.uldata.lock().unwrap().status() },
217            dldata: { self.dldata.lock().unwrap().status() },
218            dldata_resp: { self.dldata_resp.lock().unwrap().status() },
219            dldata_result: { self.dldata_result.lock().unwrap().status() },
220            ctrl: QueueStatus::Closed,
221        }
222    }
223
224    /// To close the manager queues.
225    pub async fn close(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
226        let mut q = { self.uldata.lock().unwrap().clone() };
227        q.close().await?;
228        let mut q = { self.dldata.lock().unwrap().clone() };
229        q.close().await?;
230        let mut q = { self.dldata_resp.lock().unwrap().clone() };
231        q.close().await?;
232        let mut q = { self.dldata_result.lock().unwrap().clone() };
233        q.close().await?;
234
235        remove_connection(&self.conn_pool, &self.host_uri, 4).await
236    }
237
238    /// Send uplink data to the application.
239    pub fn send_uldata(&self, data: &UlData) -> Result<(), Box<dyn StdError>> {
240        let payload = serde_json::to_vec(data)?;
241        let queue = { (*self.uldata.lock().unwrap()).clone() };
242        task::spawn(async move {
243            let _ = queue.send_msg(payload).await;
244        });
245        Ok(())
246    }
247
248    /// Send downlink response for a downlink data to the application.
249    pub async fn send_dldata_resp(
250        &self,
251        data: &DlDataResp,
252    ) -> Result<(), Box<dyn StdError + Send + Sync>> {
253        let payload = serde_json::to_vec(data)?;
254        let queue = { (*self.dldata_resp.lock().unwrap()).clone() };
255        queue.send_msg(payload).await
256    }
257
258    /// Send the downlink data process result to the application.
259    pub async fn send_dldata_result(
260        &self,
261        data: &DlDataResult,
262    ) -> Result<(), Box<dyn StdError + Send + Sync>> {
263        let payload = serde_json::to_vec(data)?;
264        let queue = { (*self.dldata_result.lock().unwrap()).clone() };
265        queue.send_msg(payload).await
266    }
267}
268
269#[async_trait]
270impl QueueEventHandler for MgrMqEventHandler {
271    async fn on_error(&self, _queue: Arc<dyn GmqQueue>, _err: Box<dyn StdError + Send + Sync>) {}
272
273    async fn on_status(&self, _queue: Arc<dyn GmqQueue>, _status: QueueStatus) {
274        let status = match { self.mgr.uldata.lock().unwrap().status() } == QueueStatus::Connected
275            && { self.mgr.dldata.lock().unwrap().status() } == QueueStatus::Connected
276            && { self.mgr.dldata_resp.lock().unwrap().status() } == QueueStatus::Connected
277            && { self.mgr.dldata_result.lock().unwrap().status() } == QueueStatus::Connected
278        {
279            false => MgrStatus::NotReady,
280            true => MgrStatus::Ready,
281        };
282
283        let mut changed = false;
284        {
285            let mut mutex = self.mgr.status.lock().unwrap();
286            if *mutex != status {
287                *mutex = status;
288                changed = true;
289            }
290        }
291        if changed {
292            let handler = { self.mgr.handler.lock().unwrap().clone() };
293            handler.on_status_change(&self.mgr, status).await;
294        }
295    }
296}
297
298#[async_trait]
299impl MessageHandler for MgrMqEventHandler {
300    // Validate and decode data.
301    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
302        const FN_NAME: &'static str = "ApplicationMgr.on_message";
303
304        let queue_name = queue.name();
305        if queue_name.cmp(self.mgr.dldata.lock().unwrap().name()) == Ordering::Equal {
306            let data = match parse_dldata_msg(msg.payload()) {
307                Err(resp) => {
308                    warn!("[{}] invalid format from {}", FN_NAME, queue_name);
309                    if let Err(e) = msg.ack().await {
310                        error!("[{}] ACK message error: {}", FN_NAME, e);
311                    }
312                    if let Err(e) = self.mgr.send_dldata_resp(&resp).await {
313                        error!("[{}] send response error: {}", FN_NAME, e);
314                    }
315                    return;
316                }
317                Ok(data) => data,
318            };
319            let handler = { self.mgr.handler.lock().unwrap().clone() };
320            match handler.on_dldata(&self.mgr, Box::new(data)).await {
321                Err(_) => {
322                    if let Err(e) = msg.nack().await {
323                        error!("[{}] NACK message error: {}", FN_NAME, e);
324                    }
325                }
326                Ok(resp) => {
327                    if let Err(e) = msg.ack().await {
328                        error!("[{}] ACK message error: {}", FN_NAME, e);
329                    }
330                    if let Err(e) = self.mgr.send_dldata_resp(resp.as_ref()).await {
331                        error!("[{}] send response error: {}", FN_NAME, e);
332                    }
333                }
334            }
335        }
336    }
337}
338
339/// Parses downlink data from the application and responds error for wrong format data.
340fn parse_dldata_msg(msg: &[u8]) -> Result<DlData, DlDataResp> {
341    let mut data = match serde_json::from_slice::<DlData>(msg) {
342        Err(_) => {
343            return Err(DlDataResp {
344                correlation_id: "".to_string(),
345                error: Some(err::E_PARAM.to_string()),
346                message: Some("invalid format".to_string()),
347                ..Default::default()
348            });
349        }
350        Ok(data) => data,
351    };
352
353    if data.correlation_id.len() == 0 {
354        return Err(DlDataResp {
355            correlation_id: data.correlation_id.clone(),
356            error: Some(err::E_PARAM.to_string()),
357            message: Some("invalid `correlationId`".to_string()),
358            ..Default::default()
359        });
360    }
361    match data.device_id.as_ref() {
362        None => {
363            match data.network_code.as_ref() {
364                None => {
365                    return Err(DlDataResp {
366                        correlation_id: data.correlation_id.clone(),
367                        error: Some(err::E_PARAM.to_string()),
368                        message: Some("missing `networkCode`".to_string()),
369                        ..Default::default()
370                    });
371                }
372                Some(code) => {
373                    let code = code.to_lowercase();
374                    match strings::is_code(code.as_str()) {
375                        false => {
376                            return Err(DlDataResp {
377                                correlation_id: data.correlation_id.clone(),
378                                error: Some(err::E_PARAM.to_string()),
379                                message: Some("invalid `networkCode`".to_string()),
380                                ..Default::default()
381                            });
382                        }
383                        true => {
384                            data.network_code = Some(code);
385                            ()
386                        }
387                    }
388                }
389            }
390            match data.network_addr.as_ref() {
391                None => {
392                    return Err(DlDataResp {
393                        correlation_id: data.correlation_id.clone(),
394                        error: Some(err::E_PARAM.to_string()),
395                        message: Some("missing `networkAddr`".to_string()),
396                        ..Default::default()
397                    });
398                }
399                Some(addr) => match addr.len() {
400                    0 => {
401                        return Err(DlDataResp {
402                            correlation_id: data.correlation_id.clone(),
403                            error: Some(err::E_PARAM.to_string()),
404                            message: Some("invalid `networkAddr`".to_string()),
405                            ..Default::default()
406                        });
407                    }
408                    _ => {
409                        data.network_addr = Some(addr.to_lowercase());
410                        ()
411                    }
412                },
413            }
414        }
415        Some(device_id) => match device_id.len() {
416            0 => {
417                return Err(DlDataResp {
418                    correlation_id: data.correlation_id.clone(),
419                    error: Some(err::E_PARAM.to_string()),
420                    message: Some("invalid `deviceId`".to_string()),
421                    ..Default::default()
422                });
423            }
424            _ => (),
425        },
426    }
427    if data.data.len() > 0 {
428        if let Err(_) = hex::decode(data.data.as_str()) {
429            return Err(DlDataResp {
430                correlation_id: data.correlation_id.clone(),
431                error: Some(err::E_PARAM.to_string()),
432                message: Some("invalid `data`".to_string()),
433                ..Default::default()
434            });
435        }
436        data.data = data.data.to_lowercase();
437    }
438    Ok(data)
439}