general_mq/queue.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
//! Traits and enumerations for queues.
use std::{error::Error as StdError, sync::Arc};
use async_trait::async_trait;
use regex::Regex;
/// Queue status.
#[derive(Debug, PartialEq)]
pub enum Status {
/// The queue is closing.
Closing,
/// The queue is closed by the program.
Closed,
/// Connecting to the message queue.
Connecting,
/// Connected to the message queue.
Connected,
/// The queue is not connected. It will retry connecting to the queue automatically.
Disconnected,
}
/// The accepted pattern of the queue name.
pub const QUEUE_NAME_PATTERN: &'static str = r"^[a-z0-9_-]+([\.]{1}[a-z0-9_-]+)*$";
/// The operations for queues.
#[async_trait]
pub trait GmqQueue: Send + Sync {
/// To get the queue name.
fn name(&self) -> &str;
/// Is the queue a receiver.
fn is_recv(&self) -> bool;
/// To get the connection status.
fn status(&self) -> Status;
/// To set the queue event handler.
fn set_handler(&mut self, handler: Arc<dyn EventHandler>);
/// To remove the queue event handler.
fn clear_handler(&mut self);
/// To set the queue message handler.
fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>);
/// To connect to the message queue. The [`GmqQueue`] will connect to the queue using another
/// runtime task and report status with [`Status`]s.
///
/// **Note** You MUST call `set_msg_handler()` before `connect()`.
fn connect(&mut self) -> Result<(), Box<dyn StdError>>;
/// To close the queue.
async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>>;
/// To send a message (for **senders** only).
async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>>;
}
/// The operations for incoming messages.
#[async_trait]
pub trait Message: Send + Sync {
/// To get the payload.
fn payload(&self) -> &[u8];
/// Use this if the message is processed successfully.
async fn ack(&self) -> Result<(), Box<dyn StdError + Send + Sync>>;
/// To requeue the message and the broker will send the message in the future.
///
/// **Note**: only AMQP or protocols that support requeuing are effective.
async fn nack(&self) -> Result<(), Box<dyn StdError + Send + Sync>>;
}
/// The event handler for queues.
#[async_trait]
pub trait EventHandler: Send + Sync {
/// Triggered when there are errors.
async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>);
/// Triggered by [`Status`].
async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status);
}
/// The message handler for queues.
#[async_trait]
pub trait MessageHandler: Send + Sync {
/// Triggered for new incoming [`Message`]s.
async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>);
}
impl Copy for Status {}
impl Clone for Status {
fn clone(&self) -> Status {
*self
}
}
/// To validate the queue name.
pub(crate) fn name_validate(name: &str) -> bool {
let re = Regex::new(QUEUE_NAME_PATTERN).unwrap();
re.is_match(name)
}