1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use derive_more::From;
use either::Either;
use std::io;

use super::framed::DispatcherError;

/// Errors which can occur when attempting to handle mqtt connection.
#[derive(Debug)]
pub enum MqttError<E> {
    /// Publish handler service error
    Service(E),
    /// Protocol error
    Protocol(ProtocolError),
    /// Handshake timeout
    HandshakeTimeout,
    /// Peer disconnect
    Disconnected,
    /// Protocol specific unhandled error (for v3.1.1 only)
    V3ProtocolError,
}

/// Protocol level errors
#[derive(Debug)]
pub enum ProtocolError {
    /// Mqtt parse error
    Decode(DecodeError),
    /// Mqtt encode error
    Encode(EncodeError),
    /// Unexpected packet
    Unexpected(u8, &'static str),
    /// Packet id of publish ack packet does not match of send publish packet
    PacketIdMismatch,
    /// Topic alias is greater than max topic alias
    MaxTopicAlias,
    /// Number of in-flight messages exceeded
    ReceiveMaximumExceeded,
    /// Unknown topic alias
    UnknownTopicAlias,
    /// Keep alive timeout
    KeepAliveTimeout,
    /// Unexpected io error
    Io(io::Error),
}

impl<E> From<Either<E, ProtocolError>> for MqttError<E> {
    fn from(err: Either<E, ProtocolError>) -> Self {
        match err {
            Either::Left(e) => MqttError::Service(e),
            Either::Right(e) => MqttError::Protocol(e),
        }
    }
}

impl<E> From<Either<DecodeError, io::Error>> for MqttError<E> {
    fn from(err: Either<DecodeError, io::Error>) -> Self {
        match err {
            Either::Left(err) => MqttError::Protocol(ProtocolError::Decode(err)),
            Either::Right(err) => MqttError::Protocol(ProtocolError::Io(err)),
        }
    }
}

impl<E> From<Either<EncodeError, io::Error>> for MqttError<E> {
    fn from(err: Either<EncodeError, io::Error>) -> Self {
        match err {
            Either::Left(err) => MqttError::Protocol(ProtocolError::Encode(err)),
            Either::Right(err) => MqttError::Protocol(ProtocolError::Io(err)),
        }
    }
}

impl From<DispatcherError<crate::v3::codec::Codec>> for ProtocolError {
    fn from(err: DispatcherError<crate::v3::codec::Codec>) -> Self {
        match err {
            DispatcherError::KeepAlive => ProtocolError::KeepAliveTimeout,
            DispatcherError::Encoder(err) => ProtocolError::Encode(err),
            DispatcherError::Decoder(err) => ProtocolError::Decode(err),
            DispatcherError::Io(err) => ProtocolError::Io(err),
        }
    }
}

impl From<DispatcherError<crate::v5::codec::Codec>> for ProtocolError {
    fn from(err: DispatcherError<crate::v5::codec::Codec>) -> Self {
        match err {
            DispatcherError::KeepAlive => ProtocolError::KeepAliveTimeout,
            DispatcherError::Encoder(err) => ProtocolError::Encode(err),
            DispatcherError::Decoder(err) => ProtocolError::Decode(err),
            DispatcherError::Io(err) => ProtocolError::Io(err),
        }
    }
}

#[derive(Debug, From)]
pub enum DecodeError {
    InvalidProtocol,
    InvalidLength,
    MalformedPacket,
    UnsupportedProtocolLevel,
    ConnectReservedFlagSet,
    ConnAckReservedFlagSet,
    InvalidClientId,
    UnsupportedPacketType,
    // MQTT v3 only
    PacketIdRequired,
    MaxSizeExceeded,
    Utf8Error(std::str::Utf8Error),
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum EncodeError {
    InvalidLength,
    MalformedPacket,
    PacketIdRequired,
    UnsupportedVersion,
}

impl PartialEq for DecodeError {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (DecodeError::InvalidProtocol, DecodeError::InvalidProtocol) => true,
            (DecodeError::InvalidLength, DecodeError::InvalidLength) => true,
            (DecodeError::UnsupportedProtocolLevel, DecodeError::UnsupportedProtocolLevel) => {
                true
            }
            (DecodeError::ConnectReservedFlagSet, DecodeError::ConnectReservedFlagSet) => true,
            (DecodeError::ConnAckReservedFlagSet, DecodeError::ConnAckReservedFlagSet) => true,
            (DecodeError::InvalidClientId, DecodeError::InvalidClientId) => true,
            (DecodeError::UnsupportedPacketType, DecodeError::UnsupportedPacketType) => true,
            (DecodeError::PacketIdRequired, DecodeError::PacketIdRequired) => true,
            (DecodeError::MaxSizeExceeded, DecodeError::MaxSizeExceeded) => true,
            (DecodeError::MalformedPacket, DecodeError::MalformedPacket) => true,
            (DecodeError::Utf8Error(_), _) => false,
            _ => false,
        }
    }
}