1use std::{fmt, io, num::NonZeroU16};
2
3use ntex_util::future::Either;
4
5use crate::v5::codec::DisconnectReasonCode;
6
7#[derive(Debug, thiserror::Error)]
9pub enum MqttError<E> {
10 #[error("Service error")]
12 Service(E),
13 #[error("Mqtt handshake error: {}", _0)]
15 Handshake(#[from] HandshakeError<E>),
16}
17
18#[derive(Debug, thiserror::Error)]
20pub enum HandshakeError<E> {
21 #[error("Handshake service error")]
23 Service(E),
24 #[error("Mqtt protocol error: {}", _0)]
26 Protocol(#[from] ProtocolError),
27 #[error("Handshake timeout")]
29 Timeout,
30 #[error("Peer is disconnected, error: {:?}", _0)]
32 Disconnected(Option<io::Error>),
33}
34
35#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
37pub enum PayloadError {
38 #[error("{0}")]
40 Protocol(#[from] ProtocolError),
41 #[error("Peer is disconnected")]
43 Disconnected,
44}
45
46#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
48pub enum ProtocolError {
49 #[error("Decoding error: {0:?}")]
51 Decode(#[from] DecodeError),
52 #[error("Encoding error: {0:?}")]
54 Encode(#[from] EncodeError),
55 #[error("Protocol violation: {0}")]
57 ProtocolViolation(#[from] ProtocolViolationError),
58 #[error("Keep Alive timeout")]
60 KeepAliveTimeout,
61 #[error("Read frame timeout")]
63 ReadTimeout,
64}
65
66#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
67#[error(transparent)]
68pub struct ProtocolViolationError {
69 inner: ViolationInner,
70}
71
72#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
73enum ViolationInner {
74 #[error("{message}")]
75 Common { reason: DisconnectReasonCode, message: &'static str },
76 #[error("{message}; received packet with type `{packet_type:b}`")]
77 UnexpectedPacket { packet_type: u8, message: &'static str },
78}
79
80impl ProtocolViolationError {
81 pub(crate) fn reason(&self) -> DisconnectReasonCode {
82 match self.inner {
83 ViolationInner::Common { reason, .. } => reason,
84 ViolationInner::UnexpectedPacket { .. } => DisconnectReasonCode::ProtocolError,
85 }
86 }
87}
88
89impl ProtocolError {
90 pub(crate) fn violation(reason: DisconnectReasonCode, message: &'static str) -> Self {
91 Self::ProtocolViolation(ProtocolViolationError {
92 inner: ViolationInner::Common { reason, message },
93 })
94 }
95 pub fn generic_violation(message: &'static str) -> Self {
96 Self::violation(DisconnectReasonCode::ProtocolError, message)
97 }
98
99 pub(crate) fn unexpected_packet(packet_type: u8, message: &'static str) -> ProtocolError {
100 Self::ProtocolViolation(ProtocolViolationError {
101 inner: ViolationInner::UnexpectedPacket { packet_type, message },
102 })
103 }
104 pub(crate) fn packet_id_mismatch() -> Self {
105 Self::generic_violation(
106 "Packet id of PUBACK packet does not match expected next value according to sending order of PUBLISH packets [MQTT-4.6.0-2]"
107 )
108 }
109}
110
111impl<E> From<io::Error> for MqttError<E> {
112 fn from(err: io::Error) -> Self {
113 MqttError::Handshake(HandshakeError::Disconnected(Some(err)))
114 }
115}
116
117impl<E> From<Either<io::Error, io::Error>> for MqttError<E> {
118 fn from(err: Either<io::Error, io::Error>) -> Self {
119 MqttError::Handshake(HandshakeError::Disconnected(Some(err.into_inner())))
120 }
121}
122
123impl<E> From<EncodeError> for MqttError<E> {
124 fn from(err: EncodeError) -> Self {
125 MqttError::Handshake(HandshakeError::Protocol(ProtocolError::Encode(err)))
126 }
127}
128
129impl<E> From<Either<DecodeError, io::Error>> for HandshakeError<E> {
130 fn from(err: Either<DecodeError, io::Error>) -> Self {
131 match err {
132 Either::Left(err) => HandshakeError::Protocol(ProtocolError::Decode(err)),
133 Either::Right(err) => HandshakeError::Disconnected(Some(err)),
134 }
135 }
136}
137
138#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
139pub enum DecodeError {
140 #[error("Invalid protocol")]
141 InvalidProtocol,
142 #[error("Invalid length")]
143 InvalidLength,
144 #[error("Malformed packet")]
145 MalformedPacket,
146 #[error("Unsupported protocol level")]
147 UnsupportedProtocolLevel,
148 #[error("Connect frame's reserved flag is set")]
149 ConnectReservedFlagSet,
150 #[error("ConnectAck frame's reserved flag is set")]
151 ConnAckReservedFlagSet,
152 #[error("Invalid client id")]
153 InvalidClientId,
154 #[error("Unsupported packet type")]
155 UnsupportedPacketType,
156 #[error("Packet id is required")]
158 PacketIdRequired,
159 #[error("Max size exceeded")]
160 MaxSizeExceeded,
161 #[error("utf8 error")]
162 Utf8Error,
163 #[error("Unexpected payload")]
164 UnexpectedPayload,
165}
166
167#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, thiserror::Error)]
168pub enum EncodeError {
169 #[error("Packet is bigger than peer's Maximum Packet Size")]
170 OverMaxPacketSize,
171 #[error("Streaming payload is bigger than Publish packet definition")]
172 OverPublishSize,
173 #[error("Streaming payload is incomplete")]
174 PublishIncomplete,
175 #[error("Invalid length")]
176 InvalidLength,
177 #[error("Malformed packet")]
178 MalformedPacket,
179 #[error("Packet id is required")]
180 PacketIdRequired,
181 #[error("Unexpected payload")]
182 UnexpectedPayload,
183 #[error("Publish packet is not completed, expect payload")]
184 ExpectPayload,
185 #[error("Unsupported version")]
186 UnsupportedVersion,
187}
188
189#[derive(Debug, PartialEq, Eq, Copy, Clone, thiserror::Error)]
190pub enum SendPacketError {
191 #[error("Encoding error {:?}", _0)]
193 Encode(#[from] EncodeError),
194 #[error("Provided packet id is in use")]
196 PacketIdInUse(NonZeroU16),
197 #[error("Unexpected publish release")]
199 UnexpectedRelease,
200 #[error("Streaming has been cancelled")]
202 StreamingCancelled,
203 #[error("Peer is disconnected")]
205 Disconnected,
206}
207
208#[derive(Debug, thiserror::Error)]
210pub enum ClientError<T: fmt::Debug> {
211 #[error("Connect ack failed: {:?}", _0)]
213 Ack(T),
214 #[error("Protocol error: {:?}", _0)]
216 Protocol(#[from] ProtocolError),
217 #[error("Handshake timeout")]
219 HandshakeTimeout,
220 #[error("Peer disconnected")]
222 Disconnected(Option<std::io::Error>),
223 #[error("Connect error: {}", _0)]
225 Connect(#[from] ntex_net::connect::ConnectError),
226}
227
228impl<T: fmt::Debug> From<EncodeError> for ClientError<T> {
229 fn from(err: EncodeError) -> Self {
230 ClientError::Protocol(ProtocolError::Encode(err))
231 }
232}
233
234impl<T: fmt::Debug> From<Either<DecodeError, std::io::Error>> for ClientError<T> {
235 fn from(err: Either<DecodeError, std::io::Error>) -> Self {
236 match err {
237 Either::Left(err) => ClientError::Protocol(ProtocolError::Decode(err)),
238 Either::Right(err) => ClientError::Disconnected(Some(err)),
239 }
240 }
241}