rumqttc_dev_patched/
notice.rsuse tokio::sync::oneshot;
use crate::{
v5::mqttbytes::v5::{
PubAckReason, PubCompReason, PubRecReason, SubscribeReasonCode as V5SubscribeReasonCode,
UnsubAckReason,
},
SubscribeReasonCode,
};
#[derive(Debug, thiserror::Error)]
pub enum NoticeError {
#[error("Eventloop dropped Sender")]
Recv,
#[error(" v4 Subscription Failure Reason Code: {0:?}")]
V4Subscribe(SubscribeReasonCode),
#[error(" v5 Subscription Failure Reason Code: {0:?}")]
V5Subscribe(V5SubscribeReasonCode),
#[error(" v5 Unsubscription Failure Reason: {0:?}")]
V5Unsubscribe(UnsubAckReason),
#[error(" v5 Publish Ack Failure Reason Code: {0:?}")]
V5PubAck(PubAckReason),
#[error(" v5 Publish Rec Failure Reason Code: {0:?}")]
V5PubRec(PubRecReason),
#[error(" v5 Publish Comp Failure Reason Code: {0:?}")]
V5PubComp(PubCompReason),
#[error(" Dropped due to session reconnect with previous state expire/lost")]
SessionReset,
}
impl From<oneshot::error::RecvError> for NoticeError {
fn from(_: oneshot::error::RecvError) -> Self {
Self::Recv
}
}
type NoticeResult = Result<(), NoticeError>;
#[derive(Debug)]
pub struct NoticeFuture(pub(crate) oneshot::Receiver<NoticeResult>);
impl NoticeFuture {
pub fn wait(self) -> NoticeResult {
self.0.blocking_recv()?
}
pub async fn wait_async(self) -> NoticeResult {
self.0.await?
}
pub fn try_wait(&mut self) -> Option<NoticeResult> {
match self.0.try_recv() {
Ok(res) => Some(res),
Err(oneshot::error::TryRecvError::Closed) => Some(Err(NoticeError::Recv)),
Err(oneshot::error::TryRecvError::Empty) => None,
}
}
}
#[derive(Debug)]
pub struct NoticeTx(pub(crate) oneshot::Sender<NoticeResult>);
impl NoticeTx {
pub fn new() -> (Self, NoticeFuture) {
let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
(NoticeTx(notice_tx), NoticeFuture(notice_rx))
}
pub fn success(self) {
_ = self.0.send(Ok(()));
}
pub fn error(self, e: NoticeError) {
_ = self.0.send(Err(e));
}
}