// sylvia_iot_coremgr/libs/mq/mod.rs
use std::{
collections::HashMap,
fmt,
sync::{Arc, Mutex},
};
use url::Url;
use general_mq::{
connection::GmqConnection, AmqpConnection, AmqpConnectionOptions, MqttConnection,
MqttConnectionOptions,
};
pub mod data;
pub mod emqx;
pub mod rabbitmq;
pub mod rumqttd;
/// A message-queue connection paired with a shared `isize` counter.
///
/// Each variant wraps one protocol-specific connection type from `general_mq`.
/// The second field is an `Arc<Mutex<isize>>`, so every clone of a `Connection`
/// refers to the same counter value. `get_connection` initialises it to `0`.
// NOTE(review): the counter presumably tracks how many users hold the
// connection — confirm against the callers that lock and modify it.
#[derive(Clone)]
pub enum Connection {
/// Connection created for `amqp` / `amqps` host URIs.
Amqp(AmqpConnection, Arc<Mutex<isize>>),
/// Connection created for `mqtt` / `mqtts` host URIs.
Mqtt(MqttConnection, Arc<Mutex<isize>>),
}
/// Category of a queue.
///
/// The type is `Copy`, so values can be passed around freely. Its `Display`
/// implementation renders the lowercase variant name, which `to_username`
/// uses as the first segment of the generated user name.
#[derive(Clone, Copy)]
pub enum QueueType {
    /// Displayed as `"application"`.
    Application,
    /// Displayed as `"network"`.
    Network,
}

impl fmt::Display for QueueType {
    /// Writes the lowercase name of the variant to the formatter.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str(match self {
            QueueType::Application => "application",
            QueueType::Network => "network",
        })
    }
}
/// Builds a dot-separated user name of the form `<queue type>.<unit>.<code>`.
///
/// The queue type is rendered through its `Display` implementation, and the
/// unit segment is passed through `unit_code`, which substitutes `"_"` for an
/// empty unit.
pub fn to_username(q_type: QueueType, unit: &str, code: &str) -> String {
    let kind = q_type.to_string();
    [kind.as_str(), unit_code(unit), code].join(".")
}
/// Returns `code` unchanged, or the placeholder `"_"` when `code` is empty.
fn unit_code(code: &str) -> &str {
    if code.is_empty() {
        "_"
    } else {
        code
    }
}
/// Returns the pooled connection for `host_uri`, creating and registering one
/// if the pool has no entry for it yet.
///
/// The pool is keyed by the string form of `host_uri`. The URI scheme selects
/// the connection type: `amqp` / `amqps` build an `AmqpConnection`, and
/// `mqtt` / `mqtts` build an `MqttConnection`. A newly built connection has
/// `connect()` called on it and is paired with a fresh counter set to `0`
/// before being stored in the pool.
///
/// The pool lock is held from the lookup through the insertion. Releasing it in
/// between would let two concurrent callers both miss the pool and build
/// separate connections for the same URI, with the later insert replacing the
/// earlier pooled entry.
///
/// # Errors
///
/// Returns the error string produced by the connection constructor when it
/// fails, or `"unsupport scheme <scheme>"` for any other URI scheme.
///
/// # Panics
///
/// Panics if the pool mutex is poisoned.
fn get_connection(
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
    host_uri: &Url,
) -> Result<Connection, String> {
    let uri = host_uri.to_string();

    // One guard for the whole check-then-insert sequence (see doc comment).
    let mut pool = conn_pool.lock().unwrap();
    if let Some(existing) = pool.get(&uri) {
        return Ok(existing.clone());
    }

    let conn = match host_uri.scheme() {
        "amqp" | "amqps" => {
            let opts = AmqpConnectionOptions {
                uri: uri.clone(),
                ..Default::default()
            };
            let mut conn = AmqpConnection::new(opts)?;
            // NOTE(review): the result of `connect()` is discarded, as before;
            // confirm that a failure here is safe to ignore.
            let _ = conn.connect();
            Connection::Amqp(conn, Arc::new(Mutex::new(0)))
        }
        "mqtt" | "mqtts" => {
            let opts = MqttConnectionOptions {
                uri: uri.clone(),
                ..Default::default()
            };
            let mut conn = MqttConnection::new(opts)?;
            // NOTE(review): same as the AMQP branch — result intentionally unused.
            let _ = conn.connect();
            Connection::Mqtt(conn, Arc::new(Mutex::new(0)))
        }
        s => return Err(format!("unsupport scheme {}", s)),
    };

    pool.insert(uri, conn.clone());
    Ok(conn)
}