general_mq/amqp/
connection.rs1use std::{
2 collections::HashMap,
3 error::Error as StdError,
4 str::FromStr,
5 sync::{Arc, Mutex},
6 time::Duration,
7};
8
9use amqprs::{
10 connection::{Connection as AmqprsConnection, OpenConnectionArguments},
11 error::Error as AmqprsError,
12 security::SecurityCredentials,
13 tls::TlsAdaptor,
14};
15use async_trait::async_trait;
16use lapin::uri::{AMQPScheme, AMQPUri};
17use tokio::{
18 task::{self, JoinHandle},
19 time,
20};
21
22use crate::{
23 connection::{EventHandler, GmqConnection, Status},
24 randomstring, ID_SIZE,
25};
26
27#[derive(Clone)]
29pub struct AmqpConnection {
30 opts: InnerOptions,
32 status: Arc<Mutex<Status>>,
34 conn: Arc<Mutex<Option<AmqprsConnection>>>,
36 handlers: Arc<Mutex<HashMap<String, Arc<dyn EventHandler>>>>,
38 ev_loop: Arc<Mutex<Option<JoinHandle<()>>>>,
40}
41
/// User-facing options for building an [`AmqpConnection`].
pub struct AmqpConnectionOptions {
    /// Connection URI. It is parsed as an AMQP URI; the `amqps` scheme enables TLS.
    pub uri: String,
    /// Connection timeout in milliseconds.
    ///
    /// A zero value is replaced by the built-in default when the connection is created.
    pub connect_timeout_millis: u64,
    /// Delay in milliseconds used by the event loop between connection attempts and liveness
    /// checks.
    ///
    /// A zero value is replaced by the built-in default when the connection is created.
    pub reconnect_millis: u64,
}
57
58#[derive(Clone)]
60struct InnerOptions {
61 args: OpenConnectionArguments,
63 reconnect_millis: u64,
65}
66
/// Default connect timeout in milliseconds, used when the option is zero.
const DEF_CONN_TIMEOUT_MS: u64 = 3_000;
/// Default reconnect delay in milliseconds, used when the option is zero.
const DEF_RECONN_TIME_MS: u64 = 1_000;
71
72impl AmqpConnection {
73 pub fn new(opts: AmqpConnectionOptions) -> Result<AmqpConnection, String> {
75 let mut uri = AMQPUri::from_str(opts.uri.as_str())?;
76 uri.query.connection_timeout = match opts.connect_timeout_millis {
77 0 => Some(DEF_CONN_TIMEOUT_MS),
78 _ => Some(opts.connect_timeout_millis),
79 };
80 if uri.vhost.len() == 0 {
81 uri.vhost = "/".to_string();
82 }
83 let mut args = OpenConnectionArguments::default();
84 args.host(&uri.authority.host)
85 .port(uri.authority.port)
86 .credentials(SecurityCredentials::new_plain(
87 &uri.authority.userinfo.username,
88 &uri.authority.userinfo.password,
89 ))
90 .virtual_host(&uri.vhost);
91 if uri.scheme == AMQPScheme::AMQPS {
92 let adaptor = match TlsAdaptor::without_client_auth(None, uri.authority.host.clone()) {
93 Err(e) => return Err(e.to_string()),
94 Ok(adaptor) => adaptor,
95 };
96 args.tls_adaptor(adaptor);
97 }
98
99 Ok(AmqpConnection {
100 opts: InnerOptions {
101 args,
102 reconnect_millis: match opts.reconnect_millis {
103 0 => DEF_RECONN_TIME_MS,
104 _ => opts.reconnect_millis,
105 },
106 },
107 status: Arc::new(Mutex::new(Status::Closed)),
108 conn: Arc::new(Mutex::new(None)),
109 handlers: Arc::new(Mutex::new(HashMap::<String, Arc<dyn EventHandler>>::new())),
110 ev_loop: Arc::new(Mutex::new(None)),
111 })
112 }
113
114 pub(super) fn get_raw_connection(&self) -> Option<AmqprsConnection> {
116 match self.conn.lock().unwrap().as_ref() {
117 None => None,
118 Some(conn) => Some(conn.clone()),
119 }
120 }
121}
122
123#[async_trait]
124impl GmqConnection for AmqpConnection {
125 fn status(&self) -> Status {
126 *self.status.lock().unwrap()
127 }
128
129 fn add_handler(&mut self, handler: Arc<dyn EventHandler>) -> String {
130 let id = randomstring(ID_SIZE);
131 self.handlers.lock().unwrap().insert(id.clone(), handler);
132 id
133 }
134
135 fn remove_handler(&mut self, id: &str) {
136 self.handlers.lock().unwrap().remove(id);
137 }
138
139 fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
140 {
141 let mut task_handle_mutex = self.ev_loop.lock().unwrap();
142 if (*task_handle_mutex).is_some() {
143 return Ok(());
144 }
145 *self.status.lock().unwrap() = Status::Connecting;
146 *task_handle_mutex = Some(create_event_loop(self));
147 }
148 Ok(())
149 }
150
151 async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
152 match { self.ev_loop.lock().unwrap().take() } {
153 None => return Ok(()),
154 Some(handle) => handle.abort(),
155 }
156 {
157 *self.status.lock().unwrap() = Status::Closing;
158 }
159
160 let conn = { self.conn.lock().unwrap().take() };
161 let mut result: Result<(), AmqprsError> = Ok(());
162 if let Some(conn) = conn {
163 result = conn.close().await;
164 }
165
166 {
167 *self.status.lock().unwrap() = Status::Closed;
168 }
169 let handlers = { (*self.handlers.lock().unwrap()).clone() };
170 for (id, handler) in handlers {
171 let conn = Arc::new(self.clone());
172 task::spawn(async move {
173 handler.on_status(id.clone(), conn, Status::Closed).await;
174 });
175 }
176
177 result?;
178 Ok(())
179 }
180}
181
182impl Default for AmqpConnectionOptions {
183 fn default() -> Self {
184 AmqpConnectionOptions {
185 uri: "amqp://localhost".to_string(),
186 connect_timeout_millis: DEF_CONN_TIMEOUT_MS,
187 reconnect_millis: DEF_RECONN_TIME_MS,
188 }
189 }
190}
191
192fn create_event_loop(conn: &AmqpConnection) -> JoinHandle<()> {
194 let this = Arc::new(conn.clone());
195 task::spawn(async move {
196 loop {
197 match this.status() {
198 Status::Closing | Status::Closed => break,
199 Status::Connecting => {
200 let conn = match AmqprsConnection::open(&this.opts.args).await {
201 Err(_) => {
202 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
203 continue;
204 }
205 Ok(conn) => conn,
206 };
207 {
208 let mut status_mutex = this.status.lock().unwrap();
209 if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
210 continue;
211 }
212 *status_mutex = Status::Connected;
213 }
214 {
215 *this.conn.lock().unwrap() = Some(conn);
216 }
217
218 let handlers = { (*this.handlers.lock().unwrap()).clone() };
219 for (id, handler) in handlers {
220 let conn = this.clone();
221 task::spawn(async move {
222 handler.on_status(id.clone(), conn, Status::Connected).await;
223 });
224 }
225 }
226 Status::Connected => {
227 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
228 let mut to_disconnected = true;
229 {
230 if let Some(conn) = (*this.conn.lock().unwrap()).as_ref() {
231 if conn.is_open() {
232 to_disconnected = false;
233 }
234 }
235 }
236 if !to_disconnected {
237 continue;
238 }
239
240 {
241 let mut status_mutex = this.status.lock().unwrap();
242 if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
243 continue;
244 }
245 *status_mutex = Status::Disconnected;
246 }
247 let conn = { this.conn.lock().unwrap().take() };
248 if let Some(conn) = conn {
249 let _ = conn.close().await;
250 }
251
252 let handlers = { (*this.handlers.lock().unwrap()).clone() };
253 for (id, handler) in handlers {
254 let conn = this.clone();
255 task::spawn(async move {
256 handler
257 .on_status(id.clone(), conn, Status::Disconnected)
258 .await;
259 });
260 }
261 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
262 {
263 let mut status_mutex = this.status.lock().unwrap();
264 if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
265 continue;
266 }
267 *status_mutex = Status::Connecting;
268 }
269 let handlers = { (*this.handlers.lock().unwrap()).clone() };
270 for (id, handler) in handlers {
271 let conn = this.clone();
272 task::spawn(async move {
273 handler
274 .on_status(id.clone(), conn, Status::Connecting)
275 .await;
276 });
277 }
278 }
279 Status::Disconnected => {
280 *this.status.lock().unwrap() = Status::Connecting;
281 }
282 }
283 }
284 })
285}