general_mq/
queue.rs

1//! Traits and enumerations for queues.
2
3use std::{error::Error as StdError, sync::Arc};
4
5use async_trait::async_trait;
6use regex::Regex;
7
8/// Queue status.
9#[derive(Debug, PartialEq)]
10pub enum Status {
11    /// The queue is closing.
12    Closing,
13    /// The queue is closed by the program.
14    Closed,
15    /// Connecting to the message queue.
16    Connecting,
17    /// Connected to the message queue.
18    Connected,
19    /// The queue is not connected. It will retry connecting to the queue automatically.
20    Disconnected,
21}
22
23/// The accepted pattern of the queue name.
24pub const QUEUE_NAME_PATTERN: &'static str = r"^[a-z0-9_-]+([\.]{1}[a-z0-9_-]+)*$";
25
26/// The operations for queues.
27#[async_trait]
28pub trait GmqQueue: Send + Sync {
29    /// To get the queue name.
30    fn name(&self) -> &str;
31
32    /// Is the queue a receiver.
33    fn is_recv(&self) -> bool;
34
35    /// To get the connection status.
36    fn status(&self) -> Status;
37
38    /// To set the queue event handler.
39    fn set_handler(&mut self, handler: Arc<dyn EventHandler>);
40
41    /// To remove the queue event handler.
42    fn clear_handler(&mut self);
43
44    /// To set the queue message handler.
45    fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>);
46
47    /// To connect to the message queue. The [`GmqQueue`] will connect to the queue using another
48    /// runtime task and report status with [`Status`]s.
49    ///
50    /// **Note** You MUST call `set_msg_handler()` before `connect()`.
51    fn connect(&mut self) -> Result<(), Box<dyn StdError>>;
52
53    /// To close the queue.
54    async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>>;
55
56    /// To send a message (for **senders** only).
57    async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>>;
58}
59
60/// The operations for incoming messages.
61#[async_trait]
62pub trait Message: Send + Sync {
63    /// To get the payload.
64    fn payload(&self) -> &[u8];
65
66    /// Use this if the message is processed successfully.
67    async fn ack(&self) -> Result<(), Box<dyn StdError + Send + Sync>>;
68
69    /// To requeue the message and the broker will send the message in the future.
70    ///
71    /// **Note**: only AMQP or protocols that support requeuing are effective.
72    async fn nack(&self) -> Result<(), Box<dyn StdError + Send + Sync>>;
73}
74
75/// The event handler for queues.
76#[async_trait]
77pub trait EventHandler: Send + Sync {
78    /// Triggered when there are errors.
79    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>);
80
81    /// Triggered by [`Status`].
82    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status);
83}
84
85/// The message handler for queues.
86#[async_trait]
87pub trait MessageHandler: Send + Sync {
88    /// Triggered for new incoming [`Message`]s.
89    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>);
90}
91
92impl Copy for Status {}
93
94impl Clone for Status {
95    fn clone(&self) -> Status {
96        *self
97    }
98}
99
100/// To validate the queue name.
101pub(crate) fn name_validate(name: &str) -> bool {
102    let re = Regex::new(QUEUE_NAME_PATTERN).unwrap();
103    re.is_match(name)
104}