general_mq/amqp/
queue.rs

1use std::{
2    error::Error as StdError,
3    sync::{Arc, Mutex},
4    time::Duration,
5};
6
7use amqprs::{
8    channel::{
9        BasicAckArguments, BasicConsumeArguments, BasicNackArguments, BasicPublishArguments,
10        BasicQosArguments, Channel, ConfirmSelectArguments, ExchangeDeclareArguments, ExchangeType,
11        QueueBindArguments, QueueDeclareArguments,
12    },
13    consumer::AsyncConsumer,
14    error::Error as AmqprsError,
15    BasicProperties, Deliver,
16};
17use async_trait::async_trait;
18use tokio::{
19    task::{self, JoinHandle},
20    time,
21};
22
23use super::connection::AmqpConnection;
24use crate::{
25    connection::{GmqConnection, Status as ConnStatus},
26    queue::{
27        name_validate, EventHandler, GmqQueue, Message, MessageHandler, Status, QUEUE_NAME_PATTERN,
28    },
29    Error,
30};
31
32/// Manages an AMQP queue.
33#[derive(Clone)]
34pub struct AmqpQueue {
35    /// Options of the queue.
36    opts: AmqpQueueOptions,
37    /// The associated [`crate::AmqpConnection`].
38    conn: Arc<Mutex<AmqpConnection>>,
39    /// Hold the channel instance.
40    channel: Arc<Mutex<Option<Channel>>>,
41    /// Queue status.
42    status: Arc<Mutex<Status>>,
43    /// The event handler.
44    handler: Arc<Mutex<Option<Arc<dyn EventHandler>>>>,
45    /// The message handler.
46    msg_handler: Arc<Mutex<Option<Arc<dyn MessageHandler>>>>,
47    /// The event loop to manage and monitor the channel instance.
48    ev_loop: Arc<Mutex<Option<JoinHandle<()>>>>,
49}
50
51/// The queue options.
52#[derive(Clone)]
53pub struct AmqpQueueOptions {
54    /// The queue name that is used to map a AMQP queue (unicast) or an exchange (broadcast).
55    ///
56    /// The pattern is [`QUEUE_NAME_PATTERN`].
57    pub name: String,
58    /// `true` for the receiver and `false` for the sender.
59    pub is_recv: bool,
60    /// Reliable by selecting the confirm channel (for publish).
61    pub reliable: bool,
62    /// `true` for broadcast and `false` for unicast.
63    pub broadcast: bool,
64    /// Time in milliseconds from disconnection to reconnection.
65    ///
66    /// Default or zero value is `1000`.
67    pub reconnect_millis: u64,
68    /// The QoS of the receiver queue.
69    ///
70    /// **Note**: this value **MUST** be a positive value.
71    pub prefetch: u16,
72    /// Use persistent delivery mode.
73    pub persistent: bool,
74}
75
76/// The AMQP [`Message`] implementation.
77struct AmqpMessage {
78    /// Hold the consumer callback channel to operate ack/nack.
79    channel: Channel,
80    /// Hold the consumer callback deliver to operate ack/nack.
81    delivery_tag: u64,
82    /// Hold the consumer callback content.
83    content: Vec<u8>,
84}
85
86/// The [`amqprs::consumer::AsyncConsumer`] implementation.
87struct Consumer {
88    /// The associated [`AmqpQueue`].
89    queue: Arc<AmqpQueue>,
90}
91
92/// Default reconnect time in milliseconds.
93const DEF_RECONN_TIME_MS: u64 = 1000;
94
95impl AmqpQueue {
96    /// Create a queue instance.
97    pub fn new(opts: AmqpQueueOptions, conn: &AmqpConnection) -> Result<AmqpQueue, String> {
98        let name = opts.name.as_str();
99        if name.len() == 0 {
100            return Err("queue name cannot be empty".to_string());
101        } else if !name_validate(name) {
102            return Err(format!(
103                "queue name {} is not match {}",
104                name, QUEUE_NAME_PATTERN
105            ));
106        } else if opts.is_recv && opts.prefetch == 0 {
107            return Err("prefetch cannot be zero for a receiver".to_string());
108        }
109        let mut opts = opts;
110        if opts.reconnect_millis == 0 {
111            opts.reconnect_millis = DEF_RECONN_TIME_MS;
112        }
113
114        Ok(AmqpQueue {
115            opts,
116            conn: Arc::new(Mutex::new(conn.clone())),
117            channel: Arc::new(Mutex::new(None)),
118            status: Arc::new(Mutex::new(Status::Closed)),
119            handler: Arc::new(Mutex::new(None)),
120            msg_handler: Arc::new(Mutex::new(None)),
121            ev_loop: Arc::new(Mutex::new(None)),
122        })
123    }
124
125    /// To get the associated connection status.
126    fn conn_status(&self) -> ConnStatus {
127        self.conn.lock().unwrap().status()
128    }
129
130    /// To get the event handler.
131    fn handler(&self) -> Option<Arc<dyn EventHandler>> {
132        self.handler.lock().unwrap().clone()
133    }
134
135    /// To get the message handler.
136    fn msg_handler(&self) -> Option<Arc<dyn MessageHandler>> {
137        self.msg_handler.lock().unwrap().clone()
138    }
139
140    /// The error handling.
141    fn on_error(&self, err: Box<dyn StdError + Send + Sync>) {
142        let handler = { (*self.handler.lock().unwrap()).clone() };
143        if let Some(handler) = handler {
144            let q = Arc::new(self.clone());
145            task::spawn(async move {
146                handler.on_error(q, err).await;
147            });
148        }
149    }
150}
151
152#[async_trait]
153impl GmqQueue for AmqpQueue {
154    fn name(&self) -> &str {
155        self.opts.name.as_str()
156    }
157
158    fn is_recv(&self) -> bool {
159        self.opts.is_recv
160    }
161
162    fn status(&self) -> Status {
163        *self.status.lock().unwrap()
164    }
165
166    fn set_handler(&mut self, handler: Arc<dyn EventHandler>) {
167        *self.handler.lock().unwrap() = Some(handler);
168    }
169
170    fn clear_handler(&mut self) {
171        let _ = (*self.handler.lock().unwrap()).take();
172    }
173
174    fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>) {
175        *self.msg_handler.lock().unwrap() = Some(handler);
176    }
177
178    fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
179        if self.opts.is_recv && self.msg_handler().is_none() {
180            return Err(Box::new(Error::NoMsgHandler));
181        }
182
183        {
184            let mut task_handle_mutex = self.ev_loop.lock().unwrap();
185            if (*task_handle_mutex).is_some() {
186                return Ok(());
187            }
188            *self.status.lock().unwrap() = Status::Connecting;
189            *task_handle_mutex = Some(create_event_loop(self));
190        }
191        Ok(())
192    }
193
194    async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
195        match { self.ev_loop.lock().unwrap().take() } {
196            None => return Ok(()),
197            Some(handle) => handle.abort(),
198        }
199        {
200            *self.status.lock().unwrap() = Status::Closing;
201        }
202
203        let channel = { self.channel.lock().unwrap().take() };
204
205        let mut result: Result<(), AmqprsError> = Ok(());
206        if let Some(channel) = channel {
207            result = channel.close().await;
208        }
209
210        {
211            *self.status.lock().unwrap() = Status::Closed;
212        }
213        if let Some(handler) = { (*self.handler.lock().unwrap()).clone() } {
214            let queue = Arc::new(self.clone());
215            task::spawn(async move {
216                handler.on_status(queue, Status::Closed).await;
217            });
218        }
219
220        result?;
221        Ok(())
222    }
223
224    async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
225        if self.opts.is_recv {
226            return Err(Box::new(Error::QueueIsReceiver));
227        }
228
229        let channel = {
230            match self.channel.lock().unwrap().as_ref() {
231                None => return Err(Box::new(Error::NotConnected)),
232                Some(channel) => channel.clone(),
233            }
234        };
235
236        let mut prop = BasicProperties::default();
237        if self.opts.persistent {
238            prop.with_persistence(true);
239        }
240        let mut args = match self.opts.reliable {
241            false => BasicPublishArguments::default(),
242            true => BasicPublishArguments {
243                mandatory: true,
244                ..Default::default()
245            },
246        };
247        if self.opts.broadcast {
248            args.exchange(self.opts.name.clone());
249        } else {
250            args.routing_key(self.opts.name.clone());
251        }
252
253        channel.basic_publish(prop, payload, args).await?;
254        Ok(())
255    }
256}
257
258impl Default for AmqpQueueOptions {
259    fn default() -> Self {
260        AmqpQueueOptions {
261            name: "".to_string(),
262            is_recv: false,
263            reliable: false,
264            broadcast: false,
265            reconnect_millis: DEF_RECONN_TIME_MS,
266            prefetch: 1,
267            persistent: false,
268        }
269    }
270}
271
272#[async_trait]
273impl Message for AmqpMessage {
274    fn payload(&self) -> &[u8] {
275        &self.content
276    }
277
278    async fn ack(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
279        let args = BasicAckArguments {
280            delivery_tag: self.delivery_tag,
281            ..Default::default()
282        };
283        self.channel.basic_ack(args).await?;
284        Ok(())
285    }
286
287    async fn nack(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
288        let args = BasicNackArguments {
289            delivery_tag: self.delivery_tag,
290            requeue: true,
291            ..Default::default()
292        };
293        self.channel.basic_nack(args).await?;
294        Ok(())
295    }
296}
297
298#[async_trait]
299impl AsyncConsumer for Consumer {
300    async fn consume(
301        &mut self,
302        channel: &Channel,
303        deliver: Deliver,
304        _basic_properties: BasicProperties,
305        content: Vec<u8>,
306    ) {
307        let queue = self.queue.clone();
308        let handler = {
309            match self.queue.msg_handler().as_ref() {
310                None => return (),
311                Some(handler) => handler.clone(),
312            }
313        };
314        let message = Box::new(AmqpMessage {
315            channel: channel.clone(),
316            delivery_tag: deliver.delivery_tag(),
317            content,
318        });
319
320        task::spawn(async move {
321            handler.on_message(queue, message).await;
322        });
323    }
324}
325
326/// To create an event loop runtime task.
327fn create_event_loop(queue: &AmqpQueue) -> JoinHandle<()> {
328    let this = Arc::new(queue.clone());
329    task::spawn(async move {
330        loop {
331            match this.status() {
332                Status::Closing | Status::Closed => break,
333                Status::Connecting => {
334                    if this.conn_status() != ConnStatus::Connected {
335                        time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
336                        continue;
337                    }
338
339                    let raw_conn = { this.conn.lock().unwrap().get_raw_connection() };
340                    let channel = if let Some(raw_conn) = raw_conn {
341                        match raw_conn.open_channel(None).await {
342                            Err(e) => {
343                                this.on_error(Box::new(e));
344                                time::sleep(Duration::from_millis(this.opts.reconnect_millis))
345                                    .await;
346                                continue;
347                            }
348                            Ok(channel) => channel,
349                        }
350                    } else {
351                        time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
352                        continue;
353                    };
354                    if this.opts.reliable {
355                        let args = ConfirmSelectArguments::default();
356                        if let Err(e) = channel.confirm_select(args).await {
357                            this.on_error(Box::new(e));
358                            time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
359                            continue;
360                        }
361                    }
362
363                    let name = this.opts.name.as_str();
364                    if this.opts.broadcast {
365                        let args = ExchangeDeclareArguments::of_type(name, ExchangeType::Fanout);
366                        if let Err(e) = channel.exchange_declare(args).await {
367                            this.on_error(Box::new(e));
368                            time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
369                            continue;
370                        }
371
372                        if this.opts.is_recv {
373                            let mut args = QueueDeclareArguments::default();
374                            args.exclusive(true);
375                            let queue_name = match channel.queue_declare(args).await {
376                                Err(e) => {
377                                    this.on_error(Box::new(e));
378                                    time::sleep(Duration::from_millis(this.opts.reconnect_millis))
379                                        .await;
380                                    continue;
381                                }
382                                Ok(Some((queue_name, _, _))) => queue_name,
383                                _ => {
384                                    this.on_error(Box::new(AmqprsError::ChannelUseError(
385                                        "unknown queue_declare error".to_string(),
386                                    )));
387                                    time::sleep(Duration::from_millis(this.opts.reconnect_millis))
388                                        .await;
389                                    continue;
390                                }
391                            };
392
393                            let args = QueueBindArguments {
394                                queue: queue_name.clone(),
395                                exchange: name.to_string(),
396                                routing_key: "".to_string(),
397                                ..Default::default()
398                            };
399                            if let Err(e) = channel.queue_bind(args).await {
400                                this.on_error(Box::new(e));
401                                time::sleep(Duration::from_millis(this.opts.reconnect_millis))
402                                    .await;
403                                continue;
404                            }
405
406                            let args = BasicQosArguments {
407                                prefetch_count: this.opts.prefetch,
408                                ..Default::default()
409                            };
410                            if let Err(e) = channel.basic_qos(args).await {
411                                this.on_error(Box::new(e));
412                                time::sleep(Duration::from_millis(this.opts.reconnect_millis))
413                                    .await;
414                                continue;
415                            }
416
417                            let args = BasicConsumeArguments::new(&queue_name, "");
418                            let consumer = Consumer {
419                                queue: this.clone(),
420                            };
421                            if let Err(e) = channel.basic_consume(consumer, args).await {
422                                this.on_error(Box::new(e));
423                                time::sleep(Duration::from_millis(this.opts.reconnect_millis))
424                                    .await;
425                                continue;
426                            }
427                        }
428                    } else {
429                        let mut args = QueueDeclareArguments::new(name);
430                        args.durable(true);
431                        if let Err(e) = channel.queue_declare(args).await {
432                            this.on_error(Box::new(e));
433                            time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
434                            continue;
435                        }
436
437                        if this.opts.is_recv {
438                            let args = BasicQosArguments {
439                                prefetch_count: this.opts.prefetch,
440                                ..Default::default()
441                            };
442                            if let Err(e) = channel.basic_qos(args).await {
443                                this.on_error(Box::new(e));
444                                time::sleep(Duration::from_millis(this.opts.reconnect_millis))
445                                    .await;
446                                continue;
447                            }
448
449                            let args = BasicConsumeArguments::new(name, "");
450                            let consumer = Consumer {
451                                queue: this.clone(),
452                            };
453                            if let Err(e) = channel.basic_consume(consumer, args).await {
454                                this.on_error(Box::new(e));
455                                time::sleep(Duration::from_millis(this.opts.reconnect_millis))
456                                    .await;
457                                continue;
458                            }
459                        }
460                    }
461
462                    {
463                        *this.channel.lock().unwrap() = Some(channel);
464                        *this.status.lock().unwrap() = Status::Connected;
465                    }
466                    if let Some(handler) = this.handler() {
467                        let queue = this.clone();
468                        task::spawn(async move {
469                            handler.on_status(queue, Status::Connected).await;
470                        });
471                    }
472                }
473                Status::Connected => {
474                    time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
475                    let mut to_disconnected = true;
476                    {
477                        if let Some(channel) = (*this.channel.lock().unwrap()).as_ref() {
478                            if channel.is_open() {
479                                to_disconnected = false;
480                            }
481                        }
482                    }
483                    if to_disconnected {
484                        to_disconnected_fn(this.clone()).await;
485                    }
486                }
487                Status::Disconnected => {
488                    *this.status.lock().unwrap() = Status::Connecting;
489                }
490            }
491        }
492    })
493}
494
495/// The utilization function for handling disconnected.
496async fn to_disconnected_fn(queue: Arc<AmqpQueue>) {
497    {
498        let mut status_mutex = queue.status.lock().unwrap();
499        if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
500            return;
501        }
502        queue.channel.lock().unwrap().take();
503        *status_mutex = Status::Disconnected;
504    }
505
506    let handler = { (*queue.handler.lock().unwrap()).clone() };
507    if let Some(handler) = handler {
508        let q = queue.clone();
509        task::spawn(async move {
510            handler.on_status(q, Status::Disconnected).await;
511        });
512    }
513    time::sleep(Duration::from_millis(queue.opts.reconnect_millis)).await;
514    {
515        let mut status_mutex = queue.status.lock().unwrap();
516        if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
517            return;
518        }
519        *status_mutex = Status::Connecting;
520    }
521
522    let handler = { (*queue.handler.lock().unwrap()).clone() };
523    if let Some(handler) = handler {
524        let q = queue.clone();
525        task::spawn(async move {
526            handler.on_status(q, Status::Connecting).await;
527        });
528    }
529}