1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
//! General purposed interfaces for message queues. Now we provide the following implementations:
//! - AMQP 0-9-1
//! - MQTT
//! By using these classes, you can configure queues with the following properties:
//! - Unicast or broadcast.
//! - Reliable or best-effort.
//! **Notes**
//! - MQTT uses **shared queues** to implement unicast.
//! - AMQP uses **confirm channels** to implement reliable publish, and MQTT uses **QoS 1** to
//! implement reliable publish/subscribe.
//! # Relationships of Connections and Queues
//! The term **connection** describes a TCP/TLS connection to the message broker.
//! The term **queue** describes a message queue or a topic within a connection.
//! You can use one connection to manage multiple queues, or one connection to manage one queue.
//! A queue can only be a receiver or a sender at a time.
//! ### Connections for sender/receiver queues with the same name
//! The sender and the receiver are usually different programs, there are two connections to hold
//! two queues.
//! For the special case that a program acts both the sender and the receiver using the same queue:
//! - The AMQP implementation uses one **Channel** for one queue, so the program can manages all
//! queues with one connection.
//! - The MQTT implementation **MUST** uses one connection for one queue, or both sender and
//! receiver will receive packets.
//! # Test
//! Please prepare a [RabbitMQ]( broker and a [EMQX](
//! broker at **localhost** for testing.
//! - To install using Docker:
//! $ docker run --rm --name rabbitmq -d -p 5672:5672 rabbitmq:management-alpine
//! $ docker run --rm --name emqx -d -p 1883:1883 emqx/emqx
//! Then run the test:
//! $ cargo test --test integration_test -- --nocapture
//! # Example
//! Run RabbitMQ and then run AMQP example:
//! $ cargo run --example simple
//! Run EMQX and then run MQTT example:
//! $ RUN_MQTT= cargo run --example simple
use std::{error::Error as StdError, fmt, sync::Arc};
use async_trait::async_trait;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
pub mod connection;
pub mod queue;
mod amqp;
mod mqtt;
pub use amqp::{AmqpConnection, AmqpConnectionOptions, AmqpQueue, AmqpQueueOptions};
pub use mqtt::{MqttConnection, MqttConnectionOptions, MqttQueue, MqttQueueOptions};
use queue::{EventHandler, GmqQueue, MessageHandler, Status};
/// general-mq error.
#[derive(Clone, Debug)]
pub enum Error {
/// The queue does not have [`MessageHandler`].
/// The connection is not connected or the queue (topic) is not
/// connected (declared/subscribed).
/// The queue is a receiver that cannot send messages.
pub enum Queue {
pub enum QueueOptions<'a> {
Amqp(AmqpQueueOptions, &'a AmqpConnection),
Mqtt(MqttQueueOptions, &'a MqttConnection),
/// Identifier length of inner handlers.
pub(crate) const ID_SIZE: usize = 24;
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::NoMsgHandler => write!(f, "no message handler"),
Error::NotConnected => write!(f, "not connected"),
Error::QueueIsReceiver => write!(f, "this queue is a receiver"),
impl StdError for Error {}
impl Queue {
pub fn new(opts: QueueOptions) -> Result<Self, String> {
match opts {
QueueOptions::Amqp(opts, conn) => Ok(Queue::Amqp(AmqpQueue::new(opts, conn)?)),
QueueOptions::Mqtt(opts, conn) => Ok(Queue::Mqtt(MqttQueue::new(opts, conn)?)),
impl GmqQueue for Queue {
fn name(&self) -> &str {
match self {
Queue::Amqp(q) =>,
Queue::Mqtt(q) =>,
fn is_recv(&self) -> bool {
match self {
Queue::Amqp(q) => q.is_recv(),
Queue::Mqtt(q) => q.is_recv(),
fn status(&self) -> Status {
match self {
Queue::Amqp(q) => q.status(),
Queue::Mqtt(q) => q.status(),
fn set_handler(&mut self, handler: Arc<dyn EventHandler>) {
match self {
Queue::Amqp(q) => q.set_handler(handler),
Queue::Mqtt(q) => q.set_handler(handler),
fn clear_handler(&mut self) {
match self {
Queue::Amqp(q) => q.clear_handler(),
Queue::Mqtt(q) => q.clear_handler(),
fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>) {
match self {
Queue::Amqp(q) => q.set_msg_handler(handler),
Queue::Mqtt(q) => q.set_msg_handler(handler),
fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
match self {
Queue::Amqp(q) => q.connect(),
Queue::Mqtt(q) => q.connect(),
async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
match self {
Queue::Amqp(q) => q.close().await,
Queue::Mqtt(q) => q.close().await,
async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
match self {
Queue::Amqp(q) => q.send_msg(payload).await,
Queue::Mqtt(q) => q.send_msg(payload).await,
/// Generate random alphanumeric with the specified length.
pub fn randomstring(len: usize) -> String {
let mut rng = thread_rng();
.map(|()| rng.sample(Alphanumeric))