1use std::{
2 collections::HashMap,
3 error::Error as StdError,
4 io::{Error as IoError, ErrorKind},
5 sync::Arc,
6 time::Duration,
7};
8
9use async_trait::async_trait;
10use chrono::DateTime;
11use log::{error, info, warn};
12use serde::Deserialize;
13use serde_json::{Map, Value};
14use tokio::time;
15
16use super::{super::config::DataData as DataMqConfig, new_data_queue, Connection};
17use crate::models::{
18 application_dldata::{
19 ApplicationDlData, UpdateQueryCond as ApplicationDlDataCond,
20 Updates as ApplicationDlDataUpdate,
21 },
22 application_uldata::ApplicationUlData,
23 network_dldata::{
24 NetworkDlData, UpdateQueryCond as NetworkDlDataCond, Updates as NetworkDlDataUpdate,
25 },
26 network_uldata::NetworkUlData,
27 Model,
28};
29use general_mq::{
30 queue::{EventHandler, GmqQueue, Message, MessageHandler, Status},
31 Queue,
32};
33
34#[derive(Clone)]
35struct DataHandler {
36 model: Arc<dyn Model>,
37}
38
39#[derive(Deserialize)]
40#[serde(tag = "kind")]
41enum RecvDataMsg {
42 #[serde(rename = "application-uldata")]
43 AppUlData { data: AppUlData },
44 #[serde(rename = "application-dldata")]
45 AppDlData { data: AppDlData },
46 #[serde(rename = "application-dldata-result")]
47 AppDlDataResult { data: AppDlDataResult },
48 #[serde(rename = "network-uldata")]
49 NetUlData { data: NetUlData },
50 #[serde(rename = "network-dldata")]
51 NetDlData { data: NetDlData },
52 #[serde(rename = "network-dldata-result")]
53 NetDlDataResult { data: NetDlDataResult },
54}
55
56#[derive(Deserialize)]
57struct AppUlData {
58 #[serde(rename = "dataId")]
59 data_id: String,
60 proc: String,
61 #[serde(rename = "pub")]
62 publish: String,
63 #[serde(rename = "unitCode")]
64 unit_code: Option<String>,
65 #[serde(rename = "networkCode")]
66 network_code: String,
67 #[serde(rename = "networkAddr")]
68 network_addr: String,
69 #[serde(rename = "unitId")]
70 unit_id: String,
71 #[serde(rename = "deviceId")]
72 device_id: String,
73 time: String,
74 profile: String,
75 data: String,
76 extension: Option<Map<String, Value>>,
77}
78
79#[derive(Deserialize)]
80struct AppDlData {
81 #[serde(rename = "dataId")]
82 data_id: String,
83 proc: String,
84 status: i32,
85 #[serde(rename = "unitId")]
86 unit_id: String,
87 #[serde(rename = "deviceId")]
88 device_id: Option<String>,
89 #[serde(rename = "networkCode")]
90 network_code: Option<String>,
91 #[serde(rename = "networkAddr")]
92 network_addr: Option<String>,
93 profile: String,
94 data: String,
95 extension: Option<Map<String, Value>>,
96}
97
98#[derive(Deserialize)]
99struct AppDlDataResult {
100 #[serde(rename = "dataId")]
101 data_id: String,
102 resp: String,
103 status: i32,
104}
105
106#[derive(Deserialize)]
107struct NetUlData {
108 #[serde(rename = "dataId")]
109 data_id: String,
110 proc: String,
111 #[serde(rename = "unitCode")]
112 unit_code: Option<String>,
113 #[serde(rename = "networkCode")]
114 network_code: String,
115 #[serde(rename = "networkAddr")]
116 network_addr: String,
117 #[serde(rename = "unitId")]
118 unit_id: Option<String>,
119 #[serde(rename = "deviceId")]
120 device_id: Option<String>,
121 time: String,
122 profile: String,
123 data: String,
124 extension: Option<Map<String, Value>>,
125}
126
127#[derive(Deserialize)]
128struct NetDlData {
129 #[serde(rename = "dataId")]
130 data_id: String,
131 proc: String,
132 #[serde(rename = "pub")]
133 publish: String,
134 status: i32,
135 #[serde(rename = "unitId")]
136 unit_id: String,
137 #[serde(rename = "deviceId")]
138 device_id: String,
139 #[serde(rename = "networkCode")]
140 network_code: String,
141 #[serde(rename = "networkAddr")]
142 network_addr: String,
143 profile: String,
144 data: String,
145 extension: Option<Map<String, Value>>,
146}
147
148#[derive(Deserialize)]
149struct NetDlDataResult {
150 #[serde(rename = "dataId")]
151 data_id: String,
152 resp: String,
153 status: i32,
154}
155
/// Name of the queue this module receives data messages from.
const QUEUE_NAME: &str = "broker.data";
157
158pub fn new(
160 model: Arc<dyn Model>,
161 mq_conns: &mut HashMap<String, Connection>,
162 config: &DataMqConfig,
163) -> Result<Queue, Box<dyn StdError>> {
164 let handler = Arc::new(DataHandler { model });
165 match new_data_queue(mq_conns, config, QUEUE_NAME, handler.clone(), handler) {
166 Err(e) => Err(Box::new(IoError::new(ErrorKind::Other, e))),
167 Ok(q) => Ok(q),
168 }
169}
170
171#[async_trait]
172impl EventHandler for DataHandler {
173 async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
174 const FN_NAME: &'static str = "DataHandler::on_error";
175 let queue_name = queue.name();
176 error!("[{}] {} error: {}", FN_NAME, queue_name, err);
177 }
178
179 async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
180 const FN_NAME: &'static str = "DataHandler::on_status";
181 let queue_name = queue.name();
182
183 match status {
184 Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
185 _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
186 }
187 }
188}
189
190#[async_trait]
191impl MessageHandler for DataHandler {
192 async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
193 const FN_NAME: &'static str = "DataHandler::on_message";
194 let queue_name = queue.name();
195
196 let data_msg = match serde_json::from_slice::<RecvDataMsg>(msg.payload()) {
197 Err(e) => {
198 let src_str: String = String::from_utf8_lossy(msg.payload()).into();
199 warn!(
200 "[{}] {} parse JSON error: {}, src: {}",
201 FN_NAME, queue_name, e, src_str
202 );
203 if let Err(e) = msg.ack().await {
204 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
205 }
206 return;
207 }
208 Ok(msg) => msg,
209 };
210 match data_msg {
211 RecvDataMsg::AppDlData { data } => {
212 let data = ApplicationDlData {
213 data_id: data.data_id,
214 proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
215 Err(e) => {
216 warn!(
217 "[{}] {} parse application_dldata proc \"{}\" error: {}",
218 FN_NAME, queue_name, data.proc, e
219 );
220 if let Err(e) = msg.ack().await {
221 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
222 }
223 return;
224 }
225 Ok(proc) => proc.into(),
226 },
227 resp: None,
228 status: data.status,
229 unit_id: data.unit_id,
230 device_id: data.device_id,
231 network_code: data.network_code,
232 network_addr: data.network_addr,
233 profile: data.profile,
234 data: data.data,
235 extension: data.extension,
236 };
237 let mut is_err = false;
238 if let Err(e) = self.model.application_dldata().add(&data).await {
239 error!(
240 "[{}] {} add application_dldata error: {}",
241 FN_NAME, queue_name, e
242 );
243 is_err = true;
244 }
245 if is_err {
246 time::sleep(Duration::from_secs(1)).await;
247 if let Err(e) = msg.nack().await {
248 error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
249 }
250 return;
251 }
252 }
253 RecvDataMsg::AppDlDataResult { data } => {
254 time::sleep(Duration::from_secs(1)).await;
256
257 let cond = ApplicationDlDataCond {
258 data_id: data.data_id.as_str(),
259 };
260 let updates = ApplicationDlDataUpdate {
261 resp: match DateTime::parse_from_rfc3339(data.resp.as_str()) {
262 Err(e) => {
263 warn!(
264 "[{}] {} parse application_dldata resp \"{}\" error: {}",
265 FN_NAME, queue_name, data.resp, e
266 );
267 if let Err(e) = msg.ack().await {
268 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
269 }
270 return;
271 }
272 Ok(resp) => resp.into(),
273 },
274 status: data.status,
275 };
276 let mut is_err = false;
277 if let Err(e) = self
278 .model
279 .application_dldata()
280 .update(&cond, &updates)
281 .await
282 {
283 error!(
284 "[{}] {} update application_dldata error: {}",
285 FN_NAME, queue_name, e
286 );
287 is_err = true;
288 }
289 if is_err {
290 time::sleep(Duration::from_secs(1)).await;
291 if let Err(e) = msg.nack().await {
292 error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
293 }
294 return;
295 }
296 }
297 RecvDataMsg::AppUlData { data } => {
298 let data = ApplicationUlData {
299 data_id: data.data_id,
300 proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
301 Err(e) => {
302 warn!(
303 "[{}] {} parse application_uldata proc \"{}\" error: {}",
304 FN_NAME, queue_name, data.proc, e
305 );
306 if let Err(e) = msg.ack().await {
307 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
308 }
309 return;
310 }
311 Ok(proc) => proc.into(),
312 },
313 publish: match DateTime::parse_from_rfc3339(data.publish.as_str()) {
314 Err(e) => {
315 warn!(
316 "[{}] {} parse application_uldata publish \"{}\" error: {}",
317 FN_NAME, queue_name, data.publish, e
318 );
319 if let Err(e) = msg.ack().await {
320 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
321 }
322 return;
323 }
324 Ok(publish) => publish.into(),
325 },
326 unit_code: data.unit_code,
327 network_code: data.network_code,
328 network_addr: data.network_addr,
329 unit_id: data.unit_id,
330 device_id: data.device_id,
331 time: match DateTime::parse_from_rfc3339(data.time.as_str()) {
332 Err(e) => {
333 warn!(
334 "[{}] {} parse application_uldata time \"{}\" error: {}",
335 FN_NAME, queue_name, data.time, e
336 );
337 if let Err(e) = msg.ack().await {
338 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
339 }
340 return;
341 }
342 Ok(time) => time.into(),
343 },
344 profile: data.profile,
345 data: data.data,
346 extension: data.extension,
347 };
348 let mut is_err = false;
349 if let Err(e) = self.model.application_uldata().add(&data).await {
350 error!(
351 "[{}] {} add application_uldata error: {}",
352 FN_NAME, queue_name, e
353 );
354 is_err = true;
355 }
356 if is_err {
357 time::sleep(Duration::from_secs(1)).await;
358 if let Err(e) = msg.nack().await {
359 error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
360 }
361 return;
362 }
363 }
364 RecvDataMsg::NetDlData { data } => {
365 let data = NetworkDlData {
366 data_id: data.data_id,
367 proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
368 Err(e) => {
369 warn!(
370 "[{}] {} parse network_dldata proc \"{}\" error: {}",
371 FN_NAME, queue_name, data.proc, e
372 );
373 if let Err(e) = msg.ack().await {
374 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
375 }
376 return;
377 }
378 Ok(proc) => proc.into(),
379 },
380 publish: match DateTime::parse_from_rfc3339(data.publish.as_str()) {
381 Err(e) => {
382 warn!(
383 "[{}] {} parse network_dldata publish \"{}\" error: {}",
384 FN_NAME, queue_name, data.publish, e
385 );
386 if let Err(e) = msg.ack().await {
387 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
388 }
389 return;
390 }
391 Ok(publish) => publish.into(),
392 },
393 resp: None,
394 status: data.status,
395 unit_id: data.unit_id,
396 device_id: data.device_id,
397 network_code: data.network_code,
398 network_addr: data.network_addr,
399 profile: data.profile,
400 data: data.data,
401 extension: data.extension,
402 };
403 let mut is_err = false;
404 if let Err(e) = self.model.network_dldata().add(&data).await {
405 error!(
406 "[{}] {} add network_dldata error: {}",
407 FN_NAME, queue_name, e
408 );
409 is_err = true;
410 }
411 if is_err {
412 time::sleep(Duration::from_secs(1)).await;
413 if let Err(e) = msg.nack().await {
414 error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
415 }
416 return;
417 }
418 }
419 RecvDataMsg::NetDlDataResult { data } => {
420 time::sleep(Duration::from_secs(1)).await;
422
423 let cond = NetworkDlDataCond {
424 data_id: data.data_id.as_str(),
425 };
426 let updates = NetworkDlDataUpdate {
427 resp: match DateTime::parse_from_rfc3339(data.resp.as_str()) {
428 Err(e) => {
429 warn!(
430 "[{}] {} parse network_dldata resp \"{}\" error: {}",
431 FN_NAME, queue_name, data.resp, e
432 );
433 if let Err(e) = msg.ack().await {
434 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
435 }
436 return;
437 }
438 Ok(resp) => resp.into(),
439 },
440 status: data.status,
441 };
442 let mut is_err = false;
443 if let Err(e) = self.model.network_dldata().update(&cond, &updates).await {
444 error!(
445 "[{}] {} update network_dldata error: {}",
446 FN_NAME, queue_name, e
447 );
448 is_err = true;
449 }
450 if is_err {
451 time::sleep(Duration::from_secs(1)).await;
452 if let Err(e) = msg.nack().await {
453 error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
454 }
455 return;
456 }
457 }
458 RecvDataMsg::NetUlData { data } => {
459 let data = NetworkUlData {
460 data_id: data.data_id,
461 proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
462 Err(e) => {
463 warn!(
464 "[{}] {} parse network_uldata proc \"{}\" error: {}",
465 FN_NAME, queue_name, data.proc, e
466 );
467 if let Err(e) = msg.ack().await {
468 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
469 }
470 return;
471 }
472 Ok(proc) => proc.into(),
473 },
474 unit_code: data.unit_code,
475 network_code: data.network_code,
476 network_addr: data.network_addr,
477 unit_id: data.unit_id,
478 device_id: data.device_id,
479 time: match DateTime::parse_from_rfc3339(data.time.as_str()) {
480 Err(e) => {
481 warn!(
482 "[{}] {} parse network_uldata time \"{}\" error: {}",
483 FN_NAME, queue_name, data.time, e
484 );
485 if let Err(e) = msg.ack().await {
486 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
487 }
488 return;
489 }
490 Ok(time) => time.into(),
491 },
492 profile: data.profile,
493 data: data.data,
494 extension: data.extension,
495 };
496 let mut is_err = false;
497 if let Err(e) = self.model.network_uldata().add(&data).await {
498 error!(
499 "[{}] {} add network_uldata error: {}",
500 FN_NAME, queue_name, e
501 );
502 is_err = true;
503 }
504 if is_err {
505 time::sleep(Duration::from_secs(1)).await;
506 if let Err(e) = msg.nack().await {
507 error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
508 }
509 return;
510 }
511 }
512 }
513 if let Err(e) = msg.ack().await {
514 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
515 }
516 }
517}