libp2p_swarm/behaviour/
toggle.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::task::{Context, Poll};
22
23use either::Either;
24use futures::future;
25use libp2p_core::{transport::PortUse, upgrade::DeniedUpgrade, Endpoint, Multiaddr};
26use libp2p_identity::PeerId;
27
28use crate::{
29    behaviour::FromSwarm,
30    connection::ConnectionId,
31    handler::{
32        AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent,
33        DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError,
34        SubstreamProtocol,
35    },
36    upgrade::SendWrapper,
37    ConnectionDenied, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
38};
39
/// A `NetworkBehaviour` wrapper that is either enabled (holding an inner behaviour) or
/// disabled (holding nothing).
///
/// The state can only be chosen at initialization.
pub struct Toggle<TBehaviour> {
    /// The wrapped behaviour; `None` means this `Toggle` is disabled.
    inner: Option<TBehaviour>,
}

impl<TBehaviour> Toggle<TBehaviour> {
    /// Reports whether an inner behaviour is present: `true` when enabled, `false` when
    /// disabled.
    pub fn is_enabled(&self) -> bool {
        self.as_ref().is_some()
    }

    /// Borrows the inner `NetworkBehaviour`, or yields `None` when disabled.
    pub fn as_ref(&self) -> Option<&TBehaviour> {
        let Self { inner } = self;
        inner.as_ref()
    }

