sylvia_iot_broker/libs/mq/
application.rs1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 error::Error as StdError,
5 sync::{Arc, Mutex},
6};
7
8use async_trait::async_trait;
9use hex;
10use log::{error, warn};
11use serde::{Deserialize, Serialize};
12use serde_json::{self, Map, Value};
13use tokio::task;
14use url::Url;
15
16use general_mq::{
17 queue::{
18 EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus,
19 },
20 Queue,
21};
22use sylvia_iot_corelib::{err, strings};
23
24use super::{
25 get_connection, new_data_queues, remove_connection, Connection, MgrMqStatus, MgrStatus, Options,
26};
27
28#[derive(Serialize)]
30pub struct UlData {
31 #[serde(rename = "dataId")]
32 pub data_id: String,
33 pub time: String,
34 #[serde(rename = "pub")]
35 pub publish: String,
36 #[serde(rename = "deviceId")]
37 pub device_id: String,
38 #[serde(rename = "networkId")]
39 pub network_id: String,
40 #[serde(rename = "networkCode")]
41 pub network_code: String,
42 #[serde(rename = "networkAddr")]
43 pub network_addr: String,
44 #[serde(rename = "isPublic")]
45 pub is_public: bool,
46 pub profile: String,
47 pub data: String,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub extension: Option<Map<String, Value>>,
50}
51
52#[derive(Deserialize)]
54pub struct DlData {
55 #[serde(rename = "correlationId")]
56 pub correlation_id: String,
57 #[serde(rename = "deviceId")]
58 pub device_id: Option<String>,
59 #[serde(rename = "networkCode")]
60 pub network_code: Option<String>,
61 #[serde(rename = "networkAddr")]
62 pub network_addr: Option<String>,
63 pub data: String,
64 pub extension: Option<Map<String, Value>>,
65}
66
67#[derive(Default, Serialize)]
69pub struct DlDataResp {
70 #[serde(rename = "correlationId")]
71 pub correlation_id: String,
72 #[serde(rename = "dataId", skip_serializing_if = "Option::is_none")]
73 pub data_id: Option<String>,
74 #[serde(skip_serializing_if = "Option::is_none")]
75 pub error: Option<String>,
76 #[serde(skip_serializing_if = "Option::is_none")]
77 pub message: Option<String>,
78}
79
80#[derive(Serialize)]
82pub struct DlDataResult {
83 #[serde(rename = "dataId")]
84 pub data_id: String,
85 pub status: i32,
86 #[serde(skip_serializing_if = "Option::is_none")]
87 pub message: Option<String>,
88}
89
90#[derive(Clone)]
92pub struct ApplicationMgr {
93 opts: Arc<Options>,
94
95 conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
97 host_uri: String,
98
99 uldata: Arc<Mutex<Queue>>,
100 dldata: Arc<Mutex<Queue>>,
101 dldata_resp: Arc<Mutex<Queue>>,
102 dldata_result: Arc<Mutex<Queue>>,
103
104 status: Arc<Mutex<MgrStatus>>,
105 handler: Arc<Mutex<Arc<dyn EventHandler>>>,
106}
107
108#[async_trait]
110pub trait EventHandler: Send + Sync {
111 async fn on_status_change(&self, mgr: &ApplicationMgr, status: MgrStatus);
112
113 async fn on_dldata(
114 &self,
115 mgr: &ApplicationMgr,
116 data: Box<DlData>,
117 ) -> Result<Box<DlDataResp>, ()>;
118}
119
120struct MgrMqEventHandler {
122 mgr: ApplicationMgr,
123}
124
125const QUEUE_PREFIX: &'static str = "broker.application";
126
127impl ApplicationMgr {
128 pub fn new(
130 conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
131 host_uri: &Url,
132 opts: Options,
133 handler: Arc<dyn EventHandler>,
134 ) -> Result<Self, String> {
135 if opts.unit_id.len() == 0 {
136 return Err("`unit_id` cannot be empty for application".to_string());
137 }
138
139 let conn = get_connection(&conn_pool, host_uri)?;
140
141 let (uldata, dldata, dldata_resp, dldata_result) =
142 new_data_queues(&conn, &opts, QUEUE_PREFIX, false)?;
143
144 let mgr = ApplicationMgr {
145 opts: Arc::new(opts),
146 conn_pool,
147 host_uri: host_uri.to_string(),
148 uldata,
149 dldata,
150 dldata_resp: dldata_resp.unwrap(),
151 dldata_result,
152 status: Arc::new(Mutex::new(MgrStatus::NotReady)),
153 handler: Arc::new(Mutex::new(handler)),
154 };
155 let mq_handler = Arc::new(MgrMqEventHandler { mgr: mgr.clone() });
156 let mut q = { mgr.uldata.lock().unwrap().clone() };
157 q.set_handler(mq_handler.clone());
158 if let Err(e) = q.connect() {
159 return Err(e.to_string());
160 }
161 let mut q = { mgr.dldata.lock().unwrap().clone() };
162 q.set_handler(mq_handler.clone());
163 q.set_msg_handler(mq_handler.clone());
164 if let Err(e) = q.connect() {
165 return Err(e.to_string());
166 }
167 let mut q = { mgr.dldata_resp.lock().unwrap().clone() };
168 q.set_handler(mq_handler.clone());
169 if let Err(e) = q.connect() {
170 return Err(e.to_string());
171 }
172 let mut q = { mgr.dldata_result.lock().unwrap().clone() };
173 q.set_handler(mq_handler.clone());
174 if let Err(e) = q.connect() {
175 return Err(e.to_string());
176 }
177 match conn {
178 Connection::Amqp(_, counter) => {
179 *counter.lock().unwrap() += 4;
180 }
181 Connection::Mqtt(_, counter) => {
182 *counter.lock().unwrap() += 4;
183 }
184 }
185 Ok(mgr)
186 }
187
188 pub fn unit_id(&self) -> &str {
190 self.opts.unit_id.as_str()
191 }
192
193 pub fn unit_code(&self) -> &str {
195 self.opts.unit_code.as_str()
196 }
197
198 pub fn id(&self) -> &str {
200 self.opts.id.as_str()
201 }
202
203 pub fn name(&self) -> &str {
205 self.opts.name.as_str()
206 }
207
208 pub fn status(&self) -> MgrStatus {
210 *self.status.lock().unwrap()
211 }
212
213 pub fn mq_status(&self) -> MgrMqStatus {
215 MgrMqStatus {
216 uldata: { self.uldata.lock().unwrap().status() },
217 dldata: { self.dldata.lock().unwrap().status() },
218 dldata_resp: { self.dldata_resp.lock().unwrap().status() },
219 dldata_result: { self.dldata_result.lock().unwrap().status() },
220 ctrl: QueueStatus::Closed,
221 }
222 }
223
224 pub async fn close(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
226 let mut q = { self.uldata.lock().unwrap().clone() };
227 q.close().await?;
228 let mut q = { self.dldata.lock().unwrap().clone() };
229 q.close().await?;
230 let mut q = { self.dldata_resp.lock().unwrap().clone() };
231 q.close().await?;
232 let mut q = { self.dldata_result.lock().unwrap().clone() };
233 q.close().await?;
234
235 remove_connection(&self.conn_pool, &self.host_uri, 4).await
236 }
237
238 pub fn send_uldata(&self, data: &UlData) -> Result<(), Box<dyn StdError>> {
240 let payload = serde_json::to_vec(data)?;
241 let queue = { (*self.uldata.lock().unwrap()).clone() };
242 task::spawn(async move {
243 let _ = queue.send_msg(payload).await;
244 });
245 Ok(())
246 }
247
248 pub async fn send_dldata_resp(
250 &self,
251 data: &DlDataResp,
252 ) -> Result<(), Box<dyn StdError + Send + Sync>> {
253 let payload = serde_json::to_vec(data)?;
254 let queue = { (*self.dldata_resp.lock().unwrap()).clone() };
255 queue.send_msg(payload).await
256 }
257
258 pub async fn send_dldata_result(
260 &self,
261 data: &DlDataResult,
262 ) -> Result<(), Box<dyn StdError + Send + Sync>> {
263 let payload = serde_json::to_vec(data)?;
264 let queue = { (*self.dldata_result.lock().unwrap()).clone() };
265 queue.send_msg(payload).await
266 }
267}
268
269#[async_trait]
270impl QueueEventHandler for MgrMqEventHandler {
271 async fn on_error(&self, _queue: Arc<dyn GmqQueue>, _err: Box<dyn StdError + Send + Sync>) {}
272
273 async fn on_status(&self, _queue: Arc<dyn GmqQueue>, _status: QueueStatus) {
274 let status = match { self.mgr.uldata.lock().unwrap().status() } == QueueStatus::Connected
275 && { self.mgr.dldata.lock().unwrap().status() } == QueueStatus::Connected
276 && { self.mgr.dldata_resp.lock().unwrap().status() } == QueueStatus::Connected
277 && { self.mgr.dldata_result.lock().unwrap().status() } == QueueStatus::Connected
278 {
279 false => MgrStatus::NotReady,
280 true => MgrStatus::Ready,
281 };
282
283 let mut changed = false;
284 {
285 let mut mutex = self.mgr.status.lock().unwrap();
286 if *mutex != status {
287 *mutex = status;
288 changed = true;
289 }
290 }
291 if changed {
292 let handler = { self.mgr.handler.lock().unwrap().clone() };
293 handler.on_status_change(&self.mgr, status).await;
294 }
295 }
296}
297
298#[async_trait]
299impl MessageHandler for MgrMqEventHandler {
300 async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
302 const FN_NAME: &'static str = "ApplicationMgr.on_message";
303
304 let queue_name = queue.name();
305 if queue_name.cmp(self.mgr.dldata.lock().unwrap().name()) == Ordering::Equal {
306 let data = match parse_dldata_msg(msg.payload()) {
307 Err(resp) => {
308 warn!("[{}] invalid format from {}", FN_NAME, queue_name);
309 if let Err(e) = msg.ack().await {
310 error!("[{}] ACK message error: {}", FN_NAME, e);
311 }
312 if let Err(e) = self.mgr.send_dldata_resp(&resp).await {
313 error!("[{}] send response error: {}", FN_NAME, e);
314 }
315 return;
316 }
317 Ok(data) => data,
318 };
319 let handler = { self.mgr.handler.lock().unwrap().clone() };
320 match handler.on_dldata(&self.mgr, Box::new(data)).await {
321 Err(_) => {
322 if let Err(e) = msg.nack().await {
323 error!("[{}] NACK message error: {}", FN_NAME, e);
324 }
325 }
326 Ok(resp) => {
327 if let Err(e) = msg.ack().await {
328 error!("[{}] ACK message error: {}", FN_NAME, e);
329 }
330 if let Err(e) = self.mgr.send_dldata_resp(resp.as_ref()).await {
331 error!("[{}] send response error: {}", FN_NAME, e);
332 }
333 }
334 }
335 }
336 }
337}
338
339fn parse_dldata_msg(msg: &[u8]) -> Result<DlData, DlDataResp> {
341 let mut data = match serde_json::from_slice::<DlData>(msg) {
342 Err(_) => {
343 return Err(DlDataResp {
344 correlation_id: "".to_string(),
345 error: Some(err::E_PARAM.to_string()),
346 message: Some("invalid format".to_string()),
347 ..Default::default()
348 });
349 }
350 Ok(data) => data,
351 };
352
353 if data.correlation_id.len() == 0 {
354 return Err(DlDataResp {
355 correlation_id: data.correlation_id.clone(),
356 error: Some(err::E_PARAM.to_string()),
357 message: Some("invalid `correlationId`".to_string()),
358 ..Default::default()
359 });
360 }
361 match data.device_id.as_ref() {
362 None => {
363 match data.network_code.as_ref() {
364 None => {
365 return Err(DlDataResp {
366 correlation_id: data.correlation_id.clone(),
367 error: Some(err::E_PARAM.to_string()),
368 message: Some("missing `networkCode`".to_string()),
369 ..Default::default()
370 });
371 }
372 Some(code) => {
373 let code = code.to_lowercase();
374 match strings::is_code(code.as_str()) {
375 false => {
376 return Err(DlDataResp {
377 correlation_id: data.correlation_id.clone(),
378 error: Some(err::E_PARAM.to_string()),
379 message: Some("invalid `networkCode`".to_string()),
380 ..Default::default()
381 });
382 }
383 true => {
384 data.network_code = Some(code);
385 ()
386 }
387 }
388 }
389 }
390 match data.network_addr.as_ref() {
391 None => {
392 return Err(DlDataResp {
393 correlation_id: data.correlation_id.clone(),
394 error: Some(err::E_PARAM.to_string()),
395 message: Some("missing `networkAddr`".to_string()),
396 ..Default::default()
397 });
398 }
399 Some(addr) => match addr.len() {
400 0 => {
401 return Err(DlDataResp {
402 correlation_id: data.correlation_id.clone(),
403 error: Some(err::E_PARAM.to_string()),
404 message: Some("invalid `networkAddr`".to_string()),
405 ..Default::default()
406 });
407 }
408 _ => {
409 data.network_addr = Some(addr.to_lowercase());
410 ()
411 }
412 },
413 }
414 }
415 Some(device_id) => match device_id.len() {
416 0 => {
417 return Err(DlDataResp {
418 correlation_id: data.correlation_id.clone(),
419 error: Some(err::E_PARAM.to_string()),
420 message: Some("invalid `deviceId`".to_string()),
421 ..Default::default()
422 });
423 }
424 _ => (),
425 },
426 }
427 if data.data.len() > 0 {
428 if let Err(_) = hex::decode(data.data.as_str()) {
429 return Err(DlDataResp {
430 correlation_id: data.correlation_id.clone(),
431 error: Some(err::E_PARAM.to_string()),
432 message: Some("invalid `data`".to_string()),
433 ..Default::default()
434 });
435 }
436 data.data = data.data.to_lowercase();
437 }
438 Ok(data)
439}