use std::{
collections::HashMap,
error::Error as StdError,
sync::{Arc, Mutex},
};
use serde::{Deserialize, Serialize};
use url::Url;
use general_mq::{
connection::GmqConnection, queue::Status, AmqpConnection, AmqpConnectionOptions,
AmqpQueueOptions, MqttConnection, MqttConnectionOptions, MqttQueueOptions, Queue, QueueOptions,
};
pub mod application;
pub mod control;
pub mod data;
pub mod network;
/// A message-broker connection kept in the shared connection pool, tagged by protocol.
///
/// Each variant pairs the protocol-specific connection with a shared `isize` counter.
/// `get_connection` creates the counter at `0`; `remove_connection` subtracts from it and
/// closes the connection once the value is no longer positive.
///
/// NOTE(review): the code that increments the counter is not in this part of the file —
/// presumably the queue managers in the submodules do it; confirm against them.
#[derive(Clone)]
pub enum Connection {
/// Connection created for `amqp` / `amqps` URIs, plus its usage counter.
Amqp(AmqpConnection, Arc<Mutex<isize>>),
/// Connection created for `mqtt` / `mqtts` URIs, plus its usage counter.
Mqtt(MqttConnection, Arc<Mutex<isize>>),
}
/// Two-state readiness flag.
///
/// `Copy` and `Clone` are implemented by hand further down in this file.
///
/// NOTE(review): presumably reported by the queue managers in the submodules to tell whether
/// their queues are usable — confirm against those modules.
#[derive(PartialEq)]
pub enum MgrStatus {
/// The "not ready" state.
NotReady,
/// The "ready" state.
Ready,
}
/// A set of `general_mq` queue [`Status`] values, one per queue kind.
///
/// NOTE(review): the field names match the queue-name suffixes built in this file (`uldata`,
/// `dldata`, `dldata-resp`, `dldata-result`, `ctrl`); presumably each field holds the status
/// of the queue with that suffix — confirm against the code that fills this struct.
pub struct MgrMqStatus {
/// Status for the `uldata` queue.
pub uldata: Status,
/// Status for the `dldata` queue.
pub dldata: Status,
/// Status for the `dldata-resp` queue.
pub dldata_resp: Status,
/// Status for the `dldata-result` queue.
pub dldata_result: Status,
/// Status for the `ctrl` queue.
pub ctrl: Status,
}
/// Settings used by the queue builders in this file to name and configure queues.
///
/// (De)serializable with serde; some fields use camelCase keys (`unitId`, `unitCode`,
/// `sharedPrefix`), and the optional fields are omitted from the output when `None`.
#[derive(Default, Deserialize, Serialize)]
pub struct Options {
/// Unit ID. Must be empty exactly when `unit_code` is empty; the queue builders in this
/// file only check it for emptiness.
#[serde(rename = "unitId")]
pub unit_id: String,
/// Unit code, used as the second segment of every queue name. When empty, `_` is used
/// instead. Must be empty exactly when `unit_id` is empty.
#[serde(rename = "unitCode")]
pub unit_code: String,
/// Identifier; must be non-empty. The queue builders in this file validate it but do not
/// otherwise read it.
pub id: String,
/// Name, used as the third segment of every queue name; must be non-empty.
pub name: String,
/// AMQP prefetch count. `None` or `0` falls back to the default of 100. Not read for MQTT.
#[serde(skip_serializing_if = "Option::is_none")]
pub prefetch: Option<u16>,
/// Forwarded as the `persistent` flag of the AMQP data queues. Not read for the control
/// queue or for MQTT queues.
pub persistent: bool,
/// Forwarded as `shared_prefix` to MQTT queue options. Not read for AMQP.
#[serde(rename = "sharedPrefix", skip_serializing_if = "Option::is_none")]
pub shared_prefix: Option<String>,
}
/// URI schemes accepted for broker connections: AMQP and MQTT plus their `s`-suffixed forms.
///
/// References inside a `const` are implicitly `'static`, so the lifetimes are not spelled out.
pub const SUPPORT_SCHEMES: &[&str] = &["amqp", "amqps", "mqtt", "mqtts"];

/// AMQP prefetch count used when `Options::prefetch` is `None` or `0`.
const DEF_PREFETCH: u16 = 100;
// `MgrStatus` is a fieldless enum, so cloning is just a bitwise copy of the value.
impl Clone for MgrStatus {
    fn clone(&self) -> Self {
        *self
    }
}

