1use std::{
2 error::Error as StdError,
3 sync::{Arc, Mutex},
4 time::Duration,
5};
6
7use amqprs::{
8 channel::{
9 BasicAckArguments, BasicConsumeArguments, BasicNackArguments, BasicPublishArguments,
10 BasicQosArguments, Channel, ConfirmSelectArguments, ExchangeDeclareArguments, ExchangeType,
11 QueueBindArguments, QueueDeclareArguments,
12 },
13 consumer::AsyncConsumer,
14 error::Error as AmqprsError,
15 BasicProperties, Deliver,
16};
17use async_trait::async_trait;
18use tokio::{
19 task::{self, JoinHandle},
20 time,
21};
22
23use super::connection::AmqpConnection;
24use crate::{
25 connection::{GmqConnection, Status as ConnStatus},
26 queue::{
27 name_validate, EventHandler, GmqQueue, Message, MessageHandler, Status, QUEUE_NAME_PATTERN,
28 },
29 Error,
30};
31
32#[derive(Clone)]
34pub struct AmqpQueue {
35 opts: AmqpQueueOptions,
37 conn: Arc<Mutex<AmqpConnection>>,
39 channel: Arc<Mutex<Option<Channel>>>,
41 status: Arc<Mutex<Status>>,
43 handler: Arc<Mutex<Option<Arc<dyn EventHandler>>>>,
45 msg_handler: Arc<Mutex<Option<Arc<dyn MessageHandler>>>>,
47 ev_loop: Arc<Mutex<Option<JoinHandle<()>>>>,
49}
50
/// Options used to create an [`AmqpQueue`].
///
/// `Debug` is derived so that option sets can be logged and shown in
/// assertion messages.
#[derive(Clone, Debug)]
pub struct AmqpQueueOptions {
    /// Queue name. Must be non-empty and pass `name_validate`.
    ///
    /// In broadcast mode the name is used as the fanout exchange name,
    /// otherwise as the queue name / routing key.
    pub name: String,
    /// `true` for a receiver queue, `false` for a sender queue.
    ///
    /// Receivers need a message handler before `connect()` and cannot send.
    pub is_recv: bool,
    /// When `true`, the channel is switched to confirm mode (`confirm_select`)
    /// and messages are published with the `mandatory` flag.
    pub reliable: bool,
    /// When `true`, a fanout exchange named `name` is declared (receivers bind
    /// an exclusive queue to it); when `false`, a durable queue named `name`
    /// is declared.
    pub broadcast: bool,
    /// Delay in milliseconds between connection attempts and channel health
    /// checks. `0` is replaced by the default when the queue is created.
    pub reconnect_millis: u64,
    /// Prefetch count passed to `basic_qos` for receivers. Must not be `0`
    /// for a receiver queue.
    pub prefetch: u16,
    /// When `true`, published messages are marked as persistent.
    pub persistent: bool,
}
75
76struct AmqpMessage {
78 channel: Channel,
80 delivery_tag: u64,
82 content: Vec<u8>,
84}
85
86struct Consumer {
88 queue: Arc<AmqpQueue>,
90}
91
/// Default reconnect interval in milliseconds, used when the options give `0`.
const DEF_RECONN_TIME_MS: u64 = 1_000;
94
95impl AmqpQueue {
96 pub fn new(opts: AmqpQueueOptions, conn: &AmqpConnection) -> Result<AmqpQueue, String> {
98 let name = opts.name.as_str();
99 if name.len() == 0 {
100 return Err("queue name cannot be empty".to_string());
101 } else if !name_validate(name) {
102 return Err(format!(
103 "queue name {} is not match {}",
104 name, QUEUE_NAME_PATTERN
105 ));
106 } else if opts.is_recv && opts.prefetch == 0 {
107 return Err("prefetch cannot be zero for a receiver".to_string());
108 }
109 let mut opts = opts;
110 if opts.reconnect_millis == 0 {
111 opts.reconnect_millis = DEF_RECONN_TIME_MS;
112 }
113
114 Ok(AmqpQueue {
115 opts,
116 conn: Arc::new(Mutex::new(conn.clone())),
117 channel: Arc::new(Mutex::new(None)),
118 status: Arc::new(Mutex::new(Status::Closed)),
119 handler: Arc::new(Mutex::new(None)),
120 msg_handler: Arc::new(Mutex::new(None)),
121 ev_loop: Arc::new(Mutex::new(None)),
122 })
123 }
124
125 fn conn_status(&self) -> ConnStatus {
127 self.conn.lock().unwrap().status()
128 }
129
130 fn handler(&self) -> Option<Arc<dyn EventHandler>> {
132 self.handler.lock().unwrap().clone()
133 }
134
135 fn msg_handler(&self) -> Option<Arc<dyn MessageHandler>> {
137 self.msg_handler.lock().unwrap().clone()
138 }
139
140 fn on_error(&self, err: Box<dyn StdError + Send + Sync>) {
142 let handler = { (*self.handler.lock().unwrap()).clone() };
143 if let Some(handler) = handler {
144 let q = Arc::new(self.clone());
145 task::spawn(async move {
146 handler.on_error(q, err).await;
147 });
148 }
149 }
150}
151
152#[async_trait]
153impl GmqQueue for AmqpQueue {
154 fn name(&self) -> &str {
155 self.opts.name.as_str()
156 }
157
158 fn is_recv(&self) -> bool {
159 self.opts.is_recv
160 }
161
162 fn status(&self) -> Status {
163 *self.status.lock().unwrap()
164 }
165
166 fn set_handler(&mut self, handler: Arc<dyn EventHandler>) {
167 *self.handler.lock().unwrap() = Some(handler);
168 }
169
170 fn clear_handler(&mut self) {
171 let _ = (*self.handler.lock().unwrap()).take();
172 }
173
174 fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>) {
175 *self.msg_handler.lock().unwrap() = Some(handler);
176 }
177
178 fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
179 if self.opts.is_recv && self.msg_handler().is_none() {
180 return Err(Box::new(Error::NoMsgHandler));
181 }
182
183 {
184 let mut task_handle_mutex = self.ev_loop.lock().unwrap();
185 if (*task_handle_mutex).is_some() {
186 return Ok(());
187 }
188 *self.status.lock().unwrap() = Status::Connecting;
189 *task_handle_mutex = Some(create_event_loop(self));
190 }
191 Ok(())
192 }
193
194 async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
195 match { self.ev_loop.lock().unwrap().take() } {
196 None => return Ok(()),
197 Some(handle) => handle.abort(),
198 }
199 {
200 *self.status.lock().unwrap() = Status::Closing;
201 }
202
203 let channel = { self.channel.lock().unwrap().take() };
204
205 let mut result: Result<(), AmqprsError> = Ok(());
206 if let Some(channel) = channel {
207 result = channel.close().await;
208 }
209
210 {
211 *self.status.lock().unwrap() = Status::Closed;
212 }
213 if let Some(handler) = { (*self.handler.lock().unwrap()).clone() } {
214 let queue = Arc::new(self.clone());
215 task::spawn(async move {
216 handler.on_status(queue, Status::Closed).await;
217 });
218 }
219
220 result?;
221 Ok(())
222 }
223
224 async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
225 if self.opts.is_recv {
226 return Err(Box::new(Error::QueueIsReceiver));
227 }
228
229 let channel = {
230 match self.channel.lock().unwrap().as_ref() {
231 None => return Err(Box::new(Error::NotConnected)),
232 Some(channel) => channel.clone(),
233 }
234 };
235
236 let mut prop = BasicProperties::default();
237 if self.opts.persistent {
238 prop.with_persistence(true);
239 }
240 let mut args = match self.opts.reliable {
241 false => BasicPublishArguments::default(),
242 true => BasicPublishArguments {
243 mandatory: true,
244 ..Default::default()
245 },
246 };
247 if self.opts.broadcast {
248 args.exchange(self.opts.name.clone());
249 } else {
250 args.routing_key(self.opts.name.clone());
251 }
252
253 channel.basic_publish(prop, payload, args).await?;
254 Ok(())
255 }
256}
257
258impl Default for AmqpQueueOptions {
259 fn default() -> Self {
260 AmqpQueueOptions {
261 name: "".to_string(),
262 is_recv: false,
263 reliable: false,
264 broadcast: false,
265 reconnect_millis: DEF_RECONN_TIME_MS,
266 prefetch: 1,
267 persistent: false,
268 }
269 }
270}
271
272#[async_trait]
273impl Message for AmqpMessage {
274 fn payload(&self) -> &[u8] {
275 &self.content
276 }
277
278 async fn ack(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
279 let args = BasicAckArguments {
280 delivery_tag: self.delivery_tag,
281 ..Default::default()
282 };
283 self.channel.basic_ack(args).await?;
284 Ok(())
285 }
286
287 async fn nack(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
288 let args = BasicNackArguments {
289 delivery_tag: self.delivery_tag,
290 requeue: true,
291 ..Default::default()
292 };
293 self.channel.basic_nack(args).await?;
294 Ok(())
295 }
296}
297
298#[async_trait]
299impl AsyncConsumer for Consumer {
300 async fn consume(
301 &mut self,
302 channel: &Channel,
303 deliver: Deliver,
304 _basic_properties: BasicProperties,
305 content: Vec<u8>,
306 ) {
307 let queue = self.queue.clone();
308 let handler = {
309 match self.queue.msg_handler().as_ref() {
310 None => return (),
311 Some(handler) => handler.clone(),
312 }
313 };
314 let message = Box::new(AmqpMessage {
315 channel: channel.clone(),
316 delivery_tag: deliver.delivery_tag(),
317 content,
318 });
319
320 task::spawn(async move {
321 handler.on_message(queue, message).await;
322 });
323 }
324}
325
326fn create_event_loop(queue: &AmqpQueue) -> JoinHandle<()> {
328 let this = Arc::new(queue.clone());
329 task::spawn(async move {
330 loop {
331 match this.status() {
332 Status::Closing | Status::Closed => break,
333 Status::Connecting => {
334 if this.conn_status() != ConnStatus::Connected {
335 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
336 continue;
337 }
338
339 let raw_conn = { this.conn.lock().unwrap().get_raw_connection() };
340 let channel = if let Some(raw_conn) = raw_conn {
341 match raw_conn.open_channel(None).await {
342 Err(e) => {
343 this.on_error(Box::new(e));
344 time::sleep(Duration::from_millis(this.opts.reconnect_millis))
345 .await;
346 continue;
347 }
348 Ok(channel) => channel,
349 }
350 } else {
351 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
352 continue;
353 };
354 if this.opts.reliable {
355 let args = ConfirmSelectArguments::default();
356 if let Err(e) = channel.confirm_select(args).await {
357 this.on_error(Box::new(e));
358 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
359 continue;
360 }
361 }
362
363 let name = this.opts.name.as_str();
364 if this.opts.broadcast {
365 let args = ExchangeDeclareArguments::of_type(name, ExchangeType::Fanout);
366 if let Err(e) = channel.exchange_declare(args).await {
367 this.on_error(Box::new(e));
368 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
369 continue;
370 }
371
372 if this.opts.is_recv {
373 let mut args = QueueDeclareArguments::default();
374 args.exclusive(true);
375 let queue_name = match channel.queue_declare(args).await {
376 Err(e) => {
377 this.on_error(Box::new(e));
378 time::sleep(Duration::from_millis(this.opts.reconnect_millis))
379 .await;
380 continue;
381 }
382 Ok(Some((queue_name, _, _))) => queue_name,
383 _ => {
384 this.on_error(Box::new(AmqprsError::ChannelUseError(
385 "unknown queue_declare error".to_string(),
386 )));
387 time::sleep(Duration::from_millis(this.opts.reconnect_millis))
388 .await;
389 continue;
390 }
391 };
392
393 let args = QueueBindArguments {
394 queue: queue_name.clone(),
395 exchange: name.to_string(),
396 routing_key: "".to_string(),
397 ..Default::default()
398 };
399 if let Err(e) = channel.queue_bind(args).await {
400 this.on_error(Box::new(e));
401 time::sleep(Duration::from_millis(this.opts.reconnect_millis))
402 .await;
403 continue;
404 }
405
406 let args = BasicQosArguments {
407 prefetch_count: this.opts.prefetch,
408 ..Default::default()
409 };
410 if let Err(e) = channel.basic_qos(args).await {
411 this.on_error(Box::new(e));
412 time::sleep(Duration::from_millis(this.opts.reconnect_millis))
413 .await;
414 continue;
415 }
416
417 let args = BasicConsumeArguments::new(&queue_name, "");
418 let consumer = Consumer {
419 queue: this.clone(),
420 };
421 if let Err(e) = channel.basic_consume(consumer, args).await {
422 this.on_error(Box::new(e));
423 time::sleep(Duration::from_millis(this.opts.reconnect_millis))
424 .await;
425 continue;
426 }
427 }
428 } else {
429 let mut args = QueueDeclareArguments::new(name);
430 args.durable(true);
431 if let Err(e) = channel.queue_declare(args).await {
432 this.on_error(Box::new(e));
433 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
434 continue;
435 }
436
437 if this.opts.is_recv {
438 let args = BasicQosArguments {
439 prefetch_count: this.opts.prefetch,
440 ..Default::default()
441 };
442 if let Err(e) = channel.basic_qos(args).await {
443 this.on_error(Box::new(e));
444 time::sleep(Duration::from_millis(this.opts.reconnect_millis))
445 .await;
446 continue;
447 }
448
449 let args = BasicConsumeArguments::new(name, "");
450 let consumer = Consumer {
451 queue: this.clone(),
452 };
453 if let Err(e) = channel.basic_consume(consumer, args).await {
454 this.on_error(Box::new(e));
455 time::sleep(Duration::from_millis(this.opts.reconnect_millis))
456 .await;
457 continue;
458 }
459 }
460 }
461
462 {
463 *this.channel.lock().unwrap() = Some(channel);
464 *this.status.lock().unwrap() = Status::Connected;
465 }
466 if let Some(handler) = this.handler() {
467 let queue = this.clone();
468 task::spawn(async move {
469 handler.on_status(queue, Status::Connected).await;
470 });
471 }
472 }
473 Status::Connected => {
474 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
475 let mut to_disconnected = true;
476 {
477 if let Some(channel) = (*this.channel.lock().unwrap()).as_ref() {
478 if channel.is_open() {
479 to_disconnected = false;
480 }
481 }
482 }
483 if to_disconnected {
484 to_disconnected_fn(this.clone()).await;
485 }
486 }
487 Status::Disconnected => {
488 *this.status.lock().unwrap() = Status::Connecting;
489 }
490 }
491 }
492 })
493}
494
495async fn to_disconnected_fn(queue: Arc<AmqpQueue>) {
497 {
498 let mut status_mutex = queue.status.lock().unwrap();
499 if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
500 return;
501 }
502 queue.channel.lock().unwrap().take();
503 *status_mutex = Status::Disconnected;
504 }
505
506 let handler = { (*queue.handler.lock().unwrap()).clone() };
507 if let Some(handler) = handler {
508 let q = queue.clone();
509 task::spawn(async move {
510 handler.on_status(q, Status::Disconnected).await;
511 });
512 }
513 time::sleep(Duration::from_millis(queue.opts.reconnect_millis)).await;
514 {
515 let mut status_mutex = queue.status.lock().unwrap();
516 if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
517 return;
518 }
519 *status_mutex = Status::Connecting;
520 }
521
522 let handler = { (*queue.handler.lock().unwrap()).clone() };
523 if let Some(handler) = handler {
524 let q = queue.clone();
525 task::spawn(async move {
526 handler.on_status(q, Status::Connecting).await;
527 });
528 }
529}