use std::time::Duration;
use super::mqttbytes::v5::{
Filter, PubAck, PubRec, Publish, PublishProperties, Subscribe, SubscribeProperties,
Unsubscribe, UnsubscribeProperties,
};
use super::mqttbytes::{valid_filter, QoS};
use super::{ConnectionError, Event, EventLoop, MqttOptions, Request};
use crate::{valid_topic, NoticeFuture, NoticeTx};
use bytes::Bytes;
use flume::{SendError, Sender, TrySendError};
use futures_util::FutureExt;
use tokio::runtime::{self, Runtime};
use tokio::time::timeout;
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
#[error("Failed to send mqtt requests to eventloop")]
Request(Request),
#[error("Failed to send mqtt requests to eventloop")]
TryRequest(Request),
}
impl From<SendError<(NoticeTx, Request)>> for ClientError {
fn from(e: SendError<(NoticeTx, Request)>) -> Self {
Self::Request(e.into_inner().1)
}
}
impl From<TrySendError<(NoticeTx, Request)>> for ClientError {
fn from(e: TrySendError<(NoticeTx, Request)>) -> Self {
Self::TryRequest(e.into_inner().1)
}
}
#[derive(Clone, Debug)]
pub struct AsyncClient {
request_tx: Sender<(NoticeTx, Request)>,
}
impl AsyncClient {
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.requests_tx.clone();
let client = AsyncClient { request_tx };
(client, eventloop)
}
pub fn from_senders(request_tx: Sender<(NoticeTx, Request)>) -> AsyncClient {
AsyncClient { request_tx }
}
async fn handle_publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: Option<PublishProperties>,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
let topic = topic.into();
let mut publish = Publish::new(&topic, qos, payload, properties);
publish.retain = retain;
let request = Request::Publish(publish);
if !valid_topic(&topic) {
return Err(ClientError::Request(request));
}
let (notice_tx, future) = NoticeTx::new();
self.request_tx.send_async((notice_tx, request)).await?;
Ok(future)
}
pub async fn publish_with_properties<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: PublishProperties,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, Some(properties))
.await
}
pub async fn publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, None).await
}
fn handle_try_publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: Option<PublishProperties>,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
let topic = topic.into();
let mut publish = Publish::new(&topic, qos, payload, properties);
publish.retain = retain;
let request = Request::Publish(publish);
if !valid_topic(&topic) {
return Err(ClientError::TryRequest(request));
}
let (notice_tx, future) = NoticeTx::new();
self.request_tx.try_send((notice_tx, request))?;
Ok(future)
}
pub fn try_publish_with_properties<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: PublishProperties,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
self.handle_try_publish(topic, qos, retain, payload, Some(properties))
}
pub fn try_publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
self.handle_try_publish(topic, qos, retain, payload, None)
}
pub async fn ack(&self, publish: &Publish) -> Result<NoticeFuture, ClientError> {
let ack = get_ack_req(publish);
let (notice_tx, future) = NoticeTx::new();
if let Some(ack) = ack {
self.request_tx.send_async((notice_tx, ack)).await?;
}
Ok(future)
}
pub fn try_ack(&self, publish: &Publish) -> Result<NoticeFuture, ClientError> {
let ack = get_ack_req(publish);
let (notice_tx, future) = NoticeTx::new();
if let Some(ack) = ack {
self.request_tx.try_send((notice_tx, ack))?;
}
Ok(future)
}
async fn handle_publish_bytes<S>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: Bytes,
properties: Option<PublishProperties>,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
{
let topic = topic.into();
let mut publish = Publish::new(&topic, qos, payload, properties);
publish.retain = retain;
let request = Request::Publish(publish);
if !valid_topic(&topic) {
return Err(ClientError::TryRequest(request));
}
let (notice_tx, future) = NoticeTx::new();
self.request_tx.send_async((notice_tx, request)).await?;
Ok(future)
}
pub async fn publish_bytes_with_properties<S>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: Bytes,
properties: PublishProperties,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
{
self.handle_publish_bytes(topic, qos, retain, payload, Some(properties))
.await
}
pub async fn publish_bytes<S>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: Bytes,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
{
self.handle_publish_bytes(topic, qos, retain, payload, None)
.await
}
async fn handle_subscribe<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: Option<SubscribeProperties>,
) -> Result<NoticeFuture, ClientError> {
let filter = Filter::new(topic, qos);
let is_filter_valid = valid_filter(&filter.path);
let subscribe = Subscribe::new(filter, properties);
let request = Request::Subscribe(subscribe);
if !is_filter_valid {
return Err(ClientError::Request(request));
}
let (notice_tx, future) = NoticeTx::new();
self.request_tx.send_async((notice_tx, request)).await?;
Ok(future)
}
pub async fn subscribe_with_properties<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: SubscribeProperties,
) -> Result<NoticeFuture, ClientError> {
self.handle_subscribe(topic, qos, Some(properties)).await
}
pub async fn subscribe<S: Into<String>>(
&self,
topic: S,
qos: QoS,
) -> Result<NoticeFuture, ClientError> {
self.handle_subscribe(topic, qos, None).await
}
fn handle_try_subscribe<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: Option<SubscribeProperties>,
) -> Result<NoticeFuture, ClientError> {
let filter = Filter::new(topic, qos);
let is_filter_valid = valid_filter(&filter.path);
let subscribe = Subscribe::new(filter, properties);
let request = Request::Subscribe(subscribe);
if !is_filter_valid {
return Err(ClientError::TryRequest(request));
}
let (notice_tx, future) = NoticeTx::new();
self.request_tx.try_send((notice_tx, request))?;
Ok(future)
}
pub fn try_subscribe_with_properties<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: SubscribeProperties,
) -> Result<NoticeFuture, ClientError> {
self.handle_try_subscribe(topic, qos, Some(properties))
}
pub fn try_subscribe<S: Into<String>>(
&self,
topic: S,
qos: QoS,
) -> Result<NoticeFuture, ClientError> {
self.handle_try_subscribe(topic, qos, None)
}
async fn handle_subscribe_many<T>(
&self,
topics: T,
properties: Option<SubscribeProperties>,
) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter, properties);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
}
let (notice_tx, future) = NoticeTx::new();
self.request_tx.send_async((notice_tx, request)).await?;
Ok(future)
}
pub async fn subscribe_many_with_properties<T>(
&self,
topics: T,
properties: SubscribeProperties,
) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_subscribe_many(topics, Some(properties)).await
}
pub async fn subscribe_many<T>(&self, topics: T) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_subscribe_many(topics, None).await
}
fn handle_try_subscribe_many<T>(
&self,
topics: T,
properties: Option<SubscribeProperties>,
) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter, properties);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::TryRequest(request));
}
let (notice_tx, future) = NoticeTx::new();
self.request_tx.try_send((notice_tx, request))?;
Ok(future)
}
pub fn try_subscribe_many_with_properties<T>(
&self,
topics: T,
properties: SubscribeProperties,
) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_try_subscribe_many(topics, Some(properties))
}
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_try_subscribe_many(topics, None)
}
async fn handle_unsubscribe<S: Into<String>>(
&self,
topic: S,
properties: Option<UnsubscribeProperties>,
) -> Result<NoticeFuture, ClientError> {
let unsubscribe = Unsubscribe::new(topic, properties);
let request = Request::Unsubscribe(unsubscribe);
let (notice_tx, future) = NoticeTx::new();
self.request_tx.try_send((notice_tx, request))?;
Ok(future)
}
pub async fn unsubscribe_with_properties<S: Into<String>>(
&self,
topic: S,
properties: UnsubscribeProperties,
) -> Result<NoticeFuture, ClientError> {
self.handle_unsubscribe(topic, Some(properties)).await
}
pub async fn unsubscribe<S: Into<String>>(
&self,
topic: S,
) -> Result<NoticeFuture, ClientError> {
self.handle_unsubscribe(topic, None).await
}
fn handle_try_unsubscribe<S: Into<String>>(
&self,
topic: S,
properties: Option<UnsubscribeProperties>,
) -> Result<NoticeFuture, ClientError> {
let unsubscribe = Unsubscribe::new(topic, properties);
let request = Request::Unsubscribe(unsubscribe);
let (notice_tx, future) = NoticeTx::new();
self.request_tx.try_send((notice_tx, request))?;
Ok(future)
}
pub fn try_unsubscribe_with_properties<S: Into<String>>(
&self,
topic: S,
properties: UnsubscribeProperties,
) -> Result<NoticeFuture, ClientError> {
self.handle_try_unsubscribe(topic, Some(properties))
}
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<NoticeFuture, ClientError> {
self.handle_try_unsubscribe(topic, None)
}
pub async fn disconnect(&self) -> Result<NoticeFuture, ClientError> {
let request = Request::Disconnect;
let (notice_tx, future) = NoticeTx::new();
self.request_tx.send_async((notice_tx, request)).await?;
Ok(future)
}
pub fn try_disconnect(&self) -> Result<NoticeFuture, ClientError> {
let request = Request::Disconnect;
let (notice_tx, future) = NoticeTx::new();
self.request_tx.try_send((notice_tx, request))?;
Ok(future)
}
}
fn get_ack_req(publish: &Publish) -> Option<Request> {
let ack = match publish.qos {
QoS::AtMostOnce => return None,
QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid, None)),
QoS::ExactlyOnce => Request::PubRec(PubRec::new(publish.pkid, None)),
};
Some(ack)
}
#[derive(Clone)]
pub struct Client {
client: AsyncClient,
}
impl Client {
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
let (client, eventloop) = AsyncClient::new(options, cap);
let client = Client { client };
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let connection = Connection::new(eventloop, runtime);
(client, connection)
}
pub fn from_sender(request_tx: Sender<(NoticeTx, Request)>) -> Client {
Client {
client: AsyncClient::from_senders(request_tx),
}
}
fn handle_publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: Option<PublishProperties>,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
let topic = topic.into();
let mut publish = Publish::new(&topic, qos, payload, properties);
publish.retain = retain;
let request = Request::Publish(publish);
if !valid_topic(&topic) {
return Err(ClientError::Request(request));
}
let (notice_tx, future) = NoticeTx::new();
self.client.request_tx.send((notice_tx, request))?;
Ok(future)
}
pub fn publish_with_properties<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: PublishProperties,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, Some(properties))
}
pub fn publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, None)
}
pub fn try_publish_with_properties<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: PublishProperties,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
self.client
.try_publish_with_properties(topic, qos, retain, payload, properties)
}
pub fn try_publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
) -> Result<NoticeFuture, ClientError>
where
S: Into<String>,
P: Into<Bytes>,
{
self.client.try_publish(topic, qos, retain, payload)
}
pub fn ack(&self, publish: &Publish) -> Result<NoticeFuture, ClientError> {
let ack = get_ack_req(publish);
let (notice_tx, future) = NoticeTx::new();
if let Some(ack) = ack {
self.client.request_tx.send((notice_tx, ack))?;
}
Ok(future)
}
pub fn try_ack(&self, publish: &Publish) -> Result<NoticeFuture, ClientError> {
self.client.try_ack(publish)
}
fn handle_subscribe<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: Option<SubscribeProperties>,
) -> Result<NoticeFuture, ClientError> {
let filter = Filter::new(topic, qos);
let is_filter_valid = valid_filter(&filter.path);
let subscribe = Subscribe::new(filter, properties);
let request = Request::Subscribe(subscribe);
if !is_filter_valid {
return Err(ClientError::Request(request));
}
let (notice_tx, future) = NoticeTx::new();
self.client.request_tx.try_send((notice_tx, request))?;
Ok(future)
}
pub fn subscribe_with_properties<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: SubscribeProperties,
) -> Result<NoticeFuture, ClientError> {
self.handle_subscribe(topic, qos, Some(properties))
}
pub fn subscribe<S: Into<String>>(
&self,
topic: S,
qos: QoS,
) -> Result<NoticeFuture, ClientError> {
self.handle_subscribe(topic, qos, None)
}
pub fn try_subscribe_with_properties<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: SubscribeProperties,
) -> Result<NoticeFuture, ClientError> {
self.client
.try_subscribe_with_properties(topic, qos, properties)
}
pub fn try_subscribe<S: Into<String>>(
&self,
topic: S,
qos: QoS,
) -> Result<NoticeFuture, ClientError> {
self.client.try_subscribe(topic, qos)
}
fn handle_subscribe_many<T>(
&self,
topics: T,
properties: Option<SubscribeProperties>,
) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter, properties);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
}
let (notice_tx, future) = NoticeTx::new();
self.client.request_tx.try_send((notice_tx, request))?;
Ok(future)
}
pub fn subscribe_many_with_properties<T>(
&self,
topics: T,
properties: SubscribeProperties,
) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_subscribe_many(topics, Some(properties))
}
pub fn subscribe_many<T>(&self, topics: T) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_subscribe_many(topics, None)
}
pub fn try_subscribe_many_with_properties<T>(
&self,
topics: T,
properties: SubscribeProperties,
) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.client
.try_subscribe_many_with_properties(topics, properties)
}
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<NoticeFuture, ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.client.try_subscribe_many(topics)
}
fn handle_unsubscribe<S: Into<String>>(
&self,
topic: S,
properties: Option<UnsubscribeProperties>,
) -> Result<NoticeFuture, ClientError> {
let unsubscribe = Unsubscribe::new(topic, properties);
let request = Request::Unsubscribe(unsubscribe);
let (notice_tx, future) = NoticeTx::new();
self.client.request_tx.try_send((notice_tx, request))?;
Ok(future)
}
pub fn unsubscribe_with_properties<S: Into<String>>(
&self,
topic: S,
properties: UnsubscribeProperties,
) -> Result<NoticeFuture, ClientError> {
self.handle_unsubscribe(topic, Some(properties))
}
pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<NoticeFuture, ClientError> {
self.handle_unsubscribe(topic, None)
}
pub fn try_unsubscribe_with_properties<S: Into<String>>(
&self,
topic: S,
properties: UnsubscribeProperties,
) -> Result<NoticeFuture, ClientError> {
self.client
.try_unsubscribe_with_properties(topic, properties)
}
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<NoticeFuture, ClientError> {
self.client.try_unsubscribe(topic)
}
pub fn disconnect(&self) -> Result<NoticeFuture, ClientError> {
let request = Request::Disconnect;
let (notice_tx, future) = NoticeTx::new();
self.client.request_tx.send((notice_tx, request))?;
Ok(future)
}
pub fn try_disconnect(&self) -> Result<NoticeFuture, ClientError> {
self.client.try_disconnect()
}
}
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError;
#[derive(Debug, Eq, PartialEq)]
pub enum TryRecvError {
Disconnected,
Empty,
}
#[derive(Debug, Eq, PartialEq)]
pub enum RecvTimeoutError {
Disconnected,
Timeout,
}
pub struct Connection {
pub eventloop: EventLoop,
runtime: Runtime,
}
impl Connection {
fn new(eventloop: EventLoop, runtime: Runtime) -> Connection {
Connection { eventloop, runtime }
}
#[must_use = "Connection should be iterated over a loop to make progress"]
pub fn iter(&mut self) -> Iter<'_> {
Iter { connection: self }
}
pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
let f = self.eventloop.poll();
let event = self.runtime.block_on(f);
resolve_event(event).ok_or(RecvError)
}
pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
let f = self.eventloop.poll();
let _guard = self.runtime.enter();
let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
resolve_event(event).ok_or(TryRecvError::Disconnected)
}
pub fn recv_timeout(
&mut self,
duration: Duration,
) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
let f = self.eventloop.poll();
let event = self
.runtime
.block_on(async { timeout(duration, f).await })
.map_err(|_| RecvTimeoutError::Timeout)?;
resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
}
}
fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
match event {
Ok(v) => Some(Ok(v)),
Err(ConnectionError::RequestsDone) => {
trace!("Done with requests");
None
}
Err(e) => Some(Err(e)),
}
}
pub struct Iter<'a> {
connection: &'a mut Connection,
}
impl Iterator for Iter<'_> {
type Item = Result<Event, ConnectionError>;
fn next(&mut self) -> Option<Self::Item> {
self.connection.recv().ok()
}
}
#[cfg(test)]
mod test {
use crate::v5::mqttbytes::v5::LastWill;
use super::*;
#[test]
fn calling_iter_twice_on_connection_shouldnt_panic() {
use std::time::Duration;
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
mqttoptions
.set_keep_alive(Duration::from_secs(5))
.set_last_will(will);
let (_, mut connection) = Client::new(mqttoptions, 10);
let _ = connection.iter();
let _ = connection.iter();
}
#[test]
fn should_be_able_to_build_test_client_from_channel() {
let (tx, rx) = flume::bounded(1);
let client = Client::from_sender(tx);
client
.publish("hello/world", QoS::ExactlyOnce, false, "good bye")
.expect("Should be able to publish");
let _ = rx.try_recv().expect("Should have message");
}
}