general_mq/amqp/
connection.rs

1use std::{
2    collections::HashMap,
3    error::Error as StdError,
4    str::FromStr,
5    sync::{Arc, Mutex},
6    time::Duration,
7};
8
9use amqprs::{
10    connection::{Connection as AmqprsConnection, OpenConnectionArguments},
11    error::Error as AmqprsError,
12    security::SecurityCredentials,
13    tls::TlsAdaptor,
14};
15use async_trait::async_trait;
16use lapin::uri::{AMQPScheme, AMQPUri};
17use tokio::{
18    task::{self, JoinHandle},
19    time,
20};
21
22use crate::{
23    connection::{EventHandler, GmqConnection, Status},
24    randomstring, ID_SIZE,
25};
26
27/// Manages an AMQP connection.
28#[derive(Clone)]
29pub struct AmqpConnection {
30    /// Options of the connection.
31    opts: InnerOptions,
32    /// Connection status.
33    status: Arc<Mutex<Status>>,
34    /// Hold the connection instance.
35    conn: Arc<Mutex<Option<AmqprsConnection>>>,
36    /// Event handlers.
37    handlers: Arc<Mutex<HashMap<String, Arc<dyn EventHandler>>>>,
38    /// The event loop to manage and monitor the connection instance.
39    ev_loop: Arc<Mutex<Option<JoinHandle<()>>>>,
40}
41
42/// The connection options.
43pub struct AmqpConnectionOptions {
44    /// Connection URI. Use `amqp|amqps://username:password@host:port/vhost` format.
45    ///
46    /// Default is `amqp://localhost/%2f`.
47    pub uri: String,
48    /// Connection timeout in milliseconds.
49    ///
50    /// Default or zero value is `3000`.
51    pub connect_timeout_millis: u64,
52    /// Time in milliseconds from disconnection to reconnection.
53    ///
54    /// Default or zero value is `1000`.
55    pub reconnect_millis: u64,
56}
57
58/// The validated options for management.
59#[derive(Clone)]
60struct InnerOptions {
61    /// The formatted URI resource.
62    args: OpenConnectionArguments,
63    /// Time in milliseconds from disconnection to reconnection.
64    reconnect_millis: u64,
65}
66
67/// Default connect timeout in milliseconds.
68const DEF_CONN_TIMEOUT_MS: u64 = 3000;
69/// Default reconnect time in milliseconds.
70const DEF_RECONN_TIME_MS: u64 = 1000;
71
72impl AmqpConnection {
73    /// Create a connection instance.
74    pub fn new(opts: AmqpConnectionOptions) -> Result<AmqpConnection, String> {
75        let mut uri = AMQPUri::from_str(opts.uri.as_str())?;
76        uri.query.connection_timeout = match opts.connect_timeout_millis {
77            0 => Some(DEF_CONN_TIMEOUT_MS),
78            _ => Some(opts.connect_timeout_millis),
79        };
80        if uri.vhost.len() == 0 {
81            uri.vhost = "/".to_string();
82        }
83        let mut args = OpenConnectionArguments::default();
84        args.host(&uri.authority.host)
85            .port(uri.authority.port)
86            .credentials(SecurityCredentials::new_plain(
87                &uri.authority.userinfo.username,
88                &uri.authority.userinfo.password,
89            ))
90            .virtual_host(&uri.vhost);
91        if uri.scheme == AMQPScheme::AMQPS {
92            let adaptor = match TlsAdaptor::without_client_auth(None, uri.authority.host.clone()) {
93                Err(e) => return Err(e.to_string()),
94                Ok(adaptor) => adaptor,
95            };
96            args.tls_adaptor(adaptor);
97        }
98
99        Ok(AmqpConnection {
100            opts: InnerOptions {
101                args,
102                reconnect_millis: match opts.reconnect_millis {
103                    0 => DEF_RECONN_TIME_MS,
104                    _ => opts.reconnect_millis,
105                },
106            },
107            status: Arc::new(Mutex::new(Status::Closed)),
108            conn: Arc::new(Mutex::new(None)),
109            handlers: Arc::new(Mutex::new(HashMap::<String, Arc<dyn EventHandler>>::new())),
110            ev_loop: Arc::new(Mutex::new(None)),
111        })
112    }
113
114    /// To get the raw AMQP connection instance for channel declaration.
115    pub(super) fn get_raw_connection(&self) -> Option<AmqprsConnection> {
116        match self.conn.lock().unwrap().as_ref() {
117            None => None,
118            Some(conn) => Some(conn.clone()),
119        }
120    }
121}
122
123#[async_trait]
124impl GmqConnection for AmqpConnection {
125    fn status(&self) -> Status {
126        *self.status.lock().unwrap()
127    }
128
129    fn add_handler(&mut self, handler: Arc<dyn EventHandler>) -> String {
130        let id = randomstring(ID_SIZE);
131        self.handlers.lock().unwrap().insert(id.clone(), handler);
132        id
133    }
134
135    fn remove_handler(&mut self, id: &str) {
136        self.handlers.lock().unwrap().remove(id);
137    }
138
139    fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
140        {
141            let mut task_handle_mutex = self.ev_loop.lock().unwrap();
142            if (*task_handle_mutex).is_some() {
143                return Ok(());
144            }
145            *self.status.lock().unwrap() = Status::Connecting;
146            *task_handle_mutex = Some(create_event_loop(self));
147        }
148        Ok(())
149    }
150
151    async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
152        match { self.ev_loop.lock().unwrap().take() } {
153            None => return Ok(()),
154            Some(handle) => handle.abort(),
155        }
156        {
157            *self.status.lock().unwrap() = Status::Closing;
158        }
159
160        let conn = { self.conn.lock().unwrap().take() };
161        let mut result: Result<(), AmqprsError> = Ok(());
162        if let Some(conn) = conn {
163            result = conn.close().await;
164        }
165
166        {
167            *self.status.lock().unwrap() = Status::Closed;
168        }
169        let handlers = { (*self.handlers.lock().unwrap()).clone() };
170        for (id, handler) in handlers {
171            let conn = Arc::new(self.clone());
172            task::spawn(async move {
173                handler.on_status(id.clone(), conn, Status::Closed).await;
174            });
175        }
176
177        result?;
178        Ok(())
179    }
180}
181
182impl Default for AmqpConnectionOptions {
183    fn default() -> Self {
184        AmqpConnectionOptions {
185            uri: "amqp://localhost".to_string(),
186            connect_timeout_millis: DEF_CONN_TIMEOUT_MS,
187            reconnect_millis: DEF_RECONN_TIME_MS,
188        }
189    }
190}
191
192/// To create an event loop runtime task.
193fn create_event_loop(conn: &AmqpConnection) -> JoinHandle<()> {
194    let this = Arc::new(conn.clone());
195    task::spawn(async move {
196        loop {
197            match this.status() {
198                Status::Closing | Status::Closed => break,
199                Status::Connecting => {
200                    let conn = match AmqprsConnection::open(&this.opts.args).await {
201                        Err(_) => {
202                            time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
203                            continue;
204                        }
205                        Ok(conn) => conn,
206                    };
207                    {
208                        let mut status_mutex = this.status.lock().unwrap();
209                        if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
210                            continue;
211                        }
212                        *status_mutex = Status::Connected;
213                    }
214                    {
215                        *this.conn.lock().unwrap() = Some(conn);
216                    }
217
218                    let handlers = { (*this.handlers.lock().unwrap()).clone() };
219                    for (id, handler) in handlers {
220                        let conn = this.clone();
221                        task::spawn(async move {
222                            handler.on_status(id.clone(), conn, Status::Connected).await;
223                        });
224                    }
225                }
226                Status::Connected => {
227                    time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
228                    let mut to_disconnected = true;
229                    {
230                        if let Some(conn) = (*this.conn.lock().unwrap()).as_ref() {
231                            if conn.is_open() {
232                                to_disconnected = false;
233                            }
234                        }
235                    }
236                    if !to_disconnected {
237                        continue;
238                    }
239
240                    {
241                        let mut status_mutex = this.status.lock().unwrap();
242                        if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
243                            continue;
244                        }
245                        *status_mutex = Status::Disconnected;
246                    }
247                    let conn = { this.conn.lock().unwrap().take() };
248                    if let Some(conn) = conn {
249                        let _ = conn.close().await;
250                    }
251
252                    let handlers = { (*this.handlers.lock().unwrap()).clone() };
253                    for (id, handler) in handlers {
254                        let conn = this.clone();
255                        task::spawn(async move {
256                            handler
257                                .on_status(id.clone(), conn, Status::Disconnected)
258                                .await;
259                        });
260                    }
261                    time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
262                    {
263                        let mut status_mutex = this.status.lock().unwrap();
264                        if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
265                            continue;
266                        }
267                        *status_mutex = Status::Connecting;
268                    }
269                    let handlers = { (*this.handlers.lock().unwrap()).clone() };
270                    for (id, handler) in handlers {
271                        let conn = this.clone();
272                        task::spawn(async move {
273                            handler
274                                .on_status(id.clone(), conn, Status::Connecting)
275                                .await;
276                        });
277                    }
278                }
279                Status::Disconnected => {
280                    *this.status.lock().unwrap() = Status::Connecting;
281                }
282            }
283        }
284    })
285}