sylvia_iot_data/libs/mq/
coremgr.rs

1use std::{
2    collections::HashMap,
3    error::Error as StdError,
4    io::{Error as IoError, ErrorKind},
5    sync::Arc,
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use chrono::DateTime;
11use log::{error, info, warn};
12use serde::Deserialize;
13use serde_json::{Map, Value};
14use tokio::time;
15
16use general_mq::{
17    queue::{EventHandler, GmqQueue, Message, MessageHandler, Status},
18    Queue,
19};
20
21use super::{super::config::DataData as DataMqConfig, new_data_queue, Connection};
22use crate::models::{coremgr_opdata::CoremgrOpData, Model};
23
24#[derive(Clone)]
25struct DataHandler {
26    model: Arc<dyn Model>,
27}
28
29#[derive(Deserialize)]
30#[serde(tag = "kind")]
31enum RecvDataMsg {
32    #[serde(rename = "operation")]
33    Operation { data: CmOpData },
34}
35
36#[derive(Deserialize)]
37struct CmOpData {
38    #[serde(rename = "dataId")]
39    data_id: String,
40    #[serde(rename = "reqTime")]
41    req_time: String,
42    #[serde(rename = "resTime")]
43    res_time: String,
44    #[serde(rename = "latencyMs")]
45    latency_ms: i64,
46    status: i32,
47    #[serde(rename = "sourceIp")]
48    source_ip: String,
49    method: String,
50    path: String,
51    body: Option<Map<String, Value>>,
52    #[serde(rename = "userId")]
53    user_id: String,
54    #[serde(rename = "clientId")]
55    client_id: String,
56    #[serde(rename = "errCode")]
57    err_code: Option<String>,
58    #[serde(rename = "errMessage")]
59    err_message: Option<String>,
60}
61
62const QUEUE_NAME: &'static str = "coremgr.data";
63
64/// Create a receive queue to receive data from `coremgr.data` queue.
65pub fn new(
66    model: Arc<dyn Model>,
67    mq_conns: &mut HashMap<String, Connection>,
68    config: &DataMqConfig,
69) -> Result<Queue, Box<dyn StdError>> {
70    let handler = Arc::new(DataHandler { model });
71    match new_data_queue(mq_conns, config, QUEUE_NAME, handler.clone(), handler) {
72        Err(e) => Err(Box::new(IoError::new(ErrorKind::Other, e))),
73        Ok(q) => Ok(q),
74    }
75}
76
77#[async_trait]
78impl EventHandler for DataHandler {
79    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
80        const FN_NAME: &'static str = "DataHandler::on_error";
81        let queue_name = queue.name();
82        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
83    }
84
85    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
86        const FN_NAME: &'static str = "DataHandler::on_status";
87        let queue_name = queue.name();
88
89        match status {
90            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
91            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
92        }
93    }
94}
95
96#[async_trait]
97impl MessageHandler for DataHandler {
98    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
99        const FN_NAME: &'static str = "DataHandler::on_message";
100        let queue_name = queue.name();
101
102        let data_msg = match serde_json::from_slice::<RecvDataMsg>(msg.payload()) {
103            Err(e) => {
104                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
105                warn!(
106                    "[{}] {} parse JSON error: {}, src: {}",
107                    FN_NAME, queue_name, e, src_str
108                );
109                if let Err(e) = msg.ack().await {
110                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
111                }
112                return;
113            }
114            Ok(msg) => msg,
115        };
116        match data_msg {
117            RecvDataMsg::Operation { data } => {
118                let data = CoremgrOpData {
119                    data_id: data.data_id,
120                    req_time: match DateTime::parse_from_rfc3339(data.req_time.as_str()) {
121                        Err(e) => {
122                            warn!(
123                                "[{}] {} parse coremgr_opdata req_time \"{}\" error: {}",
124                                FN_NAME, queue_name, data.req_time, e
125                            );
126                            if let Err(e) = msg.ack().await {
127                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
128                            }
129                            return;
130                        }
131                        Ok(req_time) => req_time.into(),
132                    },
133                    res_time: match DateTime::parse_from_rfc3339(data.res_time.as_str()) {
134                        Err(e) => {
135                            warn!(
136                                "[{}] {} parse coremgr_opdata res_time \"{}\" error: {}",
137                                FN_NAME, queue_name, data.res_time, e
138                            );
139                            if let Err(e) = msg.ack().await {
140                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
141                            }
142                            return;
143                        }
144                        Ok(res_time) => res_time.into(),
145                    },
146                    latency_ms: data.latency_ms,
147                    status: data.status,
148                    source_ip: data.source_ip,
149                    method: data.method,
150                    path: data.path,
151                    body: data.body,
152                    user_id: data.user_id,
153                    client_id: data.client_id,
154                    err_code: data.err_code,
155                    err_message: data.err_message,
156                };
157                let mut is_err = false;
158                if let Err(e) = self.model.coremgr_opdata().add(&data).await {
159                    error!(
160                        "[{}] {} add coremgr_opdata error: {}",
161                        FN_NAME, queue_name, e
162                    );
163                    is_err = true;
164                }
165                if is_err {
166                    time::sleep(Duration::from_secs(1)).await;
167                    if let Err(e) = msg.nack().await {
168                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
169                    }
170                    return;
171                }
172            }
173        }
174        if let Err(e) = msg.ack().await {
175            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
176        }
177    }
178}