1use std::{
15 collections::HashMap,
16 error::Error as StdError,
17 sync::{Arc, Mutex},
18};
19
20use serde::{Deserialize, Serialize};
21use url::Url;
22
23use general_mq::{
24 connection::GmqConnection, queue::Status, AmqpConnection, AmqpConnectionOptions,
25 AmqpQueueOptions, MqttConnection, MqttConnectionOptions, MqttQueueOptions, Queue, QueueOptions,
26};
27
28pub mod application;
29pub mod control;
30pub mod data;
31pub mod network;
32
33#[derive(Clone)]
35pub enum Connection {
36 Amqp(AmqpConnection, Arc<Mutex<isize>>),
37 Mqtt(MqttConnection, Arc<Mutex<isize>>),
38}
39
40#[derive(PartialEq)]
42pub enum MgrStatus {
43 NotReady,
45 Ready,
47}
48
49pub struct MgrMqStatus {
51 pub uldata: Status,
53 pub dldata: Status,
55 pub dldata_resp: Status,
57 pub dldata_result: Status,
59 pub ctrl: Status,
61}
62
63#[derive(Default, Deserialize, Serialize)]
65pub struct Options {
66 #[serde(rename = "unitId")]
68 pub unit_id: String,
69 #[serde(rename = "unitCode")]
71 pub unit_code: String,
72 pub id: String,
74 pub name: String,
76 #[serde(skip_serializing_if = "Option::is_none")]
78 pub prefetch: Option<u16>,
79 pub persistent: bool,
80 #[serde(rename = "sharedPrefix", skip_serializing_if = "Option::is_none")]
82 pub shared_prefix: Option<String>,
83}
84
85pub const SUPPORT_SCHEMES: &'static [&'static str] = &["amqp", "amqps", "mqtt", "mqtts"];
87
88const DEF_PREFETCH: u16 = 100;
90
91impl Copy for MgrStatus {}
92
93impl Clone for MgrStatus {
94 fn clone(&self) -> MgrStatus {
95 *self
96 }
97}
98
99fn get_connection(
102 conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
103 host_uri: &Url,
104) -> Result<Connection, String> {
105 let uri = host_uri.to_string();
106 let mut mutex = conn_pool.lock().unwrap();
107 if let Some(conn) = mutex.get(&uri) {
108 return Ok(conn.clone());
109 }
110
111 match host_uri.scheme() {
112 "amqp" | "amqps" => {
113 let opts = AmqpConnectionOptions {
114 uri: host_uri.to_string(),
115 ..Default::default()
116 };
117 let mut conn = AmqpConnection::new(opts)?;
118 let _ = conn.connect();
119 let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
120 mutex.insert(uri, conn.clone());
121 Ok(conn)
122 }
123 "mqtt" | "mqtts" => {
124 let opts = MqttConnectionOptions {
125 uri: host_uri.to_string(),
126 ..Default::default()
127 };
128 let mut conn = MqttConnection::new(opts)?;
129 let _ = conn.connect();
130 let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
131 mutex.insert(uri, conn.clone());
132 Ok(conn)
133 }
134 s => Err(format!("unsupport scheme {}", s)),
135 }
136}
137
138async fn remove_connection(
140 conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
141 host_uri: &String,
142 count: isize,
143) -> Result<(), Box<dyn StdError + Send + Sync>> {
144 let conn = {
145 let mut mutex = conn_pool.lock().unwrap();
146 match mutex.get(host_uri) {
147 None => return Ok(()),
148 Some(conn) => match conn {
149 Connection::Amqp(_, counter) => {
150 let mut mutex = counter.lock().unwrap();
151 *mutex -= count;
152 if *mutex > 0 {
153 return Ok(());
154 }
155 }
156 Connection::Mqtt(_, counter) => {
157 let mut mutex = counter.lock().unwrap();
158 *mutex -= count;
159 if *mutex > 0 {
160 return Ok(());
161 }
162 }
163 },
164 }
165 mutex.remove(host_uri)
166 };
167 if let Some(conn) = conn {
168 match conn {
169 Connection::Amqp(mut conn, _) => {
170 conn.close().await?;
171 }
172 Connection::Mqtt(mut conn, _) => {
173 conn.close().await?;
174 }
175 }
176 }
177 Ok(())
178}
179
180fn new_ctrl_queues(
183 conn: &Connection,
184 opts: &Options,
185 prefix: &str,
186) -> Result<Arc<Mutex<Queue>>, String> {
187 let ctrl: Arc<Mutex<Queue>>;
188
189 if opts.unit_id.len() == 0 {
190 if opts.unit_code.len() != 0 {
191 return Err("unit_id and unit_code must both empty or non-empty".to_string());
192 }
193 } else {
194 if opts.unit_code.len() == 0 {
195 return Err("unit_id and unit_code must both empty or non-empty".to_string());
196 }
197 }
198 if opts.id.len() == 0 {
199 return Err("`id` cannot be empty".to_string());
200 }
201 if opts.name.len() == 0 {
202 return Err("`name` cannot be empty".to_string());
203 }
204
205 let unit = match opts.unit_code.len() {
206 0 => "_",
207 _ => opts.unit_code.as_str(),
208 };
209
210 match conn {
211 Connection::Amqp(conn, _) => {
212 let prefetch = match opts.prefetch {
213 None => DEF_PREFETCH,
214 Some(prefetch) => match prefetch {
215 0 => DEF_PREFETCH,
216 _ => prefetch,
217 },
218 };
219
220 let ctrl_opts = QueueOptions::Amqp(
221 AmqpQueueOptions {
222 name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
223 is_recv: false,
224 reliable: true,
225 broadcast: false,
226 prefetch,
227 ..Default::default()
228 },
229 conn,
230 );
231 ctrl = Arc::new(Mutex::new(Queue::new(ctrl_opts)?));
232 }
233 Connection::Mqtt(conn, _) => {
234 let ctrl_opts = QueueOptions::Mqtt(
235 MqttQueueOptions {
236 name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
237 is_recv: false,
238 reliable: true,
239 broadcast: false,
240 shared_prefix: opts.shared_prefix.clone(),
241 ..Default::default()
242 },
243 conn,
244 );
245 ctrl = Arc::new(Mutex::new(Queue::new(ctrl_opts)?));
246 }
247 }
248
249 Ok(ctrl)
250}
251
252fn new_data_queues(
258 conn: &Connection,
259 opts: &Options,
260 prefix: &str,
261 is_network: bool,
262) -> Result<
263 (
264 Arc<Mutex<Queue>>,
265 Arc<Mutex<Queue>>,
266 Option<Arc<Mutex<Queue>>>,
267 Arc<Mutex<Queue>>,
268 ),
269 String,
270> {
271 let uldata: Arc<Mutex<Queue>>;
272 let dldata: Arc<Mutex<Queue>>;
273 let dldata_resp: Option<Arc<Mutex<Queue>>>;
274 let dldata_result: Arc<Mutex<Queue>>;
275
276 if opts.unit_id.len() == 0 {
277 if opts.unit_code.len() != 0 {
278 return Err("unit_id and unit_code must both empty or non-empty".to_string());
279 }
280 } else {
281 if opts.unit_code.len() == 0 {
282 return Err("unit_id and unit_code must both empty or non-empty".to_string());
283 }
284 }
285 if opts.id.len() == 0 {
286 return Err("`id` cannot be empty".to_string());
287 }
288 if opts.name.len() == 0 {
289 return Err("`name` cannot be empty".to_string());
290 }
291
292 let unit = match opts.unit_code.len() {
293 0 => "_",
294 _ => opts.unit_code.as_str(),
295 };
296
297 match conn {
298 Connection::Amqp(conn, _) => {
299 let prefetch = match opts.prefetch {
300 None => DEF_PREFETCH,
301 Some(prefetch) => match prefetch {
302 0 => DEF_PREFETCH,
303 _ => prefetch,
304 },
305 };
306
307 let uldata_opts = QueueOptions::Amqp(
308 AmqpQueueOptions {
309 name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
310 is_recv: is_network,
311 reliable: true,
312 persistent: opts.persistent,
313 broadcast: false,
314 prefetch,
315 ..Default::default()
316 },
317 conn,
318 );
319 let dldata_opts = QueueOptions::Amqp(
320 AmqpQueueOptions {
321 name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
322 is_recv: !is_network,
323 reliable: true,
324 persistent: opts.persistent,
325 broadcast: false,
326 prefetch,
327 ..Default::default()
328 },
329 conn,
330 );
331 let dldata_resp_opts = QueueOptions::Amqp(
332 AmqpQueueOptions {
333 name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
334 is_recv: is_network,
335 reliable: true,
336 persistent: opts.persistent,
337 broadcast: false,
338 prefetch,
339 ..Default::default()
340 },
341 conn,
342 );
343 let dldata_result_opts = QueueOptions::Amqp(
344 AmqpQueueOptions {
345 name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
346 is_recv: is_network,
347 reliable: true,
348 persistent: opts.persistent,
349 broadcast: false,
350 prefetch,
351 ..Default::default()
352 },
353 conn,
354 );
355 uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
356 dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
357 dldata_resp = match is_network {
358 false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
359 true => None,
360 };
361 dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
362 }
363 Connection::Mqtt(conn, _) => {
364 let uldata_opts = QueueOptions::Mqtt(
365 MqttQueueOptions {
366 name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
367 is_recv: is_network,
368 reliable: true,
369 broadcast: false,
370 shared_prefix: opts.shared_prefix.clone(),
371 ..Default::default()
372 },
373 conn,
374 );
375 let dldata_opts = QueueOptions::Mqtt(
376 MqttQueueOptions {
377 name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
378 is_recv: !is_network,
379 reliable: true,
380 broadcast: false,
381 shared_prefix: opts.shared_prefix.clone(),
382 ..Default::default()
383 },
384 conn,
385 );
386 let dldata_resp_opts = QueueOptions::Mqtt(
387 MqttQueueOptions {
388 name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
389 is_recv: is_network,
390 reliable: true,
391 broadcast: false,
392 shared_prefix: opts.shared_prefix.clone(),
393 ..Default::default()
394 },
395 conn,
396 );
397 let dldata_result_opts = QueueOptions::Mqtt(
398 MqttQueueOptions {
399 name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
400 is_recv: is_network,
401 reliable: true,
402 broadcast: false,
403 shared_prefix: opts.shared_prefix.clone(),
404 ..Default::default()
405 },
406 conn,
407 );
408 uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
409 dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
410 dldata_resp = match is_network {
411 false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
412 true => None,
413 };
414 dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
415 }
416 }
417
418 Ok((uldata, dldata, dldata_resp, dldata_result))
419}