impl Copy for MgrStatus {}
/// Returns the pooled connection for `host_uri`, creating and registering one if absent.
///
/// The pool is keyed by the string form of `host_uri`. A newly created connection is told to
/// connect (the result of `connect()` is ignored), is paired with a fresh counter set to `0`,
/// and a clone of it is stored in the pool. The pool lock is held for the whole call.
///
/// # Errors
///
/// Returns the error from the connection constructor, or `"unsupport scheme ..."` when the
/// scheme is not one of `amqp`, `amqps`, `mqtt`, `mqtts`.
///
/// # Panics
///
/// Panics if the pool mutex is poisoned.
fn get_connection(
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
    host_uri: &Url,
) -> Result<Connection, String> {
    let key = host_uri.to_string();
    let mut pool = conn_pool.lock().unwrap();

    if let Some(existing) = pool.get(&key) {
        return Ok(existing.clone());
    }

    let created = match host_uri.scheme() {
        "amqp" | "amqps" => {
            let mut amqp = AmqpConnection::new(AmqpConnectionOptions {
                uri: key.clone(),
                ..Default::default()
            })?;
            let _ = amqp.connect();
            Connection::Amqp(amqp, Arc::new(Mutex::new(0)))
        }
        "mqtt" | "mqtts" => {
            let mut mqtt = MqttConnection::new(MqttConnectionOptions {
                uri: key.clone(),
                ..Default::default()
            })?;
            let _ = mqtt.connect();
            Connection::Mqtt(mqtt, Arc::new(Mutex::new(0)))
        }
        other => return Err(format!("unsupport scheme {}", other)),
    };

    pool.insert(key, created.clone());
    Ok(created)
}
/// Releases `count` usages of the pooled connection for `host_uri`, closing it when unused.
///
/// Subtracts `count` from the connection's counter. If the counter is still positive, or no
/// connection is registered under `host_uri`, nothing else happens. Otherwise the connection
/// is removed from the pool and closed. The pool lock is released before the close is awaited.
///
/// # Errors
///
/// Propagates the error returned by the connection's `close()`.
///
/// # Panics
///
/// Panics if the pool mutex or the counter mutex is poisoned.
async fn remove_connection(
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
    host_uri: &String,
    count: isize,
) -> Result<(), Box<dyn StdError + Send + Sync>> {
    let removed = {
        let mut pool = conn_pool.lock().unwrap();
        match pool.get(host_uri) {
            None => return Ok(()),
            // Both protocols carry the same kind of counter, so they share one arm.
            Some(Connection::Amqp(_, counter)) | Some(Connection::Mqtt(_, counter)) => {
                let mut remaining = counter.lock().unwrap();
                *remaining -= count;
                if *remaining > 0 {
                    return Ok(());
                }
            }
        }
        pool.remove(host_uri)
    };

    match removed {
        Some(Connection::Amqp(mut amqp, _)) => {
            amqp.close().await?;
        }
        Some(Connection::Mqtt(mut mqtt, _)) => {
            mqtt.close().await?;
        }
        None => {}
    }
    Ok(())
}
/// Creates the control queue named `{prefix}.{unit}.{name}.ctrl` on `conn`.
///
/// `{unit}` is `opts.unit_code`, or `_` when that is empty. The queue is configured with
/// `is_recv: false`, `reliable: true` and `broadcast: false`. On AMQP the prefetch comes from
/// `opts.prefetch`, falling back to `DEF_PREFETCH` when unset or zero; on MQTT
/// `opts.shared_prefix` is forwarded.
///
/// # Errors
///
/// Returns an error string when `unit_id` and `unit_code` are not both empty or both
/// non-empty, when `id` or `name` is empty, or when `Queue::new` rejects the options.
fn new_ctrl_queues(
    conn: &Connection,
    opts: &Options,
    prefix: &str,
) -> Result<Arc<Mutex<Queue>>, String> {
    if opts.unit_id.is_empty() != opts.unit_code.is_empty() {
        return Err("unit_id and unit_code must both empty or non-empty".to_string());
    }
    if opts.id.is_empty() {
        return Err("`id` cannot be empty".to_string());
    }
    if opts.name.is_empty() {
        return Err("`name` cannot be empty".to_string());
    }

    let unit_part = if opts.unit_code.is_empty() {
        "_"
    } else {
        opts.unit_code.as_str()
    };
    let queue_name = format!("{}.{}.{}.ctrl", prefix, unit_part, opts.name.as_str());

    let queue_opts = match conn {
        Connection::Amqp(amqp, _) => QueueOptions::Amqp(
            AmqpQueueOptions {
                name: queue_name,
                is_recv: false,
                reliable: true,
                broadcast: false,
                prefetch: match opts.prefetch {
                    None | Some(0) => DEF_PREFETCH,
                    Some(value) => value,
                },
                ..Default::default()
            },
            amqp,
        ),
        Connection::Mqtt(mqtt, _) => QueueOptions::Mqtt(
            MqttQueueOptions {
                name: queue_name,
                is_recv: false,
                reliable: true,
                broadcast: false,
                shared_prefix: opts.shared_prefix.clone(),
                ..Default::default()
            },
            mqtt,
        ),
    };

    Ok(Arc::new(Mutex::new(Queue::new(queue_opts)?)))
}
fn new_data_queues(
conn: &Connection,
opts: &Options,
prefix: &str,
is_network: bool,
) -> Result<
(
Arc<Mutex<Queue>>,
Arc<Mutex<Queue>>,
Option<Arc<Mutex<Queue>>>,
Arc<Mutex<Queue>>,
),
String,
> {
let uldata: Arc<Mutex<Queue>>;
let dldata: Arc<Mutex<Queue>>;
let dldata_resp: Option<Arc<Mutex<Queue>>>;
let dldata_result: Arc<Mutex<Queue>>;
if opts.unit_id.len() == 0 {
if opts.unit_code.len() != 0 {
return Err("unit_id and unit_code must both empty or non-empty".to_string());
}
} else {
if opts.unit_code.len() == 0 {
return Err("unit_id and unit_code must both empty or non-empty".to_string());
}
}
if opts.id.len() == 0 {
return Err("`id` cannot be empty".to_string());
}
if opts.name.len() == 0 {
return Err("`name` cannot be empty".to_string());
}
let unit = match opts.unit_code.len() {
0 => "_",
_ => opts.unit_code.as_str(),
};
match conn {
Connection::Amqp(conn, _) => {
let prefetch = match opts.prefetch {
None => DEF_PREFETCH,
Some(prefetch) => match prefetch {
0 => DEF_PREFETCH,
_ => prefetch,
},
};
let uldata_opts = QueueOptions::Amqp(
AmqpQueueOptions {
name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
is_recv: is_network,
reliable: true,
persistent: opts.persistent,
broadcast: false,
prefetch,
..Default::default()
},
conn,
);
let dldata_opts = QueueOptions::Amqp(
AmqpQueueOptions {
name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
is_recv: !is_network,
reliable: true,
persistent: opts.persistent,
broadcast: false,
prefetch,
..Default::default()
},
conn,
);
let dldata_resp_opts = QueueOptions::Amqp(
AmqpQueueOptions {
name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
is_recv: is_network,
reliable: true,
persistent: opts.persistent,
broadcast: false,
prefetch,
..Default::default()
},
conn,
);
let dldata_result_opts = QueueOptions::Amqp(
AmqpQueueOptions {
name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
is_recv: is_network,
reliable: true,
persistent: opts.persistent,
broadcast: false,
prefetch,
..Default::default()
},
conn,
);
uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
dldata_resp = match is_network {
false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
true => None,
};
dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
}
Connection::Mqtt(conn, _) => {
let uldata_opts = QueueOptions::Mqtt(
MqttQueueOptions {
name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
is_recv: is_network,
reliable: true,
broadcast: false,
shared_prefix: opts.shared_prefix.clone(),
..Default::default()
},
conn,
);
let dldata_opts = QueueOptions::Mqtt(
MqttQueueOptions {
name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
is_recv: !is_network,
reliable: true,
broadcast: false,
shared_prefix: opts.shared_prefix.clone(),
..Default::default()
},
conn,
);
let dldata_resp_opts = QueueOptions::Mqtt(
MqttQueueOptions {
name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
is_recv: is_network,
reliable: true,
broadcast: false,
shared_prefix: opts.shared_prefix.clone(),
..Default::default()
},
conn,
);
let dldata_result_opts = QueueOptions::Mqtt(
MqttQueueOptions {
name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
is_recv: is_network,
reliable: true,
broadcast: false,
shared_prefix: opts.shared_prefix.clone(),
..Default::default()
},
conn,
);
uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
dldata_resp = match is_network {
false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
true => None,
};
dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
}
}
Ok((uldata, dldata, dldata_resp, dldata_result))
}