sylvia_iot_broker/libs/mq/
data.rs1use std::{
2 collections::HashMap,
3 sync::{Arc, Mutex},
4};
5
6use url::Url;
7
8use general_mq::{
9 queue::{EventHandler, GmqQueue},
10 AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
11};
12
13use super::{get_connection, Connection};
14
/// Name given to the queue that [`new`] creates, for both the AMQP and the MQTT variants.
const QUEUE_NAME: &str = "broker.data";
16
17pub fn new(
19 conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
20 host_uri: &Url,
21 persistent: bool,
22 handler: Arc<dyn EventHandler>,
23) -> Result<Queue, String> {
24 let conn = get_connection(&conn_pool, host_uri)?;
25 let mut queue = match conn {
26 Connection::Amqp(conn, counter) => {
27 let opts = QueueOptions::Amqp(
28 AmqpQueueOptions {
29 name: QUEUE_NAME.to_string(),
30 is_recv: false,
31 reliable: true,
32 persistent,
33 broadcast: false,
34 ..Default::default()
35 },
36 &conn,
37 );
38 {
39 *counter.lock().unwrap() += 1;
40 }
41 Queue::new(opts)?
42 }
43 Connection::Mqtt(conn, counter) => {
44 let opts = QueueOptions::Mqtt(
45 MqttQueueOptions {
46 name: QUEUE_NAME.to_string(),
47 is_recv: false,
48 reliable: true,
49 broadcast: false,
50 ..Default::default()
51 },
52 &conn,
53 );
54 {
55 *counter.lock().unwrap() += 1;
56 }
57 Queue::new(opts)?
58 }
59 };
60 queue.set_handler(handler);
61 if let Err(e) = queue.connect() {
62 return Err(e.to_string());
63 }
64 Ok(queue)
65}