    /// Mutably borrows the inner `NetworkBehaviour`, or yields `None` when disabled.
    pub fn as_mut(&mut self) -> Option<&mut TBehaviour> {
        let Self { inner } = self;
        inner.as_mut()
    }
}
63
64impl<TBehaviour> From<Option<TBehaviour>> for Toggle<TBehaviour> {
65    fn from(inner: Option<TBehaviour>) -> Self {
66        Toggle { inner }
67    }
68}
69
70impl<TBehaviour> NetworkBehaviour for Toggle<TBehaviour>
71where
72    TBehaviour: NetworkBehaviour,
73{
74    type ConnectionHandler = ToggleConnectionHandler<THandler<TBehaviour>>;
75    type ToSwarm = TBehaviour::ToSwarm;
76
77    fn handle_pending_inbound_connection(
78        &mut self,
79        connection_id: ConnectionId,
80        local_addr: &Multiaddr,
81        remote_addr: &Multiaddr,
82    ) -> Result<(), ConnectionDenied> {
83        let inner = match self.inner.as_mut() {
84            None => return Ok(()),
85            Some(inner) => inner,
86        };
87
88        inner.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
89
90        Ok(())
91    }
92
93    fn handle_established_inbound_connection(
94        &mut self,
95        connection_id: ConnectionId,
96        peer: PeerId,
97        local_addr: &Multiaddr,
98        remote_addr: &Multiaddr,
99    ) -> Result<THandler<Self>, ConnectionDenied> {
100        let inner = match self.inner.as_mut() {
101            None => return Ok(ToggleConnectionHandler { inner: None }),
102            Some(inner) => inner,
103        };
104
105        let handler = inner.handle_established_inbound_connection(
106            connection_id,
107            peer,
108            local_addr,
109            remote_addr,
110        )?;
111
112        Ok(ToggleConnectionHandler {
113            inner: Some(handler),
114        })
115    }
116
117    fn handle_pending_outbound_connection(
118        &mut self,
119        connection_id: ConnectionId,
120        maybe_peer: Option<PeerId>,
121        addresses: &[Multiaddr],
122        effective_role: Endpoint,
123    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
124        let inner = match self.inner.as_mut() {
125            None => return Ok(vec![]),
126            Some(inner) => inner,
127        };
128
129        let addresses = inner.handle_pending_outbound_connection(
130            connection_id,
131            maybe_peer,
132            addresses,
133            effective_role,
134        )?;
135
136        Ok(addresses)
137    }
138
139    fn handle_established_outbound_connection(
140        &mut self,
141        connection_id: ConnectionId,
142        peer: PeerId,
143        addr: &Multiaddr,
144        role_override: Endpoint,
145        port_use: PortUse,
146    ) -> Result<THandler<Self>, ConnectionDenied> {
147        let inner = match self.inner.as_mut() {
148            None => return Ok(ToggleConnectionHandler { inner: None }),
149            Some(inner) => inner,
150        };
151
152        let handler = inner.handle_established_outbound_connection(
153            connection_id,
154            peer,
155            addr,
156            role_override,
157            port_use,
158        )?;
159
160        Ok(ToggleConnectionHandler {
161            inner: Some(handler),
162        })
163    }
164
165    fn on_swarm_event(&mut self, event: FromSwarm) {
166        if let Some(behaviour) = &mut self.inner {
167            behaviour.on_swarm_event(event);
168        }
169    }
170
171    fn on_connection_handler_event(
172        &mut self,
173        peer_id: PeerId,
174        connection_id: ConnectionId,
175        event: THandlerOutEvent<Self>,
176    ) {
177        if let Some(behaviour) = &mut self.inner {
178            behaviour.on_connection_handler_event(peer_id, connection_id, event)
179        }
180    }
181
182    fn poll(
183        &mut self,
184        cx: &mut Context<'_>,
185    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
186        if let Some(inner) = self.inner.as_mut() {
187            inner.poll(cx)
188        } else {
189            Poll::Pending
190        }
191    }
192}
193
194/// Implementation of [`ConnectionHandler`] that can be in the disabled state.
195pub struct ToggleConnectionHandler<TInner> {
196    inner: Option<TInner>,
197}
198
199impl<TInner> ToggleConnectionHandler<TInner>
200where
201    TInner: ConnectionHandler,
202{
203    #[expect(deprecated)] // TODO: Remove when {In, Out}boundOpenInfo is fully removed.
204    fn on_fully_negotiated_inbound(
205        &mut self,
206        FullyNegotiatedInbound {
207            protocol: out,
208            info,
209        }: FullyNegotiatedInbound<
210            <Self as ConnectionHandler>::InboundProtocol,
211            <Self as ConnectionHandler>::InboundOpenInfo,
212        >,
213    ) {
214        let out = match out {
215            future::Either::Left(out) => out,
216            // TODO: remove when Rust 1.82 is MSRV
217            #[allow(unreachable_patterns)]
218            future::Either::Right(v) => libp2p_core::util::unreachable(v),
219        };
220
221        if let Either::Left(info) = info {
222            self.inner
223                .as_mut()
224                .expect("Can't receive an inbound substream if disabled; QED")
225                .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
226                    FullyNegotiatedInbound {
227                        protocol: out,
228                        info,
229                    },
230                ));
231        } else {
232            panic!("Unexpected Either::Right in enabled `on_fully_negotiated_inbound`.")
233        }
234    }
235    #[expect(deprecated)] // TODO: Remove when {In, Out}boundOpenInfo is fully removed.
236    fn on_listen_upgrade_error(
237        &mut self,
238        ListenUpgradeError { info, error: err }: ListenUpgradeError<
239            <Self as ConnectionHandler>::InboundOpenInfo,
240            <Self as ConnectionHandler>::InboundProtocol,
241        >,
242    ) {
243        let (inner, info) = match (self.inner.as_mut(), info) {
244            (Some(inner), Either::Left(info)) => (inner, info),
245            // Ignore listen upgrade errors in disabled state.
246            (None, Either::Right(())) => return,
247            (Some(_), Either::Right(())) => panic!(
248                "Unexpected `Either::Right` inbound info through \
249                 `on_listen_upgrade_error` in enabled state.",
250            ),
251            (None, Either::Left(_)) => panic!(
252                "Unexpected `Either::Left` inbound info through \
253                 `on_listen_upgrade_error` in disabled state.",
254            ),
255        };
256
257        let err = match err {
258            Either::Left(e) => e,
259            // TODO: remove when Rust 1.82 is MSRV
260            #[allow(unreachable_patterns)]
261            Either::Right(v) => libp2p_core::util::unreachable(v),
262        };
263
264        inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
265            info,
266            error: err,
267        }));
268    }
269}
270
271#[expect(deprecated)] // TODO: Remove when {In, Out}boundOpenInfo is fully removed.
272impl<TInner> ConnectionHandler for ToggleConnectionHandler<TInner>
273where
274    TInner: ConnectionHandler,
275{
276    type FromBehaviour = TInner::FromBehaviour;
277    type ToBehaviour = TInner::ToBehaviour;
278    type InboundProtocol = Either<SendWrapper<TInner::InboundProtocol>, SendWrapper<DeniedUpgrade>>;
279    type OutboundProtocol = TInner::OutboundProtocol;
280    type OutboundOpenInfo = TInner::OutboundOpenInfo;
281    type InboundOpenInfo = Either<TInner::InboundOpenInfo, ()>;
282
283    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
284        if let Some(inner) = self.inner.as_ref() {
285            inner
286                .listen_protocol()
287                .map_upgrade(|u| Either::Left(SendWrapper(u)))
288                .map_info(Either::Left)
289        } else {
290            SubstreamProtocol::new(Either::Right(SendWrapper(DeniedUpgrade)), Either::Right(()))
291        }
292    }
293
294    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
295        self.inner
296            .as_mut()
297            .expect("Can't receive events if disabled; QED")
298            .on_behaviour_event(event)
299    }
300
301    fn connection_keep_alive(&self) -> bool {
302        self.inner
303            .as_ref()
304            .map(|h| h.connection_keep_alive())
305            .unwrap_or(false)
306    }
307
308    fn poll(
309        &mut self,
310        cx: &mut Context<'_>,
311    ) -> Poll<
312        ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
313    > {
314        if let Some(inner) = self.inner.as_mut() {
315            inner.poll(cx)
316        } else {
317            Poll::Pending
318        }
319    }
320
321    fn on_connection_event(
322        &mut self,
323        event: ConnectionEvent<
324            Self::InboundProtocol,
325            Self::OutboundProtocol,
326            Self::InboundOpenInfo,
327            Self::OutboundOpenInfo,
328        >,
329    ) {
330        match event {
331            ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
332                self.on_fully_negotiated_inbound(fully_negotiated_inbound)
333            }
334            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
335                protocol: out,
336                info,
337            }) => self
338                .inner
339                .as_mut()
340                .expect("Can't receive an outbound substream if disabled; QED")
341                .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
342                    FullyNegotiatedOutbound {
343                        protocol: out,
344                        info,
345                    },
346                )),
347            ConnectionEvent::AddressChange(address_change) => {
348                if let Some(inner) = self.inner.as_mut() {
349                    inner.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
350                        new_address: address_change.new_address,
351                    }));
352                }
353            }
354            ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => self
355                .inner
356                .as_mut()
357                .expect("Can't receive an outbound substream if disabled; QED")
358                .on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
359                    info,
360                    error: err,
361                })),
362            ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
363                self.on_listen_upgrade_error(listen_upgrade_error)
364            }
365            ConnectionEvent::LocalProtocolsChange(change) => {
366                if let Some(inner) = self.inner.as_mut() {
367                    inner.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
368                }
369            }
370            ConnectionEvent::RemoteProtocolsChange(change) => {
371                if let Some(inner) = self.inner.as_mut() {
372                    inner.on_connection_event(ConnectionEvent::RemoteProtocolsChange(change));
373                }
374            }
375        }
376    }
377
378    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
379        let Some(inner) = self.inner.as_mut() else {
380            return Poll::Ready(None);
381        };
382
383        inner.poll_close(cx)
384    }
385}