// ntex_mqtt/v3/handshake.rs

1use std::{fmt, rc::Rc};
2
3use ntex_io::IoBoxed;
4use ntex_util::time::Seconds;
5
6use super::{codec as mqtt, shared::MqttShared, sink::MqttSink};
7
8const DEFAULT_KEEPALIVE: Seconds = Seconds(30);
9
10/// Connect message
11pub struct Handshake {
12    io: IoBoxed,
13    pkt: Box<mqtt::Connect>,
14    pkt_size: u32,
15    shared: Rc<MqttShared>,
16}
17
18impl Handshake {
19    pub(crate) fn new(
20        pkt: Box<mqtt::Connect>,
21        pkt_size: u32,
22        io: IoBoxed,
23        shared: Rc<MqttShared>,
24    ) -> Self {
25        Self { io, pkt, pkt_size, shared }
26    }
27
28    #[inline]
29    pub fn packet(&self) -> &mqtt::Connect {
30        &self.pkt
31    }
32
33    #[inline]
34    pub fn packet_mut(&mut self) -> &mut mqtt::Connect {
35        &mut self.pkt
36    }
37
38    #[inline]
39    pub fn packet_size(&self) -> u32 {
40        self.pkt_size
41    }
42
43    #[inline]
44    pub fn io(&self) -> &IoBoxed {
45        &self.io
46    }
47
48    /// Returns mqtt server sink
49    pub fn sink(&self) -> MqttSink {
50        MqttSink::new(self.shared.clone())
51    }
52
53    /// Ack handshake message and set state
54    pub fn ack<St>(self, st: St, session_present: bool) -> HandshakeAck<St> {
55        let Handshake { io, shared, pkt, .. } = self;
56        // [MQTT-3.1.2-24].
57        let keepalive = if pkt.keep_alive != 0 {
58            Seconds((pkt.keep_alive >> 1).checked_add(pkt.keep_alive).unwrap_or(u16::MAX))
59        } else {
60            DEFAULT_KEEPALIVE
61        };
62        HandshakeAck {
63            io,
64            shared,
65            keepalive,
66            session_present,
67            session: Some(st),
68            max_send: None,
69            return_code: mqtt::ConnectAckReason::ConnectionAccepted,
70        }
71    }
72
73    /// Create connect ack object with `identifier rejected` return code
74    pub fn identifier_rejected<St>(self) -> HandshakeAck<St> {
75        HandshakeAck {
76            io: self.io,
77            shared: self.shared,
78            session: None,
79            session_present: false,
80            keepalive: DEFAULT_KEEPALIVE,
81            max_send: None,
82            return_code: mqtt::ConnectAckReason::IdentifierRejected,
83        }
84    }
85
86    /// Create connect ack object with `bad user name or password` return code
87    pub fn bad_username_or_pwd<St>(self) -> HandshakeAck<St> {
88        HandshakeAck {
89            io: self.io,
90            shared: self.shared,
91            session: None,
92            session_present: false,
93            max_send: None,
94            keepalive: DEFAULT_KEEPALIVE,
95            return_code: mqtt::ConnectAckReason::BadUserNameOrPassword,
96        }
97    }
98
99    /// Create connect ack object with `not authorized` return code
100    pub fn not_authorized<St>(self) -> HandshakeAck<St> {
101        HandshakeAck {
102            io: self.io,
103            shared: self.shared,
104            session: None,
105            session_present: false,
106            max_send: None,
107            keepalive: DEFAULT_KEEPALIVE,
108            return_code: mqtt::ConnectAckReason::NotAuthorized,
109        }
110    }
111
112    /// Create connect ack object with `service unavailable` return code
113    pub fn service_unavailable<St>(self) -> HandshakeAck<St> {
114        HandshakeAck {
115            io: self.io,
116            shared: self.shared,
117            session: None,
118            session_present: false,
119            max_send: None,
120            keepalive: DEFAULT_KEEPALIVE,
121            return_code: mqtt::ConnectAckReason::ServiceUnavailable,
122        }
123    }
124}
125
126impl fmt::Debug for Handshake {
127    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128        self.pkt.fmt(f)
129    }
130}
131
132/// Ack connect message
133pub struct HandshakeAck<St> {
134    pub(crate) io: IoBoxed,
135    pub(crate) session: Option<St>,
136    pub(crate) session_present: bool,
137    pub(crate) return_code: mqtt::ConnectAckReason,
138    pub(crate) shared: Rc<MqttShared>,
139    pub(crate) keepalive: Seconds,
140    pub(crate) max_send: Option<u16>,
141}
142
143impl<St> HandshakeAck<St> {
144    /// Set idle time-out for the connection in seconds
145    ///
146    /// By default idle time-out is set to 30 seconds.
147    pub fn idle_timeout(mut self, timeout: Seconds) -> Self {
148        self.keepalive = timeout;
149        self
150    }
151
152    /// Number of outgoing concurrent messages.
153    ///
154    /// By default outgoing is set to 16 messages
155    pub fn max_send(mut self, val: u16) -> Self {
156        self.max_send = Some(val);
157        self
158    }
159}