sylvia_iot_data/libs/mq/
broker.rs

1use std::{
2    collections::HashMap,
3    error::Error as StdError,
4    io::{Error as IoError, ErrorKind},
5    sync::Arc,
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use chrono::DateTime;
11use log::{error, info, warn};
12use serde::Deserialize;
13use serde_json::{Map, Value};
14use tokio::time;
15
16use super::{super::config::DataData as DataMqConfig, new_data_queue, Connection};
17use crate::models::{
18    application_dldata::{
19        ApplicationDlData, UpdateQueryCond as ApplicationDlDataCond,
20        Updates as ApplicationDlDataUpdate,
21    },
22    application_uldata::ApplicationUlData,
23    network_dldata::{
24        NetworkDlData, UpdateQueryCond as NetworkDlDataCond, Updates as NetworkDlDataUpdate,
25    },
26    network_uldata::NetworkUlData,
27    Model,
28};
29use general_mq::{
30    queue::{EventHandler, GmqQueue, Message, MessageHandler, Status},
31    Queue,
32};
33
34#[derive(Clone)]
35struct DataHandler {
36    model: Arc<dyn Model>,
37}
38
39#[derive(Deserialize)]
40#[serde(tag = "kind")]
41enum RecvDataMsg {
42    #[serde(rename = "application-uldata")]
43    AppUlData { data: AppUlData },
44    #[serde(rename = "application-dldata")]
45    AppDlData { data: AppDlData },
46    #[serde(rename = "application-dldata-result")]
47    AppDlDataResult { data: AppDlDataResult },
48    #[serde(rename = "network-uldata")]
49    NetUlData { data: NetUlData },
50    #[serde(rename = "network-dldata")]
51    NetDlData { data: NetDlData },
52    #[serde(rename = "network-dldata-result")]
53    NetDlDataResult { data: NetDlDataResult },
54}
55
56#[derive(Deserialize)]
57struct AppUlData {
58    #[serde(rename = "dataId")]
59    data_id: String,
60    proc: String,
61    #[serde(rename = "pub")]
62    publish: String,
63    #[serde(rename = "unitCode")]
64    unit_code: Option<String>,
65    #[serde(rename = "networkCode")]
66    network_code: String,
67    #[serde(rename = "networkAddr")]
68    network_addr: String,
69    #[serde(rename = "unitId")]
70    unit_id: String,
71    #[serde(rename = "deviceId")]
72    device_id: String,
73    time: String,
74    profile: String,
75    data: String,
76    extension: Option<Map<String, Value>>,
77}
78
79#[derive(Deserialize)]
80struct AppDlData {
81    #[serde(rename = "dataId")]
82    data_id: String,
83    proc: String,
84    status: i32,
85    #[serde(rename = "unitId")]
86    unit_id: String,
87    #[serde(rename = "deviceId")]
88    device_id: Option<String>,
89    #[serde(rename = "networkCode")]
90    network_code: Option<String>,
91    #[serde(rename = "networkAddr")]
92    network_addr: Option<String>,
93    profile: String,
94    data: String,
95    extension: Option<Map<String, Value>>,
96}
97
98#[derive(Deserialize)]
99struct AppDlDataResult {
100    #[serde(rename = "dataId")]
101    data_id: String,
102    resp: String,
103    status: i32,
104}
105
106#[derive(Deserialize)]
107struct NetUlData {
108    #[serde(rename = "dataId")]
109    data_id: String,
110    proc: String,
111    #[serde(rename = "unitCode")]
112    unit_code: Option<String>,
113    #[serde(rename = "networkCode")]
114    network_code: String,
115    #[serde(rename = "networkAddr")]
116    network_addr: String,
117    #[serde(rename = "unitId")]
118    unit_id: Option<String>,
119    #[serde(rename = "deviceId")]
120    device_id: Option<String>,
121    time: String,
122    profile: String,
123    data: String,
124    extension: Option<Map<String, Value>>,
125}
126
127#[derive(Deserialize)]
128struct NetDlData {
129    #[serde(rename = "dataId")]
130    data_id: String,
131    proc: String,
132    #[serde(rename = "pub")]
133    publish: String,
134    status: i32,
135    #[serde(rename = "unitId")]
136    unit_id: String,
137    #[serde(rename = "deviceId")]
138    device_id: String,
139    #[serde(rename = "networkCode")]
140    network_code: String,
141    #[serde(rename = "networkAddr")]
142    network_addr: String,
143    profile: String,
144    data: String,
145    extension: Option<Map<String, Value>>,
146}
147
148#[derive(Deserialize)]
149struct NetDlDataResult {
150    #[serde(rename = "dataId")]
151    data_id: String,
152    resp: String,
153    status: i32,
154}
155
156const QUEUE_NAME: &'static str = "broker.data";
157
158/// Create a receive queue to receive data from `broker.data` queue.
159pub fn new(
160    model: Arc<dyn Model>,
161    mq_conns: &mut HashMap<String, Connection>,
162    config: &DataMqConfig,
163) -> Result<Queue, Box<dyn StdError>> {
164    let handler = Arc::new(DataHandler { model });
165    match new_data_queue(mq_conns, config, QUEUE_NAME, handler.clone(), handler) {
166        Err(e) => Err(Box::new(IoError::new(ErrorKind::Other, e))),
167        Ok(q) => Ok(q),
168    }
169}
170
171#[async_trait]
172impl EventHandler for DataHandler {
173    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
174        const FN_NAME: &'static str = "DataHandler::on_error";
175        let queue_name = queue.name();
176        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
177    }
178
179    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
180        const FN_NAME: &'static str = "DataHandler::on_status";
181        let queue_name = queue.name();
182
183        match status {
184            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
185            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
186        }
187    }
188}
189
190#[async_trait]
191impl MessageHandler for DataHandler {
192    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
193        const FN_NAME: &'static str = "DataHandler::on_message";
194        let queue_name = queue.name();
195
196        let data_msg = match serde_json::from_slice::<RecvDataMsg>(msg.payload()) {
197            Err(e) => {
198                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
199                warn!(
200                    "[{}] {} parse JSON error: {}, src: {}",
201                    FN_NAME, queue_name, e, src_str
202                );
203                if let Err(e) = msg.ack().await {
204                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
205                }
206                return;
207            }
208            Ok(msg) => msg,
209        };
210        match data_msg {
211            RecvDataMsg::AppDlData { data } => {
212                let data = ApplicationDlData {
213                    data_id: data.data_id,
214                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
215                        Err(e) => {
216                            warn!(
217                                "[{}] {} parse application_dldata proc \"{}\" error: {}",
218                                FN_NAME, queue_name, data.proc, e
219                            );
220                            if let Err(e) = msg.ack().await {
221                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
222                            }
223                            return;
224                        }
225                        Ok(proc) => proc.into(),
226                    },
227                    resp: None,
228                    status: data.status,
229                    unit_id: data.unit_id,
230                    device_id: data.device_id,
231                    network_code: data.network_code,
232                    network_addr: data.network_addr,
233                    profile: data.profile,
234                    data: data.data,
235                    extension: data.extension,
236                };
237                let mut is_err = false;
238                if let Err(e) = self.model.application_dldata().add(&data).await {
239                    error!(
240                        "[{}] {} add application_dldata error: {}",
241                        FN_NAME, queue_name, e
242                    );
243                    is_err = true;
244                }
245                if is_err {
246                    time::sleep(Duration::from_secs(1)).await;
247                    if let Err(e) = msg.nack().await {
248                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
249                    }
250                    return;
251                }
252            }
253            RecvDataMsg::AppDlDataResult { data } => {
254                // FIXME: wait 1 second to wait for the associated dldata has been written in DB.
255                time::sleep(Duration::from_secs(1)).await;
256
257                let cond = ApplicationDlDataCond {
258                    data_id: data.data_id.as_str(),
259                };
260                let updates = ApplicationDlDataUpdate {
261                    resp: match DateTime::parse_from_rfc3339(data.resp.as_str()) {
262                        Err(e) => {
263                            warn!(
264                                "[{}] {} parse application_dldata resp \"{}\" error: {}",
265                                FN_NAME, queue_name, data.resp, e
266                            );
267                            if let Err(e) = msg.ack().await {
268                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
269                            }
270                            return;
271                        }
272                        Ok(resp) => resp.into(),
273                    },
274                    status: data.status,
275                };
276                let mut is_err = false;
277                if let Err(e) = self
278                    .model
279                    .application_dldata()
280                    .update(&cond, &updates)
281                    .await
282                {
283                    error!(
284                        "[{}] {} update application_dldata error: {}",
285                        FN_NAME, queue_name, e
286                    );
287                    is_err = true;
288                }
289                if is_err {
290                    time::sleep(Duration::from_secs(1)).await;
291                    if let Err(e) = msg.nack().await {
292                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
293                    }
294                    return;
295                }
296            }
297            RecvDataMsg::AppUlData { data } => {
298                let data = ApplicationUlData {
299                    data_id: data.data_id,
300                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
301                        Err(e) => {
302                            warn!(
303                                "[{}] {} parse application_uldata proc \"{}\" error: {}",
304                                FN_NAME, queue_name, data.proc, e
305                            );
306                            if let Err(e) = msg.ack().await {
307                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
308                            }
309                            return;
310                        }
311                        Ok(proc) => proc.into(),
312                    },
313                    publish: match DateTime::parse_from_rfc3339(data.publish.as_str()) {
314                        Err(e) => {
315                            warn!(
316                                "[{}] {} parse application_uldata publish \"{}\" error: {}",
317                                FN_NAME, queue_name, data.publish, e
318                            );
319                            if let Err(e) = msg.ack().await {
320                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
321                            }
322                            return;
323                        }
324                        Ok(publish) => publish.into(),
325                    },
326                    unit_code: data.unit_code,
327                    network_code: data.network_code,
328                    network_addr: data.network_addr,
329                    unit_id: data.unit_id,
330                    device_id: data.device_id,
331                    time: match DateTime::parse_from_rfc3339(data.time.as_str()) {
332                        Err(e) => {
333                            warn!(
334                                "[{}] {} parse application_uldata time \"{}\" error: {}",
335                                FN_NAME, queue_name, data.time, e
336                            );
337                            if let Err(e) = msg.ack().await {
338                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
339                            }
340                            return;
341                        }
342                        Ok(time) => time.into(),
343                    },
344                    profile: data.profile,
345                    data: data.data,
346                    extension: data.extension,
347                };
348                let mut is_err = false;
349                if let Err(e) = self.model.application_uldata().add(&data).await {
350                    error!(
351                        "[{}] {} add application_uldata error: {}",
352                        FN_NAME, queue_name, e
353                    );
354                    is_err = true;
355                }
356                if is_err {
357                    time::sleep(Duration::from_secs(1)).await;
358                    if let Err(e) = msg.nack().await {
359                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
360                    }
361                    return;
362                }
363            }
364            RecvDataMsg::NetDlData { data } => {
365                let data = NetworkDlData {
366                    data_id: data.data_id,
367                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
368                        Err(e) => {
369                            warn!(
370                                "[{}] {} parse network_dldata proc \"{}\" error: {}",
371                                FN_NAME, queue_name, data.proc, e
372                            );
373                            if let Err(e) = msg.ack().await {
374                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
375                            }
376                            return;
377                        }
378                        Ok(proc) => proc.into(),
379                    },
380                    publish: match DateTime::parse_from_rfc3339(data.publish.as_str()) {
381                        Err(e) => {
382                            warn!(
383                                "[{}] {} parse network_dldata publish \"{}\" error: {}",
384                                FN_NAME, queue_name, data.publish, e
385                            );
386                            if let Err(e) = msg.ack().await {
387                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
388                            }
389                            return;
390                        }
391                        Ok(publish) => publish.into(),
392                    },
393                    resp: None,
394                    status: data.status,
395                    unit_id: data.unit_id,
396                    device_id: data.device_id,
397                    network_code: data.network_code,
398                    network_addr: data.network_addr,
399                    profile: data.profile,
400                    data: data.data,
401                    extension: data.extension,
402                };
403                let mut is_err = false;
404                if let Err(e) = self.model.network_dldata().add(&data).await {
405                    error!(
406                        "[{}] {} add network_dldata error: {}",
407                        FN_NAME, queue_name, e
408                    );
409                    is_err = true;
410                }
411                if is_err {
412                    time::sleep(Duration::from_secs(1)).await;
413                    if let Err(e) = msg.nack().await {
414                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
415                    }
416                    return;
417                }
418            }
419            RecvDataMsg::NetDlDataResult { data } => {
420                // FIXME: wait 1 second to wait for the associated dldata has been written in DB.
421                time::sleep(Duration::from_secs(1)).await;
422
423                let cond = NetworkDlDataCond {
424                    data_id: data.data_id.as_str(),
425                };
426                let updates = NetworkDlDataUpdate {
427                    resp: match DateTime::parse_from_rfc3339(data.resp.as_str()) {
428                        Err(e) => {
429                            warn!(
430                                "[{}] {} parse network_dldata resp \"{}\" error: {}",
431                                FN_NAME, queue_name, data.resp, e
432                            );
433                            if let Err(e) = msg.ack().await {
434                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
435                            }
436                            return;
437                        }
438                        Ok(resp) => resp.into(),
439                    },
440                    status: data.status,
441                };
442                let mut is_err = false;
443                if let Err(e) = self.model.network_dldata().update(&cond, &updates).await {
444                    error!(
445                        "[{}] {} update network_dldata error: {}",
446                        FN_NAME, queue_name, e
447                    );
448                    is_err = true;
449                }
450                if is_err {
451                    time::sleep(Duration::from_secs(1)).await;
452                    if let Err(e) = msg.nack().await {
453                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
454                    }
455                    return;
456                }
457            }
458            RecvDataMsg::NetUlData { data } => {
459                let data = NetworkUlData {
460                    data_id: data.data_id,
461                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
462                        Err(e) => {
463                            warn!(
464                                "[{}] {} parse network_uldata proc \"{}\" error: {}",
465                                FN_NAME, queue_name, data.proc, e
466                            );
467                            if let Err(e) = msg.ack().await {
468                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
469                            }
470                            return;
471                        }
472                        Ok(proc) => proc.into(),
473                    },
474                    unit_code: data.unit_code,
475                    network_code: data.network_code,
476                    network_addr: data.network_addr,
477                    unit_id: data.unit_id,
478                    device_id: data.device_id,
479                    time: match DateTime::parse_from_rfc3339(data.time.as_str()) {
480                        Err(e) => {
481                            warn!(
482                                "[{}] {} parse network_uldata time \"{}\" error: {}",
483                                FN_NAME, queue_name, data.time, e
484                            );
485                            if let Err(e) = msg.ack().await {
486                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
487                            }
488                            return;
489                        }
490                        Ok(time) => time.into(),
491                    },
492                    profile: data.profile,
493                    data: data.data,
494                    extension: data.extension,
495                };
496                let mut is_err = false;
497                if let Err(e) = self.model.network_uldata().add(&data).await {
498                    error!(
499                        "[{}] {} add network_uldata error: {}",
500                        FN_NAME, queue_name, e
501                    );
502                    is_err = true;
503                }
504                if is_err {
505                    time::sleep(Duration::from_secs(1)).await;
506                    if let Err(e) = msg.nack().await {
507                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
508                    }
509                    return;
510                }
511            }
512        }
513        if let Err(e) = msg.ack().await {
514            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
515        }
516    }
517}