sylvia_iot_broker/libs/mq/
network.rs1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 error::Error as StdError,
5 sync::{Arc, Mutex},
6};
7
8use async_trait::async_trait;
9use chrono::DateTime;
10use hex;
11use log::{error, warn};
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value};
14use tokio::task;
15use url::Url;
16
17use general_mq::{
18 queue::{
19 EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus,
20 },
21 Queue,
22};
23use sylvia_iot_corelib::strings;
24
25use super::{
26 get_connection, new_ctrl_queues, new_data_queues, remove_connection, Connection, MgrMqStatus,
27 MgrStatus, Options,
28};
29
30#[derive(Deserialize)]
32pub struct UlData {
33 pub time: String,
34 #[serde(rename = "networkAddr")]
35 pub network_addr: String,
36 pub data: String,
37 pub extension: Option<Map<String, Value>>,
38}
39
40#[derive(Serialize)]
42pub struct DlData {
43 #[serde(rename = "dataId")]
44 pub data_id: String,
45 #[serde(rename = "pub")]
46 pub publish: String,
47 #[serde(rename = "expiresIn")]
48 pub expires_in: i64,
49 #[serde(rename = "networkAddr")]
50 pub network_addr: String,
51 pub data: String,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub extension: Option<Map<String, Value>>,
54}
55
56#[derive(Deserialize)]
58pub struct DlDataResult {
59 #[serde(rename = "dataId")]
60 pub data_id: String,
61 pub status: i32,
62 pub message: Option<String>,
63}
64
/// Manager of the queues that belong to one network.
///
/// Every field is reference counted, so cloning the manager yields another handle to the same
/// queues, status and handler.
#[derive(Clone)]
pub struct NetworkMgr {
    /// Options given to `new()`; the source of `unit_id()`, `unit_code()`, `id()` and `name()`.
    opts: Arc<Options>,

    /// Connection pool the underlying connection was taken from; used again by `close()`.
    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
    /// Text form of the host URI given to `new()`; passed to `remove_connection` by `close()`.
    host_uri: String,

    /// Uplink data queue (a message handler is attached to it in `new()`).
    uldata: Arc<Mutex<Queue>>,
    /// Downlink data queue (written by `send_dldata()`).
    dldata: Arc<Mutex<Queue>>,
    /// Downlink data result queue (a message handler is attached to it in `new()`).
    dldata_result: Arc<Mutex<Queue>>,
    /// Control queue (written by `send_ctrl()`).
    ctrl: Arc<Mutex<Queue>>,

    /// Aggregated manager status; starts as `MgrStatus::NotReady`.
    status: Arc<Mutex<MgrStatus>>,
    /// User supplied event handler.
    handler: Arc<Mutex<Arc<dyn EventHandler>>>,
}
82
/// Event handler trait for the [`NetworkMgr`].
#[async_trait]
pub trait EventHandler: Send + Sync {
    /// Called when the aggregated manager status changes to a different value.
    async fn on_status_change(&self, mgr: &NetworkMgr, status: MgrStatus);

