ntex_mqtt/v3/
handshake.rs1use std::{fmt, rc::Rc};
2
3use ntex_io::IoBoxed;
4use ntex_util::time::Seconds;
5
6use super::{codec as mqtt, shared::MqttShared, sink::MqttSink};
7
8const DEFAULT_KEEPALIVE: Seconds = Seconds(30);
9
10pub struct Handshake {
12 io: IoBoxed,
13 pkt: Box<mqtt::Connect>,
14 pkt_size: u32,
15 shared: Rc<MqttShared>,
16}
17
18impl Handshake {
19 pub(crate) fn new(
20 pkt: Box<mqtt::Connect>,
21 pkt_size: u32,
22 io: IoBoxed,
23 shared: Rc<MqttShared>,
24 ) -> Self {
25 Self { io, pkt, pkt_size, shared }
26 }
27
28 #[inline]
29 pub fn packet(&self) -> &mqtt::Connect {
30 &self.pkt
31 }
32
33 #[inline]
34 pub fn packet_mut(&mut self) -> &mut mqtt::Connect {
35 &mut self.pkt
36 }
37
38 #[inline]
39 pub fn packet_size(&self) -> u32 {
40 self.pkt_size
41 }
42
43 #[inline]
44 pub fn io(&self) -> &IoBoxed {
45 &self.io
46 }
47
48 pub fn sink(&self) -> MqttSink {
50 MqttSink::new(self.shared.clone())
51 }
52
53 pub fn ack<St>(self, st: St, session_present: bool) -> HandshakeAck<St> {
55 let Handshake { io, shared, pkt, .. } = self;
56 let keepalive = if pkt.keep_alive != 0 {
58 Seconds((pkt.keep_alive >> 1).checked_add(pkt.keep_alive).unwrap_or(u16::MAX))
59 } else {
60 DEFAULT_KEEPALIVE
61 };
62 HandshakeAck {
63 io,
64 shared,
65 keepalive,
66 session_present,
67 session: Some(st),
68 max_send: None,
69 return_code: mqtt::ConnectAckReason::ConnectionAccepted,
70 }
71 }
72
73 pub fn identifier_rejected<St>(self) -> HandshakeAck<St> {
75 HandshakeAck {
76 io: self.io,
77 shared: self.shared,
78 session: None,
79 session_present: false,
80 keepalive: DEFAULT_KEEPALIVE,
81 max_send: None,
82 return_code: mqtt::ConnectAckReason::IdentifierRejected,
83 }
84 }
85
86 pub fn bad_username_or_pwd<St>(self) -> HandshakeAck<St> {
88 HandshakeAck {
89 io: self.io,
90 shared: self.shared,
91 session: None,
92 session_present: false,
93 max_send: None,
94 keepalive: DEFAULT_KEEPALIVE,
95 return_code: mqtt::ConnectAckReason::BadUserNameOrPassword,
96 }
97 }
98
99 pub fn not_authorized<St>(self) -> HandshakeAck<St> {
101 HandshakeAck {
102 io: self.io,
103 shared: self.shared,
104 session: None,
105 session_present: false,
106 max_send: None,
107 keepalive: DEFAULT_KEEPALIVE,
108 return_code: mqtt::ConnectAckReason::NotAuthorized,
109 }
110 }
111
112 pub fn service_unavailable<St>(self) -> HandshakeAck<St> {
114 HandshakeAck {
115 io: self.io,
116 shared: self.shared,
117 session: None,
118 session_present: false,
119 max_send: None,
120 keepalive: DEFAULT_KEEPALIVE,
121 return_code: mqtt::ConnectAckReason::ServiceUnavailable,
122 }
123 }
124}
125
126impl fmt::Debug for Handshake {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 self.pkt.fmt(f)
129 }
130}
131
132pub struct HandshakeAck<St> {
134 pub(crate) io: IoBoxed,
135 pub(crate) session: Option<St>,
136 pub(crate) session_present: bool,
137 pub(crate) return_code: mqtt::ConnectAckReason,
138 pub(crate) shared: Rc<MqttShared>,
139 pub(crate) keepalive: Seconds,
140 pub(crate) max_send: Option<u16>,
141}
142
143impl<St> HandshakeAck<St> {
144 pub fn idle_timeout(mut self, timeout: Seconds) -> Self {
148 self.keepalive = timeout;
149 self
150 }
151
152 pub fn max_send(mut self, val: u16) -> Self {
156 self.max_send = Some(val);
157 self
158 }
159}