Crate actix_mqtt_client

Source
Expand description

§An MQTT client based on the actix framework

The actix-mqtt-client crate is an MQTT client based on the actix framework.

§Basic usage and example

First, create two actix actors: one for receiving publish messages and one for receiving error messages from the client. You can also create an optional third actix actor for receiving the stop message:

pub struct ErrorActor;

impl actix::Actor for ErrorActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<ErrorMessage> for ErrorActor {
    type Result = ();
    fn handle(&mut self, error: ErrorMessage, _: &mut Self::Context) -> Self::Result {
        log::error!("{}", error.0);
    }
}

/// Actor that receives the publish messages delivered by the client and logs them.
pub struct MessageActor;

impl actix::Actor for MessageActor {
    /// Runs in a plain actix context.
    type Context = actix::Context<Self>;
}

impl actix::Handler<PublishMessage> for MessageActor {
    /// Handling a publish message produces no value.
    type Result = ();

    /// Logs the id, topic name and payload of the incoming message at `info` level.
    fn handle(&mut self, publish: PublishMessage, _ctx: &mut Self::Context) -> Self::Result {
        // Borrow the fields up front so the log statement stays on one line.
        let (id, topic, payload) = (&publish.id, &publish.topic_name, &publish.payload);
        log::info!("Got message: id:{}, topic: {}, payload: {:?}", id, topic, payload);
    }
}

Then, connect to the server (using tokio) and use the read and write halves of the stream, along with the actors, to create an MqttClient:

use std::io::Error as IoError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use actix::{Actor, Arbiter, System};
use env_logger;
use tokio::io::split;
use tokio::net::TcpStream;
use tokio::time::{sleep_until, Instant};
use actix_mqtt_client::client::{MqttClient, MqttOptions};

// Create the actix system; the async block below runs on it.
let sys = System::new();
let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
sys.block_on(async move {
    // The inner async block lets every fallible step use `?`; the collected
    // result is unwrapped once at the end.
    let result = async move {
        let stream = TcpStream::connect(socket_addr).await?;
        let (r, w) = split(stream);
        log::info!("TCP connected");
        let mut client = MqttClient::new(
            r,
            w,
            String::from("test"),
            MqttOptions::default(),
            MessageActor.start().recipient(),
            ErrorActor.start().recipient(),
            // No actor is registered for the stop message.
            None,
        );
        client.connect().await?;
        // Poll once per second until the client reports that it is connected.
        while !client.is_connected().await? {
            let delay_time = Instant::now() + Duration::from_secs(1);
            sleep_until(delay_time).await;
        }
        log::info!("MQTT connected");
        log::info!("Subscribe");
        client
            .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
            .await?;
        // Publish one message per QoS level, waiting 10 seconds after each one.
        log::info!("Publish");
        client
            .publish(
                String::from("test"),
                mqtt::QualityOfService::Level0,
                b"test".to_vec(),
            )
            .await?;
        log::info!("Wait for 10s");
        let delay_time = Instant::now() + Duration::from_secs(10);
        sleep_until(delay_time).await;
        client
            .publish(
                String::from("test"),
                mqtt::QualityOfService::Level1,
                b"test2".to_vec(),
            )
            .await?;
        log::info!("Wait for 10s");
        let delay_time = Instant::now() + Duration::from_secs(10);
        sleep_until(delay_time).await;
        client
            .publish(
                String::from("test"),
                mqtt::QualityOfService::Level2,
                b"test3".to_vec(),
            )
            .await?;
        log::info!("Wait for 10s");
        let delay_time = Instant::now() + Duration::from_secs(10);
        sleep_until(delay_time).await;
        log::info!("Disconnect");
        client.disconnect(false).await?;
        log::info!("Check if disconnect is successful");
        assert!(client.is_disconnected());
        // The turbofish pins the error type that the `?` operators above convert into.
        Ok::<(), IoError>(())
    }
    .await;
    result.unwrap()
});
sys.run().unwrap();

Re-exports§

pub use actix;
pub use futures;
pub use tokio;

Structs§

ErrorMessage
The actix message containing an error that occurred inside the client
MqttClient
The client for connecting to the MQTT server
MqttOptions
The options for setting up MQTT connection
PublishMessage
The actix message containing the payload of an MQTT publish packet
StopMessage
The actix message indicating that the client is about to stop

Enums§

QualityOfService