ntex_mqtt/
error.rs

1use std::{fmt, io, num::NonZeroU16};
2
3use ntex_util::future::Either;
4
5use crate::v5::codec::DisconnectReasonCode;
6
7/// Errors which can occur when attempting to handle mqtt connection.
8#[derive(Debug, thiserror::Error)]
9pub enum MqttError<E> {
10    /// Publish handler service error
11    #[error("Service error")]
12    Service(E),
13    /// Handshake error
14    #[error("Mqtt handshake error: {}", _0)]
15    Handshake(#[from] HandshakeError<E>),
16}
17
18/// Errors which can occur during mqtt connection handshake.
19#[derive(Debug, thiserror::Error)]
20pub enum HandshakeError<E> {
21    /// Handshake service error
22    #[error("Handshake service error")]
23    Service(E),
24    /// Protocol error
25    #[error("Mqtt protocol error: {}", _0)]
26    Protocol(#[from] ProtocolError),
27    /// Handshake timeout
28    #[error("Handshake timeout")]
29    Timeout,
30    /// Peer disconnect
31    #[error("Peer is disconnected, error: {:?}", _0)]
32    Disconnected(Option<io::Error>),
33}
34
35/// Errors related to payload processing
36#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
37pub enum PayloadError {
38    /// Protocol error
39    #[error("{0}")]
40    Protocol(#[from] ProtocolError),
41    /// Peer is disconnected
42    #[error("Peer is disconnected")]
43    Disconnected,
44}
45
46/// Protocol level errors
47#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
48pub enum ProtocolError {
49    /// MQTT decoding error
50    #[error("Decoding error: {0:?}")]
51    Decode(#[from] DecodeError),
52    /// MQTT encoding error
53    #[error("Encoding error: {0:?}")]
54    Encode(#[from] EncodeError),
55    /// Peer violated MQTT protocol specification
56    #[error("Protocol violation: {0}")]
57    ProtocolViolation(#[from] ProtocolViolationError),
58    /// Keep alive timeout
59    #[error("Keep Alive timeout")]
60    KeepAliveTimeout,
61    /// Read frame timeout
62    #[error("Read frame timeout")]
63    ReadTimeout,
64}
65
66#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
67#[error(transparent)]
68pub struct ProtocolViolationError {
69    inner: ViolationInner,
70}
71
72#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
73enum ViolationInner {
74    #[error("{message}")]
75    Common { reason: DisconnectReasonCode, message: &'static str },
76    #[error("{message}; received packet with type `{packet_type:b}`")]
77    UnexpectedPacket { packet_type: u8, message: &'static str },
78}
79
80impl ProtocolViolationError {
81    pub(crate) fn reason(&self) -> DisconnectReasonCode {
82        match self.inner {
83            ViolationInner::Common { reason, .. } => reason,
84            ViolationInner::UnexpectedPacket { .. } => DisconnectReasonCode::ProtocolError,
85        }
86    }
87}
88
89impl ProtocolError {
90    pub(crate) fn violation(reason: DisconnectReasonCode, message: &'static str) -> Self {
91        Self::ProtocolViolation(ProtocolViolationError {
92            inner: ViolationInner::Common { reason, message },
93        })
94    }
95    pub fn generic_violation(message: &'static str) -> Self {
96        Self::violation(DisconnectReasonCode::ProtocolError, message)
97    }
98
99    pub(crate) fn unexpected_packet(packet_type: u8, message: &'static str) -> ProtocolError {
100        Self::ProtocolViolation(ProtocolViolationError {
101            inner: ViolationInner::UnexpectedPacket { packet_type, message },
102        })
103    }
104    pub(crate) fn packet_id_mismatch() -> Self {
105        Self::generic_violation(
106            "Packet id of PUBACK packet does not match expected next value according to sending order of PUBLISH packets [MQTT-4.6.0-2]"
107        )
108    }
109}
110
111impl<E> From<io::Error> for MqttError<E> {
112    fn from(err: io::Error) -> Self {
113        MqttError::Handshake(HandshakeError::Disconnected(Some(err)))
114    }
115}
116
117impl<E> From<Either<io::Error, io::Error>> for MqttError<E> {
118    fn from(err: Either<io::Error, io::Error>) -> Self {
119        MqttError::Handshake(HandshakeError::Disconnected(Some(err.into_inner())))
120    }
121}
122
123impl<E> From<EncodeError> for MqttError<E> {
124    fn from(err: EncodeError) -> Self {
125        MqttError::Handshake(HandshakeError::Protocol(ProtocolError::Encode(err)))
126    }
127}
128
129impl<E> From<Either<DecodeError, io::Error>> for HandshakeError<E> {
130    fn from(err: Either<DecodeError, io::Error>) -> Self {
131        match err {
132            Either::Left(err) => HandshakeError::Protocol(ProtocolError::Decode(err)),
133            Either::Right(err) => HandshakeError::Disconnected(Some(err)),
134        }
135    }
136}
137
138#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
139pub enum DecodeError {
140    #[error("Invalid protocol")]
141    InvalidProtocol,
142    #[error("Invalid length")]
143    InvalidLength,
144    #[error("Malformed packet")]
145    MalformedPacket,
146    #[error("Unsupported protocol level")]
147    UnsupportedProtocolLevel,
148    #[error("Connect frame's reserved flag is set")]
149    ConnectReservedFlagSet,
150    #[error("ConnectAck frame's reserved flag is set")]
151    ConnAckReservedFlagSet,
152    #[error("Invalid client id")]
153    InvalidClientId,
154    #[error("Unsupported packet type")]
155    UnsupportedPacketType,
156    // MQTT v3 only
157    #[error("Packet id is required")]
158    PacketIdRequired,
159    #[error("Max size exceeded")]
160    MaxSizeExceeded,
161    #[error("utf8 error")]
162    Utf8Error,
163    #[error("Unexpected payload")]
164    UnexpectedPayload,
165}
166
167#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, thiserror::Error)]
168pub enum EncodeError {
169    #[error("Packet is bigger than peer's Maximum Packet Size")]
170    OverMaxPacketSize,
171    #[error("Streaming payload is bigger than Publish packet definition")]
172    OverPublishSize,
173    #[error("Streaming payload is incomplete")]
174    PublishIncomplete,
175    #[error("Invalid length")]
176    InvalidLength,
177    #[error("Malformed packet")]
178    MalformedPacket,
179    #[error("Packet id is required")]
180    PacketIdRequired,
181    #[error("Unexpected payload")]
182    UnexpectedPayload,
183    #[error("Publish packet is not completed, expect payload")]
184    ExpectPayload,
185    #[error("Unsupported version")]
186    UnsupportedVersion,
187}
188
189#[derive(Debug, PartialEq, Eq, Copy, Clone, thiserror::Error)]
190pub enum SendPacketError {
191    /// Encoder error
192    #[error("Encoding error {:?}", _0)]
193    Encode(#[from] EncodeError),
194    /// Provided packet id is in use
195    #[error("Provided packet id is in use")]
196    PacketIdInUse(NonZeroU16),
197    /// Unexpected release publish
198    #[error("Unexpected publish release")]
199    UnexpectedRelease,
200    /// Streaming has been cancelled
201    #[error("Streaming has been cancelled")]
202    StreamingCancelled,
203    /// Peer disconnected
204    #[error("Peer is disconnected")]
205    Disconnected,
206}
207
208/// Errors which can occur when attempting to handle mqtt client connection.
209#[derive(Debug, thiserror::Error)]
210pub enum ClientError<T: fmt::Debug> {
211    /// Connect negotiation failed
212    #[error("Connect ack failed: {:?}", _0)]
213    Ack(T),
214    /// Protocol error
215    #[error("Protocol error: {:?}", _0)]
216    Protocol(#[from] ProtocolError),
217    /// Handshake timeout
218    #[error("Handshake timeout")]
219    HandshakeTimeout,
220    /// Peer disconnected
221    #[error("Peer disconnected")]
222    Disconnected(Option<std::io::Error>),
223    /// Connect error
224    #[error("Connect error: {}", _0)]
225    Connect(#[from] ntex_net::connect::ConnectError),
226}
227
228impl<T: fmt::Debug> From<EncodeError> for ClientError<T> {
229    fn from(err: EncodeError) -> Self {
230        ClientError::Protocol(ProtocolError::Encode(err))
231    }
232}
233
234impl<T: fmt::Debug> From<Either<DecodeError, std::io::Error>> for ClientError<T> {
235    fn from(err: Either<DecodeError, std::io::Error>) -> Self {
236        match err {
237            Either::Left(err) => ClientError::Protocol(ProtocolError::Decode(err)),
238            Either::Right(err) => ClientError::Disconnected(Some(err)),
239        }
240    }
241}