1use std::{
2 collections::HashMap,
3 error::Error as StdError,
4 io::{Error as IoError, ErrorKind},
5 sync::Arc,
6 time::Duration,
7};
8
9use async_trait::async_trait;
10use chrono::DateTime;
11use log::{error, info, warn};
12use serde::Deserialize;
13use serde_json::{Map, Value};
14use tokio::time;
15
16use general_mq::{
17 queue::{EventHandler, GmqQueue, Message, MessageHandler, Status},
18 Queue,
19};
20
21use super::{super::config::DataData as DataMqConfig, new_data_queue, Connection};
22use crate::models::{coremgr_opdata::CoremgrOpData, Model};
23
24#[derive(Clone)]
25struct DataHandler {
26 model: Arc<dyn Model>,
27}
28
29#[derive(Deserialize)]
30#[serde(tag = "kind")]
31enum RecvDataMsg {
32 #[serde(rename = "operation")]
33 Operation { data: CmOpData },
34}
35
36#[derive(Deserialize)]
37struct CmOpData {
38 #[serde(rename = "dataId")]
39 data_id: String,
40 #[serde(rename = "reqTime")]
41 req_time: String,
42 #[serde(rename = "resTime")]
43 res_time: String,
44 #[serde(rename = "latencyMs")]
45 latency_ms: i64,
46 status: i32,
47 #[serde(rename = "sourceIp")]
48 source_ip: String,
49 method: String,
50 path: String,
51 body: Option<Map<String, Value>>,
52 #[serde(rename = "userId")]
53 user_id: String,
54 #[serde(rename = "clientId")]
55 client_id: String,
56 #[serde(rename = "errCode")]
57 err_code: Option<String>,
58 #[serde(rename = "errMessage")]
59 err_message: Option<String>,
60}
61
/// Name of the queue that carries operation data for this handler.
const QUEUE_NAME: &str = "coremgr.data";
63
64pub fn new(
66 model: Arc<dyn Model>,
67 mq_conns: &mut HashMap<String, Connection>,
68 config: &DataMqConfig,
69) -> Result<Queue, Box<dyn StdError>> {
70 let handler = Arc::new(DataHandler { model });
71 match new_data_queue(mq_conns, config, QUEUE_NAME, handler.clone(), handler) {
72 Err(e) => Err(Box::new(IoError::new(ErrorKind::Other, e))),
73 Ok(q) => Ok(q),
74 }
75}
76
77#[async_trait]
78impl EventHandler for DataHandler {
79 async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
80 const FN_NAME: &'static str = "DataHandler::on_error";
81 let queue_name = queue.name();
82 error!("[{}] {} error: {}", FN_NAME, queue_name, err);
83 }
84
85 async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
86 const FN_NAME: &'static str = "DataHandler::on_status";
87 let queue_name = queue.name();
88
89 match status {
90 Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
91 _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
92 }
93 }
94}
95
96#[async_trait]
97impl MessageHandler for DataHandler {
98 async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
99 const FN_NAME: &'static str = "DataHandler::on_message";
100 let queue_name = queue.name();
101
102 let data_msg = match serde_json::from_slice::<RecvDataMsg>(msg.payload()) {
103 Err(e) => {
104 let src_str: String = String::from_utf8_lossy(msg.payload()).into();
105 warn!(
106 "[{}] {} parse JSON error: {}, src: {}",
107 FN_NAME, queue_name, e, src_str
108 );
109 if let Err(e) = msg.ack().await {
110 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
111 }
112 return;
113 }
114 Ok(msg) => msg,
115 };
116 match data_msg {
117 RecvDataMsg::Operation { data } => {
118 let data = CoremgrOpData {
119 data_id: data.data_id,
120 req_time: match DateTime::parse_from_rfc3339(data.req_time.as_str()) {
121 Err(e) => {
122 warn!(
123 "[{}] {} parse coremgr_opdata req_time \"{}\" error: {}",
124 FN_NAME, queue_name, data.req_time, e
125 );
126 if let Err(e) = msg.ack().await {
127 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
128 }
129 return;
130 }
131 Ok(req_time) => req_time.into(),
132 },
133 res_time: match DateTime::parse_from_rfc3339(data.res_time.as_str()) {
134 Err(e) => {
135 warn!(
136 "[{}] {} parse coremgr_opdata res_time \"{}\" error: {}",
137 FN_NAME, queue_name, data.res_time, e
138 );
139 if let Err(e) = msg.ack().await {
140 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
141 }
142 return;
143 }
144 Ok(res_time) => res_time.into(),
145 },
146 latency_ms: data.latency_ms,
147 status: data.status,
148 source_ip: data.source_ip,
149 method: data.method,
150 path: data.path,
151 body: data.body,
152 user_id: data.user_id,
153 client_id: data.client_id,
154 err_code: data.err_code,
155 err_message: data.err_message,
156 };
157 let mut is_err = false;
158 if let Err(e) = self.model.coremgr_opdata().add(&data).await {
159 error!(
160 "[{}] {} add coremgr_opdata error: {}",
161 FN_NAME, queue_name, e
162 );
163 is_err = true;
164 }
165 if is_err {
166 time::sleep(Duration::from_secs(1)).await;
167 if let Err(e) = msg.nack().await {
168 error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
169 }
170 return;
171 }
172 }
173 }
174 if let Err(e) = msg.ack().await {
175 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
176 }
177 }
178}