rumqttc_dev_patched/
notice.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use tokio::sync::oneshot;

use crate::{
    v5::mqttbytes::v5::{
        PubAckReason, PubCompReason, PubRecReason, SubscribeReasonCode as V5SubscribeReasonCode,
        UnsubAckReason,
    },
    SubscribeReasonCode,
};

#[derive(Debug, thiserror::Error)]
pub enum NoticeError {
    #[error("Eventloop dropped Sender")]
    Recv,
    #[error(" v4 Subscription Failure Reason Code: {0:?}")]
    V4Subscribe(SubscribeReasonCode),
    #[error(" v5 Subscription Failure Reason Code: {0:?}")]
    V5Subscribe(V5SubscribeReasonCode),
    #[error(" v5 Unsubscription Failure Reason: {0:?}")]
    V5Unsubscribe(UnsubAckReason),
    #[error(" v5 Publish Ack Failure Reason Code: {0:?}")]
    V5PubAck(PubAckReason),
    #[error(" v5 Publish Rec Failure Reason Code: {0:?}")]
    V5PubRec(PubRecReason),
    #[error(" v5 Publish Comp Failure Reason Code: {0:?}")]
    V5PubComp(PubCompReason),
    #[error(" Dropped due to session reconnect with previous state expire/lost")]
    SessionReset,
}

impl From<oneshot::error::RecvError> for NoticeError {
    fn from(_: oneshot::error::RecvError) -> Self {
        Self::Recv
    }
}

type NoticeResult = Result<(), NoticeError>;

/// A token through which the user is notified of the publish/subscribe/unsubscribe packet being acked by the broker.
#[derive(Debug)]
pub struct NoticeFuture(pub(crate) oneshot::Receiver<NoticeResult>);

impl NoticeFuture {
    /// Wait for broker to acknowledge by blocking the current thread
    ///
    /// # Panics
    /// Panics if called in an async context
    pub fn wait(self) -> NoticeResult {
        self.0.blocking_recv()?
    }

    /// Await the packet acknowledgement from broker, without blocking the current thread
    pub async fn wait_async(self) -> NoticeResult {
        self.0.await?
    }

    /// Attempts to check if the broker acknowledged the packet, without blocking the current thread
    /// or consuming the notice.
    ///
    /// It will return [`None`] if the packet wasn't acknowledged.
    ///
    /// Multiple calls to this functions can fail with [`NoticeError::Recv`] if the notice was
    /// already waited and the packet was already acknowledged and [`Some`] value was returned.
    pub fn try_wait(&mut self) -> Option<NoticeResult> {
        match self.0.try_recv() {
            Ok(res) => Some(res),
            Err(oneshot::error::TryRecvError::Closed) => Some(Err(NoticeError::Recv)),
            Err(oneshot::error::TryRecvError::Empty) => None,
        }
    }
}

#[derive(Debug)]
pub struct NoticeTx(pub(crate) oneshot::Sender<NoticeResult>);

impl NoticeTx {
    pub fn new() -> (Self, NoticeFuture) {
        let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();

        (NoticeTx(notice_tx), NoticeFuture(notice_rx))
    }

    pub fn success(self) {
        _ = self.0.send(Ok(()));
    }

    pub fn error(self, e: NoticeError) {
        _ = self.0.send(Err(e));
    }
}