    /// Called for every valid uplink data message.
    ///
    /// Returning `Ok` makes the manager ACK the message; returning `Err` makes it NACK.
    async fn on_uldata(&self, mgr: &NetworkMgr, data: Box<UlData>) -> Result<(), ()>;
    /// Called for every valid downlink data result message.
    ///
    /// Returning `Ok` makes the manager ACK the message; returning `Err` makes it NACK.
    async fn on_dldata_result(&self, mgr: &NetworkMgr, data: Box<DlDataResult>) -> Result<(), ()>;
}
91
/// Adapter registered on the queues as their event and message handler.
struct MgrMqEventHandler {
    /// Handle to the owning manager (a clone sharing the same queues, status and handler).
    mgr: NetworkMgr,
}
96
/// Prefix handed to `new_data_queues` / `new_ctrl_queues` when creating this manager's queues.
const QUEUE_PREFIX: &str = "broker.network";
98
99impl NetworkMgr {
100 pub fn new(
102 conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
103 host_uri: &Url,
104 opts: Options,
105 handler: Arc<dyn EventHandler>,
106 ) -> Result<Self, String> {
107 let conn = get_connection(&conn_pool, host_uri)?;
108
109 let (uldata, dldata, _, dldata_result) = new_data_queues(&conn, &opts, QUEUE_PREFIX, true)?;
110 let ctrl = new_ctrl_queues(&conn, &opts, QUEUE_PREFIX)?;
111
112 let mgr = NetworkMgr {
113 opts: Arc::new(opts),
114 conn_pool,
115 host_uri: host_uri.to_string(),
116 uldata,
117 dldata,
118 dldata_result,
119 ctrl,
120 status: Arc::new(Mutex::new(MgrStatus::NotReady)),
121 handler: Arc::new(Mutex::new(handler)),
122 };
123 let mq_handler = Arc::new(MgrMqEventHandler { mgr: mgr.clone() });
124 let mut q = { mgr.uldata.lock().unwrap().clone() };
125 q.set_handler(mq_handler.clone());
126 q.set_msg_handler(mq_handler.clone());
127 if let Err(e) = q.connect() {
128 return Err(e.to_string());
129 }
130 let mut q = { mgr.dldata.lock().unwrap().clone() };
131 q.set_handler(mq_handler.clone());
132 if let Err(e) = q.connect() {
133 return Err(e.to_string());
134 }
135 let mut q = { mgr.dldata_result.lock().unwrap().clone() };
136 q.set_handler(mq_handler.clone());
137 q.set_msg_handler(mq_handler.clone());
138 if let Err(e) = q.connect() {
139 return Err(e.to_string());
140 }
141 let mut q = { mgr.ctrl.lock().unwrap().clone() };
142 q.set_handler(mq_handler.clone());
143 if let Err(e) = q.connect() {
144 return Err(e.to_string());
145 }
146 match conn {
147 Connection::Amqp(_, counter) => {
148 *counter.lock().unwrap() += 4;
149 }
150 Connection::Mqtt(_, counter) => {
151 *counter.lock().unwrap() += 4;
152 }
153 }
154 Ok(mgr)
155 }
156
157 pub fn unit_id(&self) -> &str {
159 self.opts.unit_id.as_str()
160 }
161
162 pub fn unit_code(&self) -> &str {
164 self.opts.unit_code.as_str()
165 }
166
167 pub fn id(&self) -> &str {
169 self.opts.id.as_str()
170 }
171
172 pub fn name(&self) -> &str {
174 self.opts.name.as_str()
175 }
176
177 pub fn status(&self) -> MgrStatus {
179 *self.status.lock().unwrap()
180 }
181
182 pub fn mq_status(&self) -> MgrMqStatus {
184 MgrMqStatus {
185 uldata: { self.uldata.lock().unwrap().status() },
186 dldata: { self.dldata.lock().unwrap().status() },
187 dldata_resp: QueueStatus::Closed,
188 dldata_result: { self.dldata_result.lock().unwrap().status() },
189 ctrl: { self.ctrl.lock().unwrap().status() },
190 }
191 }
192
193 pub async fn close(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
195 let mut q = { self.uldata.lock().unwrap().clone() };
196 q.close().await?;
197 let mut q = { self.dldata.lock().unwrap().clone() };
198 q.close().await?;
199 let mut q = { self.dldata_result.lock().unwrap().clone() };
200 q.close().await?;
201 let mut q = { self.ctrl.lock().unwrap().clone() };
202 q.close().await?;
203
204 remove_connection(&self.conn_pool, &self.host_uri, 4).await
205 }
206
207 pub fn send_dldata(&self, data: &DlData) -> Result<(), Box<dyn StdError>> {
209 let payload = serde_json::to_vec(data)?;
210 let queue = { (*self.dldata.lock().unwrap()).clone() };
211 task::spawn(async move {
212 let _ = queue.send_msg(payload).await;
213 });
214 Ok(())
215 }
216
217 pub async fn send_ctrl(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
219 let queue = { (*self.ctrl.lock().unwrap()).clone() };
220 queue.send_msg(payload).await
221 }
222}
223
224#[async_trait]
225impl QueueEventHandler for MgrMqEventHandler {
226 async fn on_error(&self, _queue: Arc<dyn GmqQueue>, _err: Box<dyn StdError + Send + Sync>) {}
227
228 async fn on_status(&self, _queue: Arc<dyn GmqQueue>, _status: QueueStatus) {
229 let uldata_status = { self.mgr.uldata.lock().unwrap().status() };
230 let dldata_status = { self.mgr.dldata.lock().unwrap().status() };
231 let dldata_result_status = { self.mgr.dldata_result.lock().unwrap().status() };
232 let ctrl_status = { self.mgr.ctrl.lock().unwrap().status() };
233
234 let status = match uldata_status == QueueStatus::Connected
235 && dldata_status == QueueStatus::Connected
236 && dldata_result_status == QueueStatus::Connected
237 && ctrl_status == QueueStatus::Connected
238 {
239 false => MgrStatus::NotReady,
240 true => MgrStatus::Ready,
241 };
242
243 let mut changed = false;
244 {
245 let mut mutex = self.mgr.status.lock().unwrap();
246 if *mutex != status {
247 *mutex = status;
248 changed = true;
249 }
250 }
251 if changed {
252 let handler = { self.mgr.handler.lock().unwrap().clone() };
253 handler.on_status_change(&self.mgr, status).await;
254 }
255 }
256}
257
258#[async_trait]
259impl MessageHandler for MgrMqEventHandler {
260 async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
262 const FN_NAME: &'static str = "NetworkMgr.on_message";
263
264 let queue_name = queue.name();
265 if queue_name.cmp(self.mgr.uldata.lock().unwrap().name()) == Ordering::Equal {
266 let data = match serde_json::from_slice::<UlData>(msg.payload()) {
267 Err(_) => {
268 warn!("[{}] invalid format from {}", FN_NAME, queue_name);
269 if let Err(e) = msg.ack().await {
270 error!("[{}] ACK message error: {}", FN_NAME, e);
271 }
272 return;
273 }
274 Ok(mut data) => {
275 let time = match DateTime::parse_from_rfc3339(data.time.as_str()) {
276 Err(e) => {
277 warn!(
278 "[{}] invalid time format from {}: {}",
279 FN_NAME, queue_name, e
280 );
281 if let Err(e) = msg.ack().await {
282 error!("[{}] ACK message error: {}", FN_NAME, e);
283 }
284 return;
285 }
286 Ok(time) => time.into(),
287 };
288 data.time = strings::time_str(&time);
289 if data.network_addr.len() == 0 {
290 warn!(
291 "[{}] invalid network_addr format from {}",
292 FN_NAME, queue_name,
293 );
294 if let Err(e) = msg.ack().await {
295 error!("[{}] ACK message error: {}", FN_NAME, e);
296 }
297 return;
298 }
299 data.network_addr = data.network_addr.to_lowercase();
300 if data.data.len() > 0 {
301 if let Err(_) = hex::decode(data.data.as_str()) {
302 warn!("[{}] invalid data format from {}", FN_NAME, queue_name);
303 if let Err(e) = msg.ack().await {
304 error!("[{}] ACK message error: {}", FN_NAME, e);
305 }
306 return;
307 }
308 data.data = data.data.to_lowercase();
309 }
310 data
311 }
312 };
313 let handler = { self.mgr.handler.lock().unwrap().clone() };
314 match handler.on_uldata(&self.mgr, Box::new(data)).await {
315 Err(_) => {
316 if let Err(e) = msg.nack().await {
317 error!("[{}] NACK message error: {}", FN_NAME, e);
318 }
319 }
320 Ok(_) => {
321 if let Err(e) = msg.ack().await {
322 error!("[{}] ACK message error: {}", FN_NAME, e);
323 }
324 }
325 }
326 } else if queue_name.cmp(self.mgr.dldata_result.lock().unwrap().name()) == Ordering::Equal {
327 let data = match serde_json::from_slice::<DlDataResult>(msg.payload()) {
328 Err(_) => {
329 warn!("[{}] invalid format from {}", FN_NAME, queue_name);
330 if let Err(e) = msg.ack().await {
331 error!("[{}] ACK message error: {}", FN_NAME, e);
332 }
333 return;
334 }
335 Ok(data) => {
336 if data.data_id.len() == 0 {
337 warn!("[{}] invalid data_id format from {}", FN_NAME, queue_name);
338 if let Err(e) = msg.ack().await {
339 error!("[{}] ACK message error: {}", FN_NAME, e);
340 }
341 return;
342 }
343 if let Some(message) = data.message.as_ref() {
344 if message.len() == 0 {
345 warn!("[{}] invalid message format from {}", FN_NAME, queue_name);
346 if let Err(e) = msg.ack().await {
347 error!("[{}] ACK message error: {}", FN_NAME, e);
348 }
349 return;
350 }
351 }
352 data
353 }
354 };
355 let handler = { self.mgr.handler.lock().unwrap().clone() };
356 match handler.on_dldata_result(&self.mgr, Box::new(data)).await {
357 Err(_) => {
358 if let Err(e) = msg.nack().await {
359 error!("[{}] NACK message error: {}", FN_NAME, e);
360 }
361 }
362 Ok(_) => {
363 if let Err(e) = msg.ack().await {
364 error!("[{}] ACK message error: {}", FN_NAME, e);
365 }
366 }
367 }
368 }
369 }
370}