sc_network/protocol/notifications/
handler.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Implementations of the `IntoConnectionHandler` and `ConnectionHandler` traits for both incoming
20//! and outgoing substreams for all gossiping protocols.
21//!
22//! This is the main implementation of `ConnectionHandler` in this crate, that handles all the
23//! gossiping protocols that are Substrate-related and outside of the scope of libp2p.
24//!
25//! # Usage
26//!
27//! From an API perspective, for each of its protocols, the [`NotifsHandler`] is always in one of
28//! the following state (see [`State`]):
29//!
30//! - Closed substream. This is the initial state.
31//! - Closed substream, but remote desires them to be open.
32//! - Open substream.
33//! - Open substream, but remote desires them to be closed.
34//!
35//! Each protocol in the [`NotifsHandler`] can spontaneously switch between these states:
36//!
37//! - "Closed substream" to "Closed substream but open desired". When that happens, a
38//! [`NotifsHandlerOut::OpenDesiredByRemote`] is emitted.
39//! - "Closed substream but open desired" to "Closed substream" (i.e. the remote has cancelled
40//! their request). When that happens, a [`NotifsHandlerOut::CloseDesired`] is emitted.
41//! - "Open substream" to "Open substream but close desired". When that happens, a
42//! [`NotifsHandlerOut::CloseDesired`] is emitted.
43//!
44//! The user can instruct a protocol in the `NotifsHandler` to switch from "closed" to "open" or
45//! vice-versa by sending either a [`NotifsHandlerIn::Open`] or a [`NotifsHandlerIn::Close`]. The
46//! `NotifsHandler` must answer with [`NotifsHandlerOut::OpenResultOk`] or
47//! [`NotifsHandlerOut::OpenResultErr`], or with [`NotifsHandlerOut::CloseResult`].
48//!
49//! When a [`NotifsHandlerOut::OpenResultOk`] is emitted, the substream is now in the open state.
50//! When a [`NotifsHandlerOut::OpenResultErr`] or [`NotifsHandlerOut::CloseResult`] is emitted,
51//! the `NotifsHandler` is now (or remains) in the closed state.
52//!
53//! When a [`NotifsHandlerOut::OpenDesiredByRemote`] is emitted, the user should always send back
54//! either a [`NotifsHandlerIn::Open`] or a [`NotifsHandlerIn::Close`].If this isn't done, the
55//! remote will be left in a pending state.
56//!
57//! It is illegal to send a [`NotifsHandlerIn::Open`] before a previously-emitted
58//! [`NotifsHandlerIn::Open`] has gotten an answer.
59
60use crate::{
61	protocol::notifications::upgrade::{
62		NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutSubstream,
63		UpgradeCollec,
64	},
65	service::metrics::NotificationMetrics,
66	types::ProtocolName,
67};
68
69use bytes::BytesMut;
70use futures::{
71	channel::mpsc,
72	lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
73	prelude::*,
74};
75use libp2p::{
76	swarm::{
77		handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream,
78		SubstreamProtocol,
79	},
80	PeerId,
81};
82use log::error;
83use parking_lot::{Mutex, RwLock};
84use std::{
85	collections::VecDeque,
86	mem,
87	pin::Pin,
88	sync::Arc,
89	task::{Context, Poll},
90	time::{Duration, Instant},
91};
92
93/// Number of pending notifications in asynchronous contexts.
94/// See [`NotificationsSink::reserve_notification`] for context.
95pub(crate) const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;
96
97/// Number of pending notifications in synchronous contexts.
98const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;
99
100/// Maximum duration to open a substream and receive the handshake message. After that, we
101/// consider that we failed to open the substream.
102const OPEN_TIMEOUT: Duration = Duration::from_secs(10);
103
104/// After successfully establishing a connection with the remote, we keep the connection open for
105/// at least this amount of time in order to give the rest of the code the chance to notify us to
106/// open substreams.
107const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
108
109/// The actual handler once the connection has been established.
110///
111/// See the documentation at the module level for more information.
112pub struct NotifsHandler {
113	/// List of notification protocols, specified by the user at initialization.
114	protocols: Vec<Protocol>,
115
116	/// When the connection with the remote has been successfully established.
117	when_connection_open: Instant,
118
119	/// Remote we are connected to.
120	peer_id: PeerId,
121
122	/// Events to return in priority from `poll`.
123	events_queue: VecDeque<
124		ConnectionHandlerEvent<NotificationsOut, usize, NotifsHandlerOut, NotifsHandlerError>,
125	>,
126
127	/// Metrics.
128	metrics: Option<Arc<NotificationMetrics>>,
129}
130
131impl NotifsHandler {
132	/// Creates new [`NotifsHandler`].
133	pub fn new(
134		peer_id: PeerId,
135		protocols: Vec<ProtocolConfig>,
136		metrics: Option<NotificationMetrics>,
137	) -> Self {
138		Self {
139			protocols: protocols
140				.into_iter()
141				.map(|config| {
142					let in_upgrade = NotificationsIn::new(
143						config.name.clone(),
144						config.fallback_names.clone(),
145						config.max_notification_size,
146					);
147
148					Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } }
149				})
150				.collect(),
151			peer_id,
152			when_connection_open: Instant::now(),
153			events_queue: VecDeque::with_capacity(16),
154			metrics: metrics.map_or(None, |metrics| Some(Arc::new(metrics))),
155		}
156	}
157}
158
159/// Configuration for a notifications protocol.
160#[derive(Debug, Clone)]
161pub struct ProtocolConfig {
162	/// Name of the protocol.
163	pub name: ProtocolName,
164	/// Names of the protocol to use if the main one isn't available.
165	pub fallback_names: Vec<ProtocolName>,
166	/// Handshake of the protocol. The `RwLock` is locked every time a new substream is opened.
167	pub handshake: Arc<RwLock<Vec<u8>>>,
168	/// Maximum allowed size for a notification.
169	pub max_notification_size: u64,
170}
171
172/// Fields specific for each individual protocol.
173struct Protocol {
174	/// Other fields.
175	config: ProtocolConfig,
176
177	/// Prototype for the inbound upgrade.
178	in_upgrade: NotificationsIn,
179
180	/// Current state of the substreams for this protocol.
181	state: State,
182}
183
184/// See the module-level documentation to learn about the meaning of these variants.
185enum State {
186	/// Protocol is in the "Closed" state.
187	Closed {
188		/// True if an outgoing substream is still in the process of being opened.
189		pending_opening: bool,
190	},
191
192	/// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been
193	/// emitted.
194	OpenDesiredByRemote {
195		/// Substream opened by the remote and that hasn't been accepted/rejected yet.
196		in_substream: NotificationsInSubstream<Stream>,
197
198		/// See [`State::Closed::pending_opening`].
199		pending_opening: bool,
200	},
201
202	/// Protocol is in the "Closed" state, but has received a [`NotifsHandlerIn::Open`] and is
203	/// consequently trying to open the various notifications substreams.
204	///
205	/// A [`NotifsHandlerOut::OpenResultOk`] or a [`NotifsHandlerOut::OpenResultErr`] event must
206	/// be emitted when transitioning to respectively [`State::Open`] or [`State::Closed`].
207	Opening {
208		/// Substream opened by the remote. If `Some`, has been accepted.
209		in_substream: Option<NotificationsInSubstream<Stream>>,
210		/// Is the connection inbound.
211		inbound: bool,
212	},
213
214	/// Protocol is in the "Open" state.
215	Open {
216		/// Contains the two `Receiver`s connected to the [`NotificationsSink`] that has been
217		/// sent out. The notifications to send out can be pulled from this receivers.
218		/// We use two different channels in order to have two different channel sizes, but from
219		/// the receiving point of view, the two channels are the same.
220		/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
221		notifications_sink_rx: stream::Peekable<
222			stream::Select<
223				stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
224				stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
225			>,
226		>,
227
228		/// Outbound substream that has been accepted by the remote.
229		///
230		/// Always `Some` on transition to [`State::Open`]. Switched to `None` only if the remote
231		/// closed the substream. If `None`, a [`NotifsHandlerOut::CloseDesired`] event has been
232		/// emitted.
233		out_substream: Option<NotificationsOutSubstream<Stream>>,
234
235		/// Substream opened by the remote.
236		///
237		/// Contrary to the `out_substream` field, operations continue as normal even if the
238		/// substream has been closed by the remote. A `None` is treated the same way as if there
239		/// was an idle substream.
240		in_substream: Option<NotificationsInSubstream<Stream>>,
241	},
242}
243
244/// Event that can be received by a `NotifsHandler`.
245#[derive(Debug, Clone)]
246pub enum NotifsHandlerIn {
247	/// Instruct the handler to open the notification substreams.
248	///
249	/// Must always be answered by a [`NotifsHandlerOut::OpenResultOk`] or a
250	/// [`NotifsHandlerOut::OpenResultErr`] event.
251	///
252	/// Importantly, it is forbidden to send a [`NotifsHandlerIn::Open`] while a previous one is
253	/// already in the fly. It is however possible if a `Close` is still in the fly.
254	Open {
255		/// Index of the protocol in the list of protocols passed at initialization.
256		protocol_index: usize,
257	},
258
259	/// Instruct the handler to close the notification substreams, or reject any pending incoming
260	/// substream request.
261	///
262	/// Must always be answered by a [`NotifsHandlerOut::CloseResult`] event.
263	Close {
264		/// Index of the protocol in the list of protocols passed at initialization.
265		protocol_index: usize,
266	},
267}
268
269/// Event that can be emitted by a `NotifsHandler`.
270#[derive(Debug)]
271pub enum NotifsHandlerOut {
272	/// Acknowledges a [`NotifsHandlerIn::Open`].
273	OpenResultOk {
274		/// Index of the protocol in the list of protocols passed at initialization.
275		protocol_index: usize,
276		/// Name of the protocol that was actually negotiated, if the default one wasn't available.
277		negotiated_fallback: Option<ProtocolName>,
278		/// Handshake that was sent to us.
279		/// This is normally a "Status" message, but this out of the concern of this code.
280		received_handshake: Vec<u8>,
281		/// How notifications can be sent to this node.
282		notifications_sink: NotificationsSink,
283		/// Is the connection inbound.
284		inbound: bool,
285	},
286
287	/// Acknowledges a [`NotifsHandlerIn::Open`]. The remote has refused the attempt to open
288	/// notification substreams.
289	OpenResultErr {
290		/// Index of the protocol in the list of protocols passed at initialization.
291		protocol_index: usize,
292	},
293
294	/// Acknowledges a [`NotifsHandlerIn::Close`].
295	CloseResult {
296		/// Index of the protocol in the list of protocols passed at initialization.
297		protocol_index: usize,
298	},
299
300	/// The remote would like the substreams to be open. Send a [`NotifsHandlerIn::Open`] or a
301	/// [`NotifsHandlerIn::Close`] in order to either accept or deny this request. If a
302	/// [`NotifsHandlerIn::Open`] or [`NotifsHandlerIn::Close`] has been sent before and has not
303	/// yet been acknowledged by a matching [`NotifsHandlerOut`], then you don't need to a send
304	/// another [`NotifsHandlerIn`].
305	OpenDesiredByRemote {
306		/// Index of the protocol in the list of protocols passed at initialization.
307		protocol_index: usize,
308		/// Received handshake.
309		handshake: Vec<u8>,
310	},
311
312	/// The remote would like the substreams to be closed. Send a [`NotifsHandlerIn::Close`] in
313	/// order to close them. If a [`NotifsHandlerIn::Close`] has been sent before and has not yet
314	/// been acknowledged by a [`NotifsHandlerOut::CloseResult`], then you don't need to a send
315	/// another one.
316	CloseDesired {
317		/// Index of the protocol in the list of protocols passed at initialization.
318		protocol_index: usize,
319	},
320
321	/// Received a message on a custom protocol substream.
322	///
323	/// Can only happen when the handler is in the open state.
324	Notification {
325		/// Index of the protocol in the list of protocols passed at initialization.
326		protocol_index: usize,
327		/// Message that has been received.
328		message: BytesMut,
329	},
330}
331
332/// Sink connected directly to the node background task. Allows sending notifications to the peer.
333///
334/// Can be cloned in order to obtain multiple references to the substream of the same peer.
335#[derive(Debug, Clone)]
336pub struct NotificationsSink {
337	inner: Arc<NotificationsSinkInner>,
338	metrics: Option<Arc<NotificationMetrics>>,
339}
340
341impl NotificationsSink {
342	/// Create new [`NotificationsSink`].
343	/// NOTE: only used for testing but must be `pub` as other crates in `client/network` use this.
344	pub fn new(
345		peer_id: PeerId,
346	) -> (Self, mpsc::Receiver<NotificationsSinkMessage>, mpsc::Receiver<NotificationsSinkMessage>)
347	{
348		let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
349		let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
350		(
351			NotificationsSink {
352				inner: Arc::new(NotificationsSinkInner {
353					peer_id,
354					async_channel: FuturesMutex::new(async_tx),
355					sync_channel: Mutex::new(Some(sync_tx)),
356				}),
357				metrics: None,
358			},
359			async_rx,
360			sync_rx,
361		)
362	}
363
364	/// Get reference to metrics.
365	pub fn metrics(&self) -> &Option<Arc<NotificationMetrics>> {
366		&self.metrics
367	}
368}
369
370#[derive(Debug)]
371struct NotificationsSinkInner {
372	/// Target of the sink.
373	peer_id: PeerId,
374	/// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
375	async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
376	/// Sender to use in synchronous contexts. Uses a synchronous mutex.
377	/// Contains `None` if the channel was full at some point, in which case the channel will
378	/// be closed in the near future anyway.
379	/// This channel has a large capacity and is meant to be used in contexts where
380	/// back-pressure cannot be properly exerted.
381	/// It will be removed in a future version.
382	sync_channel: Mutex<Option<mpsc::Sender<NotificationsSinkMessage>>>,
383}
384
385/// Message emitted through the [`NotificationsSink`] and processed by the background task
386/// dedicated to the peer.
387#[derive(Debug, PartialEq, Eq)]
388pub enum NotificationsSinkMessage {
389	/// Message emitted by [`NotificationsSink::reserve_notification`] and
390	/// [`NotificationsSink::write_notification_now`].
391	Notification { message: Vec<u8> },
392
393	/// Must close the connection.
394	ForceClose,
395}
396
397impl NotificationsSink {
398	/// Returns the [`PeerId`] the sink is connected to.
399	pub fn peer_id(&self) -> &PeerId {
400		&self.inner.peer_id
401	}
402
403	/// Sends a notification to the peer.
404	///
405	/// If too many messages are already buffered, the notification is silently discarded and the
406	/// connection to the peer will be closed shortly after.
407	///
408	/// The protocol name is expected to be checked ahead of calling this method. It is a logic
409	/// error to send a notification using an unknown protocol.
410	///
411	/// This method will be removed in a future version.
412	pub fn send_sync_notification(&self, message: impl Into<Vec<u8>>) {
413		let mut lock = self.inner.sync_channel.lock();
414
415		if let Some(tx) = lock.as_mut() {
416			let message = message.into();
417			let result = tx.try_send(NotificationsSinkMessage::Notification { message });
418
419			if result.is_err() {
420				// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
421				// buffer, and therefore `try_send` will succeed.
422				let _result2 = tx.clone().try_send(NotificationsSinkMessage::ForceClose);
423				debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
424
425				// Destroy the sender in order to not send more `ForceClose` messages.
426				*lock = None;
427			}
428		}
429	}
430
431	/// Wait until the remote is ready to accept a notification.
432	///
433	/// Returns an error in the case where the connection is closed.
434	///
435	/// The protocol name is expected to be checked ahead of calling this method. It is a logic
436	/// error to send a notification using an unknown protocol.
437	pub async fn reserve_notification(&self) -> Result<Ready<'_>, ()> {
438		let mut lock = self.inner.async_channel.lock().await;
439
440		let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
441		if poll_ready.is_ok() {
442			Ok(Ready { lock })
443		} else {
444			Err(())
445		}
446	}
447}
448
449/// Notification slot is reserved and the notification can actually be sent.
450#[must_use]
451#[derive(Debug)]
452pub struct Ready<'a> {
453	/// Guarded channel. The channel inside is guaranteed to not be full.
454	lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
455}
456
457impl<'a> Ready<'a> {
458	/// Consumes this slots reservation and actually queues the notification.
459	///
460	/// Returns an error if the substream has been closed.
461	pub fn send(mut self, notification: impl Into<Vec<u8>>) -> Result<(), ()> {
462		self.lock
463			.start_send(NotificationsSinkMessage::Notification { message: notification.into() })
464			.map_err(|_| ())
465	}
466}
467
468/// Error specific to the collection of protocols.
469#[derive(Debug, thiserror::Error)]
470pub enum NotifsHandlerError {
471	#[error("Channel of synchronous notifications is full.")]
472	SyncNotificationsClogged,
473}
474
475impl ConnectionHandler for NotifsHandler {
476	type FromBehaviour = NotifsHandlerIn;
477	type ToBehaviour = NotifsHandlerOut;
478	type Error = NotifsHandlerError;
479	type InboundProtocol = UpgradeCollec<NotificationsIn>;
480	type OutboundProtocol = NotificationsOut;
481	// Index within the `out_protocols`.
482	type OutboundOpenInfo = usize;
483	type InboundOpenInfo = ();
484
485	fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
486		let protocols = self
487			.protocols
488			.iter()
489			.map(|p| p.in_upgrade.clone())
490			.collect::<UpgradeCollec<_>>();
491
492		SubstreamProtocol::new(protocols, ())
493	}
494
495	fn on_connection_event(
496		&mut self,
497		event: ConnectionEvent<
498			'_,
499			Self::InboundProtocol,
500			Self::OutboundProtocol,
501			Self::InboundOpenInfo,
502			Self::OutboundOpenInfo,
503		>,
504	) {
505		match event {
506			ConnectionEvent::FullyNegotiatedInbound(inbound) => {
507				let (mut in_substream_open, protocol_index) = inbound.protocol;
508				let protocol_info = &mut self.protocols[protocol_index];
509
510				match protocol_info.state {
511					State::Closed { pending_opening } => {
512						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
513							NotifsHandlerOut::OpenDesiredByRemote {
514								protocol_index,
515								handshake: in_substream_open.handshake,
516							},
517						));
518
519						protocol_info.state = State::OpenDesiredByRemote {
520							in_substream: in_substream_open.substream,
521							pending_opening,
522						};
523					},
524					State::OpenDesiredByRemote { .. } => {
525						// If a substream already exists, silently drop the new one.
526						// Note that we drop the substream, which will send an equivalent to a
527						// TCP "RST" to the remote and force-close the substream. It might
528						// seem like an unclean way to get rid of a substream. However, keep
529						// in mind that it is invalid for the remote to open multiple such
530						// substreams, and therefore sending a "RST" is the most correct thing
531						// to do.
532						return
533					},
534					State::Opening { ref mut in_substream, .. } |
535					State::Open { ref mut in_substream, .. } => {
536						if in_substream.is_some() {
537							// Same remark as above.
538							return
539						}
540
541						// Create `handshake_message` on a separate line to be sure that the
542						// lock is released as soon as possible.
543						let handshake_message = protocol_info.config.handshake.read().clone();
544						in_substream_open.substream.send_handshake(handshake_message);
545						*in_substream = Some(in_substream_open.substream);
546					},
547				}
548			},
549			ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
550				let (new_open, protocol_index) = (outbound.protocol, outbound.info);
551
552				match self.protocols[protocol_index].state {
553					State::Closed { ref mut pending_opening } |
554					State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
555						debug_assert!(*pending_opening);
556						*pending_opening = false;
557					},
558					State::Open { .. } => {
559						error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler");
560						debug_assert!(false);
561					},
562					State::Opening { ref mut in_substream, inbound } => {
563						let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
564						let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
565						let notifications_sink = NotificationsSink {
566							inner: Arc::new(NotificationsSinkInner {
567								peer_id: self.peer_id,
568								async_channel: FuturesMutex::new(async_tx),
569								sync_channel: Mutex::new(Some(sync_tx)),
570							}),
571							metrics: self.metrics.clone(),
572						};
573
574						self.protocols[protocol_index].state = State::Open {
575							notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse())
576								.peekable(),
577							out_substream: Some(new_open.substream),
578							in_substream: in_substream.take(),
579						};
580
581						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
582							NotifsHandlerOut::OpenResultOk {
583								protocol_index,
584								negotiated_fallback: new_open.negotiated_fallback,
585								received_handshake: new_open.handshake,
586								notifications_sink,
587								inbound,
588							},
589						));
590					},
591				}
592			},
593			ConnectionEvent::AddressChange(_address_change) => {},
594			ConnectionEvent::LocalProtocolsChange(_) => {},
595			ConnectionEvent::RemoteProtocolsChange(_) => {},
596			ConnectionEvent::DialUpgradeError(dial_upgrade_error) => match self.protocols
597				[dial_upgrade_error.info]
598				.state
599			{
600				State::Closed { ref mut pending_opening } |
601				State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
602					debug_assert!(*pending_opening);
603					*pending_opening = false;
604				},
605
606				State::Opening { .. } => {
607					self.protocols[dial_upgrade_error.info].state =
608						State::Closed { pending_opening: false };
609
610					self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
611						NotifsHandlerOut::OpenResultErr { protocol_index: dial_upgrade_error.info },
612					));
613				},
614
615				// No substream is being open when already `Open`.
616				State::Open { .. } => debug_assert!(false),
617			},
618			ConnectionEvent::ListenUpgradeError(_listen_upgrade_error) => {},
619		}
620	}
621
622	fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
623		match message {
624			NotifsHandlerIn::Open { protocol_index } => {
625				let protocol_info = &mut self.protocols[protocol_index];
626				match &mut protocol_info.state {
627					State::Closed { pending_opening } => {
628						if !*pending_opening {
629							let proto = NotificationsOut::new(
630								protocol_info.config.name.clone(),
631								protocol_info.config.fallback_names.clone(),
632								protocol_info.config.handshake.read().clone(),
633								protocol_info.config.max_notification_size,
634							);
635
636							self.events_queue.push_back(
637								ConnectionHandlerEvent::OutboundSubstreamRequest {
638									protocol: SubstreamProtocol::new(proto, protocol_index)
639										.with_timeout(OPEN_TIMEOUT),
640								},
641							);
642						}
643
644						protocol_info.state = State::Opening { in_substream: None, inbound: false };
645					},
646					State::OpenDesiredByRemote { pending_opening, in_substream } => {
647						let handshake_message = protocol_info.config.handshake.read().clone();
648
649						if !*pending_opening {
650							let proto = NotificationsOut::new(
651								protocol_info.config.name.clone(),
652								protocol_info.config.fallback_names.clone(),
653								handshake_message.clone(),
654								protocol_info.config.max_notification_size,
655							);
656
657							self.events_queue.push_back(
658								ConnectionHandlerEvent::OutboundSubstreamRequest {
659									protocol: SubstreamProtocol::new(proto, protocol_index)
660										.with_timeout(OPEN_TIMEOUT),
661								},
662							);
663						}
664
665						in_substream.send_handshake(handshake_message);
666
667						// The state change is done in two steps because of borrowing issues.
668						let in_substream = match mem::replace(
669							&mut protocol_info.state,
670							State::Opening { in_substream: None, inbound: false },
671						) {
672							State::OpenDesiredByRemote { in_substream, .. } => in_substream,
673							_ => unreachable!(),
674						};
675						protocol_info.state =
676							State::Opening { in_substream: Some(in_substream), inbound: true };
677					},
678					State::Opening { .. } | State::Open { .. } => {
679						// As documented, it is forbidden to send an `Open` while there is already
680						// one in the fly.
681						error!(target: "sub-libp2p", "opening already-opened handler");
682						debug_assert!(false);
683					},
684				}
685			},
686
687			NotifsHandlerIn::Close { protocol_index } => {
688				match self.protocols[protocol_index].state {
689					State::Open { .. } => {
690						self.protocols[protocol_index].state =
691							State::Closed { pending_opening: false };
692					},
693					State::Opening { .. } => {
694						self.protocols[protocol_index].state =
695							State::Closed { pending_opening: true };
696
697						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
698							NotifsHandlerOut::OpenResultErr { protocol_index },
699						));
700					},
701					State::OpenDesiredByRemote { pending_opening, .. } => {
702						self.protocols[protocol_index].state = State::Closed { pending_opening };
703					},
704					State::Closed { .. } => {},
705				}
706
707				self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
708					NotifsHandlerOut::CloseResult { protocol_index },
709				));
710			},
711		}
712	}
713
714	fn connection_keep_alive(&self) -> KeepAlive {
715		// `Yes` if any protocol has some activity.
716		if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
717			return KeepAlive::Yes
718		}
719
720		// A grace period of `INITIAL_KEEPALIVE_TIME` must be given to leave time for the remote
721		// to express desire to open substreams.
722		#[allow(deprecated)]
723		KeepAlive::Until(self.when_connection_open + INITIAL_KEEPALIVE_TIME)
724	}
725
726	#[allow(deprecated)]
727	fn poll(
728		&mut self,
729		cx: &mut Context,
730	) -> Poll<
731		ConnectionHandlerEvent<
732			Self::OutboundProtocol,
733			Self::OutboundOpenInfo,
734			Self::ToBehaviour,
735			Self::Error,
736		>,
737	> {
738		if let Some(ev) = self.events_queue.pop_front() {
739			return Poll::Ready(ev)
740		}
741
742		// For each open substream, try send messages from `notifications_sink_rx` to the
743		// substream.
744		for protocol_index in 0..self.protocols.len() {
745			if let State::Open {
746				notifications_sink_rx, out_substream: Some(out_substream), ..
747			} = &mut self.protocols[protocol_index].state
748			{
749				loop {
750					// Only proceed with `out_substream.poll_ready_unpin` if there is an element
751					// available in `notifications_sink_rx`. This avoids waking up the task when
752					// a substream is ready to send if there isn't actually something to send.
753					#[allow(deprecated)]
754					match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
755						Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) =>
756							return Poll::Ready(ConnectionHandlerEvent::Close(
757								NotifsHandlerError::SyncNotificationsClogged,
758							)),
759						Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
760						Poll::Ready(None) | Poll::Pending => break,
761					}
762
763					// Before we extract the element from `notifications_sink_rx`, check that the
764					// substream is ready to accept a message.
765					match out_substream.poll_ready_unpin(cx) {
766						Poll::Ready(_) => {},
767						Poll::Pending => break,
768					}
769
770					// Now that the substream is ready for a message, grab what to send.
771					let message = match notifications_sink_rx.poll_next_unpin(cx) {
772						Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) =>
773							message,
774						Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) |
775						Poll::Ready(None) |
776						Poll::Pending => {
777							// Should never be reached, as per `poll_peek` above.
778							debug_assert!(false);
779							break
780						},
781					};
782
783					let _ = out_substream.start_send_unpin(message);
784					// Note that flushing is performed later down this function.
785				}
786			}
787		}
788
789		// Flush all outbound substreams.
790		// When `poll` returns `Poll::Ready`, the libp2p `Swarm` may decide to no longer call
791		// `poll` again before it is ready to accept more events.
792		// In order to make sure that substreams are flushed as soon as possible, the flush is
793		// performed before the code paths that can produce `Ready` (with some rare exceptions).
794		// Importantly, however, the flush is performed *after* notifications are queued with
795		// `Sink::start_send`.
796		// Note that we must call `poll_flush` on all substreams and not only on those we
797		// have called `Sink::start_send` on, because `NotificationsOutSubstream::poll_flush`
798		// also reports the substream termination (even if no data was written into it).
799		for protocol_index in 0..self.protocols.len() {
800			match &mut self.protocols[protocol_index].state {
801				State::Open { out_substream: out_substream @ Some(_), .. } => {
802					match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
803						Poll::Pending | Poll::Ready(Ok(())) => {},
804						Poll::Ready(Err(_)) => {
805							*out_substream = None;
806							let event = NotifsHandlerOut::CloseDesired { protocol_index };
807							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event))
808						},
809					};
810				},
811
812				State::Closed { .. } |
813				State::Opening { .. } |
814				State::Open { out_substream: None, .. } |
815				State::OpenDesiredByRemote { .. } => {},
816			}
817		}
818
819		// Poll inbound substreams.
820		for protocol_index in 0..self.protocols.len() {
821			// Inbound substreams being closed is always tolerated, except for the
822			// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
823			match &mut self.protocols[protocol_index].state {
824				State::Closed { .. } |
825				State::Open { in_substream: None, .. } |
826				State::Opening { in_substream: None, .. } => {},
827
828				State::Open { in_substream: in_substream @ Some(_), .. } =>
829					match futures::prelude::stream::Stream::poll_next(
830						Pin::new(in_substream.as_mut().unwrap()),
831						cx,
832					) {
833						Poll::Pending => {},
834						Poll::Ready(Some(Ok(message))) => {
835							let event = NotifsHandlerOut::Notification { protocol_index, message };
836							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event))
837						},
838						Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None,
839					},
840
841				State::OpenDesiredByRemote { in_substream, pending_opening } =>
842					match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
843						Poll::Pending => {},
844						Poll::Ready(Ok(())) => {},
845						Poll::Ready(Err(_)) => {
846							self.protocols[protocol_index].state =
847								State::Closed { pending_opening: *pending_opening };
848							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
849								NotifsHandlerOut::CloseDesired { protocol_index },
850							))
851						},
852					},
853
854				State::Opening { in_substream: in_substream @ Some(_), .. } =>
855					match NotificationsInSubstream::poll_process(
856						Pin::new(in_substream.as_mut().unwrap()),
857						cx,
858					) {
859						Poll::Pending => {},
860						Poll::Ready(Ok(())) => {},
861						Poll::Ready(Err(_)) => *in_substream = None,
862					},
863			}
864		}
865
866		// This is the only place in this method that can return `Pending`.
867		// By putting it at the very bottom, we are guaranteed that everything has been properly
868		// polled.
869		Poll::Pending
870	}
871}
872
873#[cfg(test)]
874pub mod tests {
875	use super::*;
876	use crate::protocol::notifications::upgrade::{
877		NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen,
878	};
879	use asynchronous_codec::Framed;
880	use libp2p::{
881		core::muxing::SubstreamBox,
882		swarm::handler::{self, StreamUpgradeError},
883	};
884	use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version};
885	use std::{
886		collections::HashMap,
887		io::{Error, IoSlice, IoSliceMut},
888	};
889	use tokio::sync::mpsc;
890	use unsigned_varint::codec::UviBytes;
891
892	struct OpenSubstream {
893		notifications: stream::Peekable<
894			stream::Select<
895				stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
896				stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
897			>,
898		>,
899		_in_substream: MockSubstream,
900		_out_substream: MockSubstream,
901	}
902
903	pub struct ConnectionYielder {
904		connections: HashMap<(PeerId, usize), OpenSubstream>,
905	}
906
907	impl ConnectionYielder {
908		/// Create new [`ConnectionYielder`].
909		pub fn new() -> Self {
910			Self { connections: HashMap::new() }
911		}
912
913		/// Open a new substream for peer.
914		pub fn open_substream(
915			&mut self,
916			peer: PeerId,
917			protocol_index: usize,
918			received_handshake: Vec<u8>,
919		) -> NotifsHandlerOut {
920			let (async_tx, async_rx) =
921				futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
922			let (sync_tx, sync_rx) =
923				futures::channel::mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
924			let notifications_sink = NotificationsSink {
925				inner: Arc::new(NotificationsSinkInner {
926					peer_id: peer,
927					async_channel: FuturesMutex::new(async_tx),
928					sync_channel: Mutex::new(Some(sync_tx)),
929				}),
930				metrics: None,
931			};
932			let (in_substream, out_substream) = MockSubstream::new();
933
934			self.connections.insert(
935				(peer, protocol_index),
936				OpenSubstream {
937					notifications: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
938					_in_substream: in_substream,
939					_out_substream: out_substream,
940				},
941			);
942
943			NotifsHandlerOut::OpenResultOk {
944				protocol_index,
945				negotiated_fallback: None,
946				received_handshake,
947				notifications_sink,
948				inbound: false,
949			}
950		}
951
952		/// Attempt to get next pending event from one of the notification sinks.
953		pub async fn get_next_event(&mut self, peer: PeerId, set: usize) -> Option<Vec<u8>> {
954			let substream = if let Some(info) = self.connections.get_mut(&(peer, set)) {
955				info
956			} else {
957				return None
958			};
959
960			futures::future::poll_fn(|cx| match substream.notifications.poll_next_unpin(cx) {
961				Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) =>
962					Poll::Ready(Some(message)),
963				Poll::Pending => Poll::Ready(None),
964				Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) => {
965					panic!("sink closed")
966				},
967			})
968			.await
969		}
970	}
971
972	struct MockSubstream {
973		pub rx: mpsc::Receiver<Vec<u8>>,
974		pub tx: mpsc::Sender<Vec<u8>>,
975		rx_buffer: BytesMut,
976	}
977
978	impl MockSubstream {
979		/// Create new substream pair.
980		pub fn new() -> (Self, Self) {
981			let (tx1, rx1) = mpsc::channel(32);
982			let (tx2, rx2) = mpsc::channel(32);
983
984			(
985				Self { rx: rx1, tx: tx2, rx_buffer: BytesMut::with_capacity(512) },
986				Self { rx: rx2, tx: tx1, rx_buffer: BytesMut::with_capacity(512) },
987			)
988		}
989
990		/// Create new negotiated substream pair.
991		pub async fn negotiated() -> (Stream, Stream) {
992			let (socket1, socket2) = Self::new();
993			let socket1 = SubstreamBox::new(socket1);
994			let socket2 = SubstreamBox::new(socket2);
995
996			let protos = vec!["/echo/1.0.0", "/echo/2.5.0"];
997			let (res1, res2) = tokio::join!(
998				dialer_select_proto(socket1, protos.clone(), Version::V1),
999				listener_select_proto(socket2, protos),
1000			);
1001
1002			(Self::stream_new(res1.unwrap().1), Self::stream_new(res2.unwrap().1))
1003		}
1004
1005		/// Unsafe substitute for `Stream::new` private constructor.
1006		fn stream_new(stream: Negotiated<SubstreamBox>) -> Stream {
1007			// Static asserts to make sure this doesn't break.
1008			const _: () = {
1009				assert!(
1010					core::mem::size_of::<Stream>() ==
1011						core::mem::size_of::<Negotiated<SubstreamBox>>()
1012				);
1013				assert!(
1014					core::mem::align_of::<Stream>() ==
1015						core::mem::align_of::<Negotiated<SubstreamBox>>()
1016				);
1017			};
1018
1019			unsafe { core::mem::transmute(stream) }
1020		}
1021	}
1022
1023	impl AsyncWrite for MockSubstream {
1024		fn poll_write<'a>(
1025			self: Pin<&mut Self>,
1026			_cx: &mut Context<'a>,
1027			buf: &[u8],
1028		) -> Poll<Result<usize, Error>> {
1029			match self.tx.try_send(buf.to_vec()) {
1030				Ok(_) => Poll::Ready(Ok(buf.len())),
1031				Err(_) => Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1032			}
1033		}
1034
1035		fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1036			Poll::Ready(Ok(()))
1037		}
1038
1039		fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1040			Poll::Ready(Ok(()))
1041		}
1042
1043		fn poll_write_vectored<'a, 'b>(
1044			self: Pin<&mut Self>,
1045			_cx: &mut Context<'a>,
1046			_bufs: &[IoSlice<'b>],
1047		) -> Poll<Result<usize, Error>> {
1048			unimplemented!();
1049		}
1050	}
1051
1052	impl AsyncRead for MockSubstream {
1053		fn poll_read<'a>(
1054			mut self: Pin<&mut Self>,
1055			cx: &mut Context<'a>,
1056			buf: &mut [u8],
1057		) -> Poll<Result<usize, Error>> {
1058			match self.rx.poll_recv(cx) {
1059				Poll::Ready(Some(data)) => self.rx_buffer.extend_from_slice(&data),
1060				Poll::Ready(None) =>
1061					return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1062				_ => {},
1063			}
1064
1065			let nsize = std::cmp::min(self.rx_buffer.len(), buf.len());
1066			let data = self.rx_buffer.split_to(nsize);
1067			buf[..nsize].copy_from_slice(&data[..]);
1068
1069			if nsize > 0 {
1070				return Poll::Ready(Ok(nsize))
1071			}
1072
1073			Poll::Pending
1074		}
1075
1076		fn poll_read_vectored<'a, 'b>(
1077			self: Pin<&mut Self>,
1078			_cx: &mut Context<'a>,
1079			_bufs: &mut [IoSliceMut<'b>],
1080		) -> Poll<Result<usize, Error>> {
1081			unimplemented!();
1082		}
1083	}
1084
1085	/// Create new [`NotifsHandler`].
1086	fn notifs_handler() -> NotifsHandler {
1087		let proto = Protocol {
1088			config: ProtocolConfig {
1089				name: "/foo".into(),
1090				fallback_names: vec![],
1091				handshake: Arc::new(RwLock::new(b"hello, world".to_vec())),
1092				max_notification_size: u64::MAX,
1093			},
1094			in_upgrade: NotificationsIn::new("/foo", Vec::new(), u64::MAX),
1095			state: State::Closed { pending_opening: false },
1096		};
1097
1098		NotifsHandler {
1099			protocols: vec![proto],
1100			when_connection_open: Instant::now(),
1101			peer_id: PeerId::random(),
1102			events_queue: VecDeque::new(),
1103			metrics: None,
1104		}
1105	}
1106
1107	// verify that if another substream is attempted to be opened by remote while an inbound
1108	// substream already exists, the new inbound stream is rejected and closed by the local node.
1109	#[tokio::test]
1110	async fn second_open_desired_by_remote_rejected() {
1111		let mut handler = notifs_handler();
1112		let (io, mut io2) = MockSubstream::negotiated().await;
1113		let mut codec = UviBytes::default();
1114		codec.set_max_len(usize::MAX);
1115
1116		let notif_in = NotificationsInOpen {
1117			handshake: b"hello, world".to_vec(),
1118			substream: NotificationsInSubstream::new(
1119				Framed::new(io, codec),
1120				NotificationsInSubstreamHandshake::NotSent,
1121			),
1122		};
1123
1124		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1125			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1126		));
1127
1128		// verify that the substream is in (partly) opened state
1129		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1130		futures::future::poll_fn(|cx| {
1131			let mut buf = Vec::with_capacity(512);
1132			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1133			Poll::Ready(())
1134		})
1135		.await;
1136
1137		// attempt to open another inbound substream and verify that it is rejected
1138		let (io, mut io2) = MockSubstream::negotiated().await;
1139		let mut codec = UviBytes::default();
1140		codec.set_max_len(usize::MAX);
1141
1142		let notif_in = NotificationsInOpen {
1143			handshake: b"hello, world".to_vec(),
1144			substream: NotificationsInSubstream::new(
1145				Framed::new(io, codec),
1146				NotificationsInSubstreamHandshake::NotSent,
1147			),
1148		};
1149
1150		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1151			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1152		));
1153
1154		// verify that the new substream is rejected and closed
1155		futures::future::poll_fn(|cx| {
1156			let mut buf = Vec::with_capacity(512);
1157
1158			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1159				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
1160			}
1161
1162			Poll::Ready(())
1163		})
1164		.await;
1165	}
1166
1167	#[tokio::test]
1168	async fn open_rejected_if_substream_is_opening() {
1169		let mut handler = notifs_handler();
1170		let (io, mut io2) = MockSubstream::negotiated().await;
1171		let mut codec = UviBytes::default();
1172		codec.set_max_len(usize::MAX);
1173
1174		let notif_in = NotificationsInOpen {
1175			handshake: b"hello, world".to_vec(),
1176			substream: NotificationsInSubstream::new(
1177				Framed::new(io, codec),
1178				NotificationsInSubstreamHandshake::NotSent,
1179			),
1180		};
1181
1182		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1183			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1184		));
1185
1186		// verify that the substream is in (partly) opened state
1187		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1188		futures::future::poll_fn(|cx| {
1189			let mut buf = Vec::with_capacity(512);
1190			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1191			Poll::Ready(())
1192		})
1193		.await;
1194
1195		// move the handler state to 'Opening'
1196		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
1197		assert!(std::matches!(
1198			handler.protocols[0].state,
1199			State::Opening { in_substream: Some(_), .. }
1200		));
1201
1202		// remote now tries to open another substream, verify that it is rejected and closed
1203		let (io, mut io2) = MockSubstream::negotiated().await;
1204		let mut codec = UviBytes::default();
1205		codec.set_max_len(usize::MAX);
1206
1207		let notif_in = NotificationsInOpen {
1208			handshake: b"hello, world".to_vec(),
1209			substream: NotificationsInSubstream::new(
1210				Framed::new(io, codec),
1211				NotificationsInSubstreamHandshake::NotSent,
1212			),
1213		};
1214
1215		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1216			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1217		));
1218
1219		// verify that the new substream is rejected and closed but that the first substream is
1220		// still in correct state
1221		futures::future::poll_fn(|cx| {
1222			let mut buf = Vec::with_capacity(512);
1223
1224			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1225				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
1226			} else {
1227				panic!("unexpected result");
1228			}
1229
1230			Poll::Ready(())
1231		})
1232		.await;
1233		assert!(std::matches!(
1234			handler.protocols[0].state,
1235			State::Opening { in_substream: Some(_), .. }
1236		));
1237	}
1238
1239	#[tokio::test]
1240	async fn open_rejected_if_substream_already_open() {
1241		let mut handler = notifs_handler();
1242		let (io, mut io2) = MockSubstream::negotiated().await;
1243		let mut codec = UviBytes::default();
1244		codec.set_max_len(usize::MAX);
1245
1246		let notif_in = NotificationsInOpen {
1247			handshake: b"hello, world".to_vec(),
1248			substream: NotificationsInSubstream::new(
1249				Framed::new(io, codec),
1250				NotificationsInSubstreamHandshake::NotSent,
1251			),
1252		};
1253		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1254			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1255		));
1256
1257		// verify that the substream is in (partly) opened state
1258		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1259		futures::future::poll_fn(|cx| {
1260			let mut buf = Vec::with_capacity(512);
1261			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1262			Poll::Ready(())
1263		})
1264		.await;
1265
1266		// move the handler state to 'Opening'
1267		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
1268		assert!(std::matches!(
1269			handler.protocols[0].state,
1270			State::Opening { in_substream: Some(_), .. }
1271		));
1272
1273		// accept the substream and move its state to `Open`
1274		let (io, _io2) = MockSubstream::negotiated().await;
1275		let mut codec = UviBytes::default();
1276		codec.set_max_len(usize::MAX);
1277
1278		let notif_out = NotificationsOutOpen {
1279			handshake: b"hello, world".to_vec(),
1280			negotiated_fallback: None,
1281			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1282		};
1283		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1284			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1285		));
1286
1287		assert!(std::matches!(
1288			handler.protocols[0].state,
1289			State::Open { in_substream: Some(_), .. }
1290		));
1291
1292		// remote now tries to open another substream, verify that it is rejected and closed
1293		let (io, mut io2) = MockSubstream::negotiated().await;
1294		let mut codec = UviBytes::default();
1295		codec.set_max_len(usize::MAX);
1296		let notif_in = NotificationsInOpen {
1297			handshake: b"hello, world".to_vec(),
1298			substream: NotificationsInSubstream::new(
1299				Framed::new(io, codec),
1300				NotificationsInSubstreamHandshake::NotSent,
1301			),
1302		};
1303		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1304			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1305		));
1306
1307		// verify that the new substream is rejected and closed but that the first substream is
1308		// still in correct state
1309		futures::future::poll_fn(|cx| {
1310			let mut buf = Vec::with_capacity(512);
1311
1312			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1313				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
1314			} else {
1315				panic!("unexpected result");
1316			}
1317
1318			Poll::Ready(())
1319		})
1320		.await;
1321		assert!(std::matches!(
1322			handler.protocols[0].state,
1323			State::Open { in_substream: Some(_), .. }
1324		));
1325	}
1326
1327	#[tokio::test]
1328	async fn fully_negotiated_resets_state_for_closed_substream() {
1329		let mut handler = notifs_handler();
1330		let (io, mut io2) = MockSubstream::negotiated().await;
1331		let mut codec = UviBytes::default();
1332		codec.set_max_len(usize::MAX);
1333
1334		let notif_in = NotificationsInOpen {
1335			handshake: b"hello, world".to_vec(),
1336			substream: NotificationsInSubstream::new(
1337				Framed::new(io, codec),
1338				NotificationsInSubstreamHandshake::NotSent,
1339			),
1340		};
1341		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1342			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1343		));
1344
1345		// verify that the substream is in (partly) opened state
1346		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1347		futures::future::poll_fn(|cx| {
1348			let mut buf = Vec::with_capacity(512);
1349			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1350			Poll::Ready(())
1351		})
1352		.await;
1353
1354		// first instruct the handler to open a connection and then close it right after
1355		// so the handler is in state `Closed { pending_opening: true }`
1356		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
1357		assert!(std::matches!(
1358			handler.protocols[0].state,
1359			State::Opening { in_substream: Some(_), .. }
1360		));
1361
1362		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1363		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1364
1365		// verify that if the the outbound substream is successfully negotiated, the state is not
1366		// changed as the substream was commanded to be closed by the handler.
1367		let (io, _io2) = MockSubstream::negotiated().await;
1368		let mut codec = UviBytes::default();
1369		codec.set_max_len(usize::MAX);
1370
1371		let notif_out = NotificationsOutOpen {
1372			handshake: b"hello, world".to_vec(),
1373			negotiated_fallback: None,
1374			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1375		};
1376		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1377			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1378		));
1379
1380		assert!(std::matches!(
1381			handler.protocols[0].state,
1382			State::Closed { pending_opening: false }
1383		));
1384	}
1385
1386	#[tokio::test]
1387	async fn fully_negotiated_resets_state_for_open_desired_substream() {
1388		let mut handler = notifs_handler();
1389		let (io, mut io2) = MockSubstream::negotiated().await;
1390		let mut codec = UviBytes::default();
1391		codec.set_max_len(usize::MAX);
1392
1393		let notif_in = NotificationsInOpen {
1394			handshake: b"hello, world".to_vec(),
1395			substream: NotificationsInSubstream::new(
1396				Framed::new(io, codec),
1397				NotificationsInSubstreamHandshake::NotSent,
1398			),
1399		};
1400		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1401			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1402		));
1403
1404		// verify that the substream is in (partly) opened state
1405		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1406		futures::future::poll_fn(|cx| {
1407			let mut buf = Vec::with_capacity(512);
1408			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1409			Poll::Ready(())
1410		})
1411		.await;
1412
1413		// first instruct the handler to open a connection and then close it right after
1414		// so the handler is in state `Closed { pending_opening: true }`
1415		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
1416		assert!(std::matches!(
1417			handler.protocols[0].state,
1418			State::Opening { in_substream: Some(_), .. }
1419		));
1420
1421		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1422		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1423
1424		// attempt to open another inbound substream and verify that it is rejected
1425		let (io, _io2) = MockSubstream::negotiated().await;
1426		let mut codec = UviBytes::default();
1427		codec.set_max_len(usize::MAX);
1428
1429		let notif_in = NotificationsInOpen {
1430			handshake: b"hello, world".to_vec(),
1431			substream: NotificationsInSubstream::new(
1432				Framed::new(io, codec),
1433				NotificationsInSubstreamHandshake::NotSent,
1434			),
1435		};
1436		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1437			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1438		));
1439
1440		assert!(std::matches!(
1441			handler.protocols[0].state,
1442			State::OpenDesiredByRemote { pending_opening: true, .. }
1443		));
1444
1445		// verify that if the the outbound substream is successfully negotiated, the state is not
1446		// changed as the substream was commanded to be closed by the handler.
1447		let (io, _io2) = MockSubstream::negotiated().await;
1448		let mut codec = UviBytes::default();
1449		codec.set_max_len(usize::MAX);
1450
1451		let notif_out = NotificationsOutOpen {
1452			handshake: b"hello, world".to_vec(),
1453			negotiated_fallback: None,
1454			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1455		};
1456
1457		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1458			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1459		));
1460
1461		assert!(std::matches!(
1462			handler.protocols[0].state,
1463			State::OpenDesiredByRemote { pending_opening: false, .. }
1464		));
1465	}
1466
1467	#[tokio::test]
1468	async fn dial_upgrade_error_resets_closed_outbound_state() {
1469		let mut handler = notifs_handler();
1470		let (io, mut io2) = MockSubstream::negotiated().await;
1471		let mut codec = UviBytes::default();
1472		codec.set_max_len(usize::MAX);
1473
1474		let notif_in = NotificationsInOpen {
1475			handshake: b"hello, world".to_vec(),
1476			substream: NotificationsInSubstream::new(
1477				Framed::new(io, codec),
1478				NotificationsInSubstreamHandshake::NotSent,
1479			),
1480		};
1481		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1482			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1483		));
1484
1485		// verify that the substream is in (partly) opened state
1486		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1487		futures::future::poll_fn(|cx| {
1488			let mut buf = Vec::with_capacity(512);
1489			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1490			Poll::Ready(())
1491		})
1492		.await;
1493
1494		// first instruct the handler to open a connection and then close it right after
1495		// so the handler is in state `Closed { pending_opening: true }`
1496		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
1497		assert!(std::matches!(
1498			handler.protocols[0].state,
1499			State::Opening { in_substream: Some(_), .. }
1500		));
1501
1502		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1503		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1504
1505		// inject dial failure to an already closed substream and verify outbound state is reset
1506		handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1507			handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1508		));
1509		assert!(std::matches!(
1510			handler.protocols[0].state,
1511			State::Closed { pending_opening: false }
1512		));
1513	}
1514
1515	#[tokio::test]
1516	async fn dial_upgrade_error_resets_open_desired_state() {
1517		let mut handler = notifs_handler();
1518		let (io, mut io2) = MockSubstream::negotiated().await;
1519		let mut codec = UviBytes::default();
1520		codec.set_max_len(usize::MAX);
1521
1522		let notif_in = NotificationsInOpen {
1523			handshake: b"hello, world".to_vec(),
1524			substream: NotificationsInSubstream::new(
1525				Framed::new(io, codec),
1526				NotificationsInSubstreamHandshake::NotSent,
1527			),
1528		};
1529		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1530			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1531		));
1532
1533		// verify that the substream is in (partly) opened state
1534		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1535		futures::future::poll_fn(|cx| {
1536			let mut buf = Vec::with_capacity(512);
1537			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1538			Poll::Ready(())
1539		})
1540		.await;
1541
1542		// first instruct the handler to open a connection and then close it right after
1543		// so the handler is in state `Closed { pending_opening: true }`
1544		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
1545		assert!(std::matches!(
1546			handler.protocols[0].state,
1547			State::Opening { in_substream: Some(_), .. }
1548		));
1549
1550		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1551		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1552
1553		let (io, _io2) = MockSubstream::negotiated().await;
1554		let mut codec = UviBytes::default();
1555		codec.set_max_len(usize::MAX);
1556
1557		let notif_in = NotificationsInOpen {
1558			handshake: b"hello, world".to_vec(),
1559			substream: NotificationsInSubstream::new(
1560				Framed::new(io, codec),
1561				NotificationsInSubstreamHandshake::NotSent,
1562			),
1563		};
1564		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1565			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1566		));
1567
1568		assert!(std::matches!(
1569			handler.protocols[0].state,
1570			State::OpenDesiredByRemote { pending_opening: true, .. }
1571		));
1572
1573		// inject dial failure to an already closed substream and verify outbound state is reset
1574		handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1575			handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1576		));
1577		assert!(std::matches!(
1578			handler.protocols[0].state,
1579			State::OpenDesiredByRemote { pending_opening: false, .. }
1580		));
1581	}
1582
1583	#[tokio::test]
1584	async fn sync_notifications_clogged() {
1585		let mut handler = notifs_handler();
1586		let (io, _) = MockSubstream::negotiated().await;
1587		let codec = UviBytes::default();
1588
1589		let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
1590		let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1);
1591		let notifications_sink = NotificationsSink {
1592			inner: Arc::new(NotificationsSinkInner {
1593				peer_id: PeerId::random(),
1594				async_channel: FuturesMutex::new(async_tx),
1595				sync_channel: Mutex::new(Some(sync_tx)),
1596			}),
1597			metrics: None,
1598		};
1599
1600		handler.protocols[0].state = State::Open {
1601			notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
1602			out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))),
1603			in_substream: None,
1604		};
1605
1606		notifications_sink.send_sync_notification(vec![1, 3, 3, 7]);
1607		notifications_sink.send_sync_notification(vec![1, 3, 3, 8]);
1608		notifications_sink.send_sync_notification(vec![1, 3, 3, 9]);
1609		notifications_sink.send_sync_notification(vec![1, 3, 4, 0]);
1610
1611		#[allow(deprecated)]
1612		futures::future::poll_fn(|cx| {
1613			assert!(std::matches!(
1614				handler.poll(cx),
1615				Poll::Ready(ConnectionHandlerEvent::Close(
1616					NotifsHandlerError::SyncNotificationsClogged,
1617				))
1618			));
1619			Poll::Ready(())
1620		})
1621		.await;
1622	}
1623
1624	#[tokio::test]
1625	async fn close_desired_by_remote() {
1626		let mut handler = notifs_handler();
1627		let (io, io2) = MockSubstream::negotiated().await;
1628		let mut codec = UviBytes::default();
1629		codec.set_max_len(usize::MAX);
1630
1631		let notif_in = NotificationsInOpen {
1632			handshake: b"hello, world".to_vec(),
1633			substream: NotificationsInSubstream::new(
1634				Framed::new(io, codec),
1635				NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]),
1636			),
1637		};
1638
1639		// add new inbound substream but close it immediately and verify that correct events are
1640		// emitted
1641		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1642			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1643		));
1644		drop(io2);
1645
1646		futures::future::poll_fn(|cx| {
1647			assert!(std::matches!(
1648				handler.poll(cx),
1649				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1650					NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0, .. },
1651				))
1652			));
1653			assert!(std::matches!(
1654				handler.poll(cx),
1655				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1656					NotifsHandlerOut::CloseDesired { protocol_index: 0 },
1657				))
1658			));
1659			Poll::Ready(())
1660		})
1661		.await;
1662	}
1663}