sc_network/
protocol_controller.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Protocol Controller. Generic implementation of peer management for protocols.
20//! Responsible for accepting/rejecting incoming connections and initiating outgoing connections,
21//! respecting the inbound and outbound peer slot counts. Communicates with `PeerStore` to get and
22//! update peer reputation values and sends commands to `Notifications`.
23//!
24//! Due to asynchronous nature of communication between `ProtocolController` and `Notifications`,
25//! `ProtocolController` has an imperfect view of the states of the peers. To reduce this
26//! desynchronization, the following measures are taken:
27//!
28//! 1. Network peer events from `Notifications` are prioritized over actions from external API and
29//!    internal actions by `ProtocolController` (like slot allocation).
30//! 2. `Notifications` ignores all commands from `ProtocolController` after sending "incoming"
31//!    request until receiving the answer to this "incoming" request.
32//! 3. After sending a "connect" message, `ProtocolController` switches the state of the peer from
33//!    `Outbound` to `Inbound` if it receives an "incoming" request from `Notifications` for this
34//!    peer.
35//!
36//! These measures do not eliminate confusing commands from `ProtocolController` completely,
37//! so `Notifications` must correctly handle seemingly inconsistent commands, like a "connect"
38//! command for the peer it thinks is already connected, and a "drop" command for a peer that
39//! was previously dropped.
40//!
41//! Even though this does not guarantee that `ProtocolController` and `Notifications` have the same
42//! view of the peers' states at any given moment, the eventual consistency is maintained.
43
44use crate::peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT};
45
46use futures::{channel::oneshot, future::Either, FutureExt, StreamExt};
47use libp2p::PeerId;
48use log::{debug, error, trace, warn};
49use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
50use sp_arithmetic::traits::SaturatedConversion;
51use std::{
52	collections::{HashMap, HashSet},
53	sync::Arc,
54	time::{Duration, Instant},
55};
56use wasm_timer::Delay;
57
58/// Log target for this file.
59pub const LOG_TARGET: &str = "peerset";
60
61/// `Notifications` protocol index. For historical reasons it's called `SetId`, because it
62/// used to refer to a set of peers in a peerset for this protocol.
63///
64/// Can be constructed using the `From<usize>` trait implementation based on the index of the
65/// protocol in `Notifications`.
66#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
67pub struct SetId(usize);
68
69impl SetId {
70	/// Const conversion function for initialization of hardcoded peerset indices.
71	pub const fn from(id: usize) -> Self {
72		Self(id)
73	}
74}
75
76impl From<usize> for SetId {
77	fn from(id: usize) -> Self {
78		Self(id)
79	}
80}
81
82impl From<SetId> for usize {
83	fn from(id: SetId) -> Self {
84		id.0
85	}
86}
87
88/// Configuration for a set of nodes for a specific protocol.
89#[derive(Debug)]
90pub struct ProtoSetConfig {
91	/// Maximum number of incoming links to peers.
92	pub in_peers: u32,
93
94	/// Maximum number of outgoing links to peers.
95	pub out_peers: u32,
96
97	/// Lists of nodes we should always be connected to.
98	///
99	/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
100	/// >			otherwise it will not be able to connect to them.
101	pub reserved_nodes: HashSet<PeerId>,
102
103	/// If true, we only accept nodes in [`ProtoSetConfig::reserved_nodes`].
104	pub reserved_only: bool,
105}
106
107/// Message that is sent by [`ProtocolController`] to `Notifications`.
108#[derive(Debug, PartialEq)]
109pub enum Message {
110	/// Request to open a connection to the given peer. From the point of view of the
111	/// `ProtocolController`, we are immediately connected.
112	Connect {
113		/// Set id to connect on.
114		set_id: SetId,
115		/// Peer to connect to.
116		peer_id: PeerId,
117	},
118
119	/// Drop the connection to the given peer, or cancel the connection attempt after a `Connect`.
120	Drop {
121		/// Set id to disconnect on.
122		set_id: SetId,
123		/// Peer to disconnect from.
124		peer_id: PeerId,
125	},
126
127	/// Equivalent to `Connect` for the peer corresponding to this incoming index.
128	Accept(IncomingIndex),
129
130	/// Equivalent to `Drop` for the peer corresponding to this incoming index.
131	Reject(IncomingIndex),
132}
133
134/// Opaque identifier for an incoming connection. Allocated by the network.
135#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
136pub struct IncomingIndex(pub u64);
137
138impl From<u64> for IncomingIndex {
139	fn from(val: u64) -> Self {
140		Self(val)
141	}
142}
143
144/// External API actions.
145#[derive(Debug)]
146enum Action {
147	/// Add a reserved peer or mark already connected peer as reserved.
148	AddReservedPeer(PeerId),
149	/// Remove a reserved peer.
150	RemoveReservedPeer(PeerId),
151	/// Update reserved peers to match the provided set.
152	SetReservedPeers(HashSet<PeerId>),
153	/// Set/unset reserved-only mode.
154	SetReservedOnly(bool),
155	/// Disconnect a peer.
156	DisconnectPeer(PeerId),
157	/// Get the list of reserved peers.
158	GetReservedPeers(oneshot::Sender<Vec<PeerId>>),
159}
160
161/// Network events from `Notifications`.
162#[derive(Debug)]
163enum Event {
164	/// Incoming connection from the peer.
165	IncomingConnection(PeerId, IncomingIndex),
166	/// Connection with the peer dropped.
167	Dropped(PeerId),
168}
169
170/// Shared handle to [`ProtocolController`]. Distributed around the code outside of the
171/// protocol implementation.
172#[derive(Debug, Clone)]
173pub struct ProtocolHandle {
174	/// Actions from outer API.
175	actions_tx: TracingUnboundedSender<Action>,
176	/// Connection events from `Notifications`. We prioritize them over actions.
177	events_tx: TracingUnboundedSender<Event>,
178}
179
180impl ProtocolHandle {
181	/// Adds a new reserved peer. [`ProtocolController`] will make an effort
182	/// to always remain connected to this peer.
183	///
184	/// Has no effect if the node was already a reserved peer.
185	///
186	/// > **Note**: Keep in mind that the networking has to know an address for this node,
187	/// > otherwise it will not be able to connect to it.
188	pub fn add_reserved_peer(&self, peer_id: PeerId) {
189		let _ = self.actions_tx.unbounded_send(Action::AddReservedPeer(peer_id));
190	}
191
192	/// Demotes reserved peer to non-reserved. Does not disconnect the peer.
193	///
194	/// Has no effect if the node was not a reserved peer.
195	pub fn remove_reserved_peer(&self, peer_id: PeerId) {
196		let _ = self.actions_tx.unbounded_send(Action::RemoveReservedPeer(peer_id));
197	}
198
199	/// Set reserved peers to the new set.
200	pub fn set_reserved_peers(&self, peer_ids: HashSet<PeerId>) {
201		let _ = self.actions_tx.unbounded_send(Action::SetReservedPeers(peer_ids));
202	}
203
204	/// Sets whether or not [`ProtocolController`] only has connections with nodes marked
205	/// as reserved for the given set.
206	pub fn set_reserved_only(&self, reserved: bool) {
207		let _ = self.actions_tx.unbounded_send(Action::SetReservedOnly(reserved));
208	}
209
210	/// Disconnect peer. You should remove the `PeerId` from the `PeerStore` first
211	/// to not connect to the peer again during the next slot allocation.
212	pub fn disconnect_peer(&self, peer_id: PeerId) {
213		let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id));
214	}
215
216	/// Get the list of reserved peers.
217	pub fn reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
218		let _ = self.actions_tx.unbounded_send(Action::GetReservedPeers(pending_response));
219	}
220
221	/// Notify about incoming connection. [`ProtocolController`] will either accept or reject it.
222	pub fn incoming_connection(&self, peer_id: PeerId, incoming_index: IncomingIndex) {
223		let _ = self
224			.events_tx
225			.unbounded_send(Event::IncomingConnection(peer_id, incoming_index));
226	}
227
228	/// Notify that connection was dropped (either refused or disconnected).
229	pub fn dropped(&self, peer_id: PeerId) {
230		let _ = self.events_tx.unbounded_send(Event::Dropped(peer_id));
231	}
232}
233
234impl ProtocolHandleT for ProtocolHandle {
235	fn disconnect_peer(&self, peer_id: sc_network_types::PeerId) {
236		let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id.into()));
237	}
238}
239
240/// Direction of a connection
241#[derive(Clone, Copy, Debug)]
242enum Direction {
243	Inbound,
244	Outbound,
245}
246
247/// Status of a connection with a peer.
248#[derive(Clone, Debug)]
249enum PeerState {
250	/// We are connected to the peer.
251	Connected(Direction),
252	/// We are not connected.
253	NotConnected,
254}
255
256impl PeerState {
257	/// Returns true if we are connected with the node.
258	fn is_connected(&self) -> bool {
259		matches!(self, PeerState::Connected(_))
260	}
261}
262
263impl Default for PeerState {
264	fn default() -> PeerState {
265		PeerState::NotConnected
266	}
267}
268
269/// Worker side of [`ProtocolHandle`] responsible for all the logic.
270#[derive(Debug)]
271pub struct ProtocolController {
272	/// Set id to use when sending connect/drop requests to `Notifications`.
273	// Will likely be replaced by `ProtocolName` in the future.
274	set_id: SetId,
275	/// Receiver for outer API messages from [`ProtocolHandle`].
276	actions_rx: TracingUnboundedReceiver<Action>,
277	/// Receiver for connection events from `Notifications` sent via [`ProtocolHandle`].
278	events_rx: TracingUnboundedReceiver<Event>,
279	/// Number of occupied slots for incoming connections (not counting reserved nodes).
280	num_in: u32,
281	/// Number of occupied slots for outgoing connections (not counting reserved nodes).
282	num_out: u32,
283	/// Maximum number of slots for incoming connections (not counting reserved nodes).
284	max_in: u32,
285	/// Maximum number of slots for outgoing connections (not counting reserved nodes).
286	max_out: u32,
287	/// Connected regular nodes.
288	nodes: HashMap<PeerId, Direction>,
289	/// Reserved nodes. Should be always connected and do not occupy peer slots.
290	reserved_nodes: HashMap<PeerId, PeerState>,
291	/// Connect only to reserved nodes.
292	reserved_only: bool,
293	/// Next time to allocate slots. This is done once per second.
294	next_periodic_alloc_slots: Instant,
295	/// Outgoing channel for messages to `Notifications`.
296	to_notifications: TracingUnboundedSender<Message>,
297	/// `PeerStore` handle for checking peer reputation values and getting connection candidates
298	/// with highest reputation.
299	peer_store: Arc<dyn PeerStoreProvider>,
300}
301
302impl ProtocolController {
303	/// Construct new [`ProtocolController`].
304	pub fn new(
305		set_id: SetId,
306		config: ProtoSetConfig,
307		to_notifications: TracingUnboundedSender<Message>,
308		peer_store: Arc<dyn PeerStoreProvider>,
309	) -> (ProtocolHandle, ProtocolController) {
310		let (actions_tx, actions_rx) = tracing_unbounded("mpsc_api_protocol", 10_000);
311		let (events_tx, events_rx) = tracing_unbounded("mpsc_notifications_protocol", 10_000);
312		let handle = ProtocolHandle { actions_tx, events_tx };
313		peer_store.register_protocol(Arc::new(handle.clone()));
314		let reserved_nodes =
315			config.reserved_nodes.iter().map(|p| (*p, PeerState::NotConnected)).collect();
316		let controller = ProtocolController {
317			set_id,
318			actions_rx,
319			events_rx,
320			num_in: 0,
321			num_out: 0,
322			max_in: config.in_peers,
323			max_out: config.out_peers,
324			nodes: HashMap::new(),
325			reserved_nodes,
326			reserved_only: config.reserved_only,
327			next_periodic_alloc_slots: Instant::now(),
328			to_notifications,
329			peer_store,
330		};
331		(handle, controller)
332	}
333
334	/// Drive [`ProtocolController`]. This function returns when all instances of
335	/// [`ProtocolHandle`] are dropped.
336	pub async fn run(mut self) {
337		while self.next_action().await {}
338	}
339
340	/// Perform one action. Returns `true` if it should be called again.
341	///
342	/// Intended for tests only. Use `run` for driving [`ProtocolController`].
343	pub async fn next_action(&mut self) -> bool {
344		let either = loop {
345			let mut next_alloc_slots = Delay::new_at(self.next_periodic_alloc_slots).fuse();
346
347			// See the module doc for why we use `select_biased!`.
348			futures::select_biased! {
349				event = self.events_rx.next() => match event {
350					Some(event) => break Either::Left(event),
351					None => return false,
352				},
353				action = self.actions_rx.next() => match action {
354					Some(action) => break Either::Right(action),
355					None => return false,
356				},
357				_ = next_alloc_slots => {
358					self.alloc_slots();
359					self.next_periodic_alloc_slots = Instant::now() + Duration::new(1, 0);
360				},
361			}
362		};
363
364		match either {
365			Either::Left(event) => self.process_event(event),
366			Either::Right(action) => self.process_action(action),
367		}
368
369		true
370	}
371
372	/// Process connection event.
373	fn process_event(&mut self, event: Event) {
374		match event {
375			Event::IncomingConnection(peer_id, index) =>
376				self.on_incoming_connection(peer_id, index),
377			Event::Dropped(peer_id) => self.on_peer_dropped(peer_id),
378		}
379	}
380
381	/// Process action command.
382	fn process_action(&mut self, action: Action) {
383		match action {
384			Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id),
385			Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
386			Action::SetReservedPeers(peer_ids) => self.on_set_reserved_peers(peer_ids),
387			Action::SetReservedOnly(reserved_only) => self.on_set_reserved_only(reserved_only),
388			Action::DisconnectPeer(peer_id) => self.on_disconnect_peer(peer_id),
389			Action::GetReservedPeers(pending_response) =>
390				self.on_get_reserved_peers(pending_response),
391		}
392	}
393
394	/// Send "accept" message to `Notifications`.
395	fn accept_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
396		trace!(
397			target: LOG_TARGET,
398			"Accepting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
399			self.set_id,
400			self.num_in,
401			self.max_in,
402		);
403
404		let _ = self.to_notifications.unbounded_send(Message::Accept(incoming_index));
405	}
406
407	/// Send "reject" message to `Notifications`.
408	fn reject_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
409		trace!(
410			target: LOG_TARGET,
411			"Rejecting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
412			self.set_id,
413			self.num_in,
414			self.max_in,
415		);
416
417		let _ = self.to_notifications.unbounded_send(Message::Reject(incoming_index));
418	}
419
420	/// Send "connect" message to `Notifications`.
421	fn start_connection(&mut self, peer_id: PeerId) {
422		trace!(
423			target: LOG_TARGET,
424			"Connecting to {peer_id} on {:?} ({}/{} num_out/max_out).",
425			self.set_id,
426			self.num_out,
427			self.max_out,
428		);
429
430		let _ = self
431			.to_notifications
432			.unbounded_send(Message::Connect { set_id: self.set_id, peer_id });
433	}
434
435	/// Send "drop" message to `Notifications`.
436	fn drop_connection(&mut self, peer_id: PeerId) {
437		trace!(
438			target: LOG_TARGET,
439			"Dropping {peer_id} on {:?} ({}/{} num_in/max_in, {}/{} num_out/max_out).",
440			self.set_id,
441			self.num_in,
442			self.max_in,
443			self.num_out,
444			self.max_out,
445		);
446
447		let _ = self
448			.to_notifications
449			.unbounded_send(Message::Drop { set_id: self.set_id, peer_id });
450	}
451
452	/// Report peer disconnect event to `PeerStore` for it to update peer's reputation accordingly.
453	/// Should only be called if the remote node disconnected us, not the other way around.
454	fn report_disconnect(&mut self, peer_id: PeerId) {
455		self.peer_store.report_disconnect(peer_id.into());
456	}
457
458	/// Ask `Peerset` if the peer has a reputation value not sufficient for connection with it.
459	fn is_banned(&self, peer_id: &PeerId) -> bool {
460		self.peer_store.is_banned(&peer_id.into())
461	}
462
463	/// Add the peer to the set of reserved peers. [`ProtocolController`] will try to always
464	/// maintain connections with such peers.
465	fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
466		if self.reserved_nodes.contains_key(&peer_id) {
467			warn!(
468				target: LOG_TARGET,
469				"Trying to add an already reserved node {peer_id} as reserved on {:?}.",
470				self.set_id,
471			);
472			return
473		}
474
475		// Get the peer out of non-reserved peers if it's there.
476		let state = match self.nodes.remove(&peer_id) {
477			Some(direction) => {
478				trace!(
479					target: LOG_TARGET,
480					"Marking previously connected node {} ({:?}) as reserved on {:?}.",
481					peer_id,
482					direction,
483					self.set_id
484				);
485				PeerState::Connected(direction)
486			},
487			None => {
488				trace!(target: LOG_TARGET, "Adding reserved node {peer_id} on {:?}.", self.set_id,);
489				PeerState::NotConnected
490			},
491		};
492
493		self.reserved_nodes.insert(peer_id, state.clone());
494
495		// Discount occupied slots or connect to the node.
496		match state {
497			PeerState::Connected(Direction::Inbound) => self.num_in -= 1,
498			PeerState::Connected(Direction::Outbound) => self.num_out -= 1,
499			PeerState::NotConnected => self.alloc_slots(),
500		}
501	}
502
503	/// Remove the peer from the set of reserved peers. The peer is either moved to the set of
504	/// regular nodes or disconnected.
505	fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
506		let state = match self.reserved_nodes.remove(&peer_id) {
507			Some(state) => state,
508			None => {
509				warn!(
510					target: LOG_TARGET,
511					"Trying to remove unknown reserved node {peer_id} from {:?}.", self.set_id,
512				);
513				return
514			},
515		};
516
517		if let PeerState::Connected(direction) = state {
518			// Disconnect if we're at (or over) the regular node limit
519			let disconnect = self.reserved_only ||
520				match direction {
521					Direction::Inbound => self.num_in >= self.max_in,
522					Direction::Outbound => self.num_out >= self.max_out,
523				};
524
525			if disconnect {
526				// Disconnect the node.
527				trace!(
528					target: LOG_TARGET,
529					"Disconnecting previously reserved node {peer_id} ({direction:?}) on {:?}.",
530					self.set_id,
531				);
532				self.drop_connection(peer_id);
533			} else {
534				// Count connections as of regular node.
535				trace!(
536					target: LOG_TARGET,
537					"Making a connected reserved node {peer_id} ({:?}) on {:?} a regular one.",
538					direction,
539					self.set_id,
540				);
541
542				match direction {
543					Direction::Inbound => self.num_in += 1,
544					Direction::Outbound => self.num_out += 1,
545				}
546
547				// Put the node into the list of regular nodes.
548				let prev = self.nodes.insert(peer_id, direction);
549				assert!(prev.is_none(), "Corrupted state: reserved node was also non-reserved.");
550			}
551		} else {
552			trace!(
553				target: LOG_TARGET,
554				"Removed disconnected reserved node {peer_id} from {:?}.",
555				self.set_id,
556			);
557		}
558	}
559
560	/// Replace the set of reserved peers.
561	fn on_set_reserved_peers(&mut self, peer_ids: HashSet<PeerId>) {
562		// Determine the difference between the current group and the new list.
563		let current = self.reserved_nodes.keys().cloned().collect();
564		let to_insert = peer_ids.difference(&current).cloned().collect::<Vec<_>>();
565		let to_remove = current.difference(&peer_ids).cloned().collect::<Vec<_>>();
566
567		for node in to_insert {
568			self.on_add_reserved_peer(node);
569		}
570
571		for node in to_remove {
572			self.on_remove_reserved_peer(node);
573		}
574	}
575
576	/// Change "reserved only" flag. In "reserved only" mode we connect and accept connections to
577	/// reserved nodes only.
578	fn on_set_reserved_only(&mut self, reserved_only: bool) {
579		trace!(target: LOG_TARGET, "Set reserved only to `{reserved_only}` on {:?}", self.set_id);
580
581		self.reserved_only = reserved_only;
582
583		if !reserved_only {
584			return self.alloc_slots()
585		}
586
587		// Disconnect all non-reserved peers.
588		self.nodes
589			.iter()
590			.map(|(k, v)| (*k, *v))
591			.collect::<Vec<(_, _)>>()
592			.iter()
593			.for_each(|(peer_id, direction)| {
594				// Update counters in the loop for `drop_connection` to report the correct number.
595				match direction {
596					Direction::Inbound => self.num_in -= 1,
597					Direction::Outbound => self.num_out -= 1,
598				}
599				self.drop_connection(*peer_id)
600			});
601		self.nodes.clear();
602	}
603
604	/// Get the list of reserved peers.
605	fn on_get_reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
606		let _ = pending_response.send(self.reserved_nodes.keys().cloned().collect());
607	}
608
609	/// Disconnect the peer.
610	fn on_disconnect_peer(&mut self, peer_id: PeerId) {
611		// Don't do anything if the node is reserved.
612		if self.reserved_nodes.contains_key(&peer_id) {
613			debug!(
614				target: LOG_TARGET,
615				"Ignoring request to disconnect reserved peer {peer_id} from {:?}.", self.set_id,
616			);
617			return
618		}
619
620		match self.nodes.remove(&peer_id) {
621			Some(direction) => {
622				trace!(
623					target: LOG_TARGET,
624					"Disconnecting peer {peer_id} ({direction:?}) from {:?}.",
625					self.set_id
626				);
627				match direction {
628					Direction::Inbound => self.num_in -= 1,
629					Direction::Outbound => self.num_out -= 1,
630				}
631				self.drop_connection(peer_id);
632			},
633			None => {
634				debug!(
635					target: LOG_TARGET,
636					"Trying to disconnect unknown peer {peer_id} from {:?}.", self.set_id,
637				);
638			},
639		}
640	}
641
642	/// Indicate that we received an incoming connection. Must be answered either with
643	/// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
644	///
645	/// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming
646	/// connection implicitly means `Connect`, but incoming connections aren't cancelled by
647	/// `dropped`.
648	// Implementation note: because of concurrency issues, `ProtocolController` has an imperfect
649	// view of the peers' states, and may issue commands for a peer after `Notifications` received
650	// an incoming request for that peer. In this case, `Notifications` ignores all the commands
651	// until it receives a response for the incoming request to `ProtocolController`, so we must
652	// ensure we handle this incoming request correctly.
653	fn on_incoming_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
654		trace!(
655			target: LOG_TARGET,
656			"Incoming connection from peer {peer_id} ({incoming_index:?}) on {:?}.",
657			self.set_id,
658		);
659
660		if self.reserved_only && !self.reserved_nodes.contains_key(&peer_id) {
661			self.reject_connection(peer_id, incoming_index);
662			return
663		}
664
665		// Check if the node is reserved first.
666		if let Some(state) = self.reserved_nodes.get_mut(&peer_id) {
667			match state {
668				PeerState::Connected(ref mut direction) => {
669					// We are accepting an incoming connection, so ensure the direction is inbound.
670					// (See the implementation note above.)
671					*direction = Direction::Inbound;
672					self.accept_connection(peer_id, incoming_index);
673				},
674				PeerState::NotConnected =>
675					if self.peer_store.is_banned(&peer_id.into()) {
676						self.reject_connection(peer_id, incoming_index);
677					} else {
678						*state = PeerState::Connected(Direction::Inbound);
679						self.accept_connection(peer_id, incoming_index);
680					},
681			}
682			return
683		}
684
685		// If we're already connected, pretend we are not connected and decide on the node again.
686		// (See the note above.)
687		if let Some(direction) = self.nodes.remove(&peer_id) {
688			trace!(
689				target: LOG_TARGET,
690				"Handling incoming connection from peer {} we think we already connected as {:?} on {:?}.",
691				peer_id,
692				direction,
693				self.set_id
694			);
695			match direction {
696				Direction::Inbound => self.num_in -= 1,
697				Direction::Outbound => self.num_out -= 1,
698			}
699		}
700
701		if self.num_in >= self.max_in {
702			self.reject_connection(peer_id, incoming_index);
703			return
704		}
705
706		if self.is_banned(&peer_id) {
707			self.reject_connection(peer_id, incoming_index);
708			return
709		}
710
711		self.num_in += 1;
712		self.nodes.insert(peer_id, Direction::Inbound);
713		self.accept_connection(peer_id, incoming_index);
714	}
715
716	/// Indicate that a connection with the peer was dropped.
717	fn on_peer_dropped(&mut self, peer_id: PeerId) {
718		self.on_peer_dropped_inner(peer_id).unwrap_or_else(|peer_id| {
719			// We do not assert here, because due to asynchronous nature of communication
720			// between `ProtocolController` and `Notifications` we can receive `Action::Dropped`
721			// for a peer we already disconnected ourself.
722			trace!(
723				target: LOG_TARGET,
724				"Received `Action::Dropped` for not connected peer {peer_id} on {:?}.",
725				self.set_id,
726			)
727		});
728	}
729
730	/// Indicate that a connection with the peer was dropped.
731	/// Returns `Err(PeerId)` if the peer wasn't connected or is not known to us.
732	fn on_peer_dropped_inner(&mut self, peer_id: PeerId) -> Result<(), PeerId> {
733		if self.drop_reserved_peer(&peer_id)? || self.drop_regular_peer(&peer_id) {
734			// The peer found and disconnected.
735			self.report_disconnect(peer_id);
736			Ok(())
737		} else {
738			// The peer was not found in neither regular or reserved lists.
739			Err(peer_id)
740		}
741	}
742
743	/// Try dropping the peer as a reserved peer. Return `Ok(true)` if the peer was found and
744	/// disconnected, `Ok(false)` if it wasn't found, `Err(PeerId)`, if the peer found, but not in
745	/// connected state.
746	fn drop_reserved_peer(&mut self, peer_id: &PeerId) -> Result<bool, PeerId> {
747		let Some(state) = self.reserved_nodes.get_mut(peer_id) else { return Ok(false) };
748
749		if let PeerState::Connected(direction) = state {
750			trace!(
751				target: LOG_TARGET,
752				"Reserved peer {peer_id} ({direction:?}) dropped from {:?}.",
753				self.set_id,
754			);
755			*state = PeerState::NotConnected;
756			Ok(true)
757		} else {
758			Err(*peer_id)
759		}
760	}
761
762	/// Try dropping the peer as a regular peer. Return `true` if the peer was found and
763	/// disconnected, `false` if it wasn't found.
764	fn drop_regular_peer(&mut self, peer_id: &PeerId) -> bool {
765		let Some(direction) = self.nodes.remove(peer_id) else { return false };
766
767		trace!(
768			target: LOG_TARGET,
769			"Peer {peer_id} ({direction:?}) dropped from {:?}.",
770			self.set_id,
771		);
772
773		match direction {
774			Direction::Inbound => self.num_in -= 1,
775			Direction::Outbound => self.num_out -= 1,
776		}
777
778		true
779	}
780
781	/// Initiate outgoing connections trying to connect all reserved nodes and fill in all outgoing
782	/// slots.
783	fn alloc_slots(&mut self) {
784		// Try connecting to reserved nodes first, ignoring nodes with outstanding events/actions.
785		self.reserved_nodes
786			.iter_mut()
787			.filter_map(|(peer_id, state)| {
788				(!state.is_connected() && !self.peer_store.is_banned(&peer_id.into())).then(|| {
789					*state = PeerState::Connected(Direction::Outbound);
790					peer_id
791				})
792			})
793			.cloned()
794			.collect::<Vec<_>>()
795			.into_iter()
796			.for_each(|peer_id| {
797				self.start_connection(peer_id);
798			});
799
800		// Nothing more to do if we're in reserved-only mode or don't have slots available.
801		if self.reserved_only || self.num_out >= self.max_out {
802			return
803		}
804
805		// Fill available slots.
806		let available_slots = (self.max_out - self.num_out).saturated_into();
807
808		// Ignore reserved nodes (connected above), already connected nodes, and nodes with
809		// outstanding events/actions.
810		let ignored = self
811			.reserved_nodes
812			.keys()
813			.map(From::from)
814			.collect::<HashSet<sc_network_types::PeerId>>()
815			.union(
816				&self.nodes.keys().map(From::from).collect::<HashSet<sc_network_types::PeerId>>(),
817			)
818			.cloned()
819			.collect();
820
821		let candidates = self
822			.peer_store
823			.outgoing_candidates(available_slots, ignored)
824			.into_iter()
825			.filter_map(|peer_id| {
826				(!self.reserved_nodes.contains_key(&peer_id.into()) &&
827					!self.nodes.contains_key(&peer_id.into()))
828				.then_some(peer_id)
829				.or_else(|| {
830					error!(
831						target: LOG_TARGET,
832						"`PeerStore` returned a node we asked to ignore: {peer_id}.",
833					);
834					debug_assert!(false, "`PeerStore` returned a node we asked to ignore.");
835					None
836				})
837			})
838			.collect::<Vec<_>>();
839
840		if candidates.len() > available_slots {
841			error!(
842				target: LOG_TARGET,
843				"`PeerStore` returned more nodes than there are slots available.",
844			);
845			debug_assert!(false, "`PeerStore` returned more nodes than there are slots available.");
846		}
847
848		candidates.into_iter().take(available_slots).for_each(|peer_id| {
849			self.num_out += 1;
850			self.nodes.insert(peer_id.into(), Direction::Outbound);
851			self.start_connection(peer_id.into());
852		})
853	}
854}
855
856#[cfg(test)]
857mod tests {
858	use super::*;
859	use crate::{
860		peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT},
861		ReputationChange,
862	};
863	use libp2p::PeerId;
864	use sc_network_common::role::ObservedRole;
865	use sc_utils::mpsc::{tracing_unbounded, TryRecvError};
866	use std::collections::HashSet;
867
868	mockall::mock! {
869		#[derive(Debug)]
870		pub PeerStoreHandle {}
871
872		impl PeerStoreProvider for PeerStoreHandle {
873			fn is_banned(&self, peer_id: &sc_network_types::PeerId) -> bool;
874			fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandleT>);
875			fn report_disconnect(&self, peer_id: sc_network_types::PeerId);
876			fn set_peer_role(&self, peer_id: &sc_network_types::PeerId, role: ObservedRole);
877			fn report_peer(&self, peer_id: sc_network_types::PeerId, change: ReputationChange);
878			fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32;
879			fn peer_role(&self, peer_id: &sc_network_types::PeerId) -> Option<ObservedRole>;
880			fn outgoing_candidates(&self, count: usize, ignored: HashSet<sc_network_types::PeerId>) -> Vec<sc_network_types::PeerId>;
881			fn add_known_peer(&self, peer_id: sc_network_types::PeerId);
882		}
883	}
884
885	#[test]
886	fn reserved_nodes_are_connected_dropped_and_accepted() {
887		let reserved1 = PeerId::random();
888		let reserved2 = PeerId::random();
889
890		// Add first reserved node via config.
891		let config = ProtoSetConfig {
892			in_peers: 0,
893			out_peers: 0,
894			reserved_nodes: std::iter::once(reserved1).collect(),
895			reserved_only: true,
896		};
897		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
898
899		let mut peer_store = MockPeerStoreHandle::new();
900		peer_store.expect_register_protocol().once().return_const(());
901		peer_store.expect_is_banned().times(4).return_const(false);
902		peer_store.expect_report_disconnect().times(2).return_const(());
903
904		let (_handle, mut controller) =
905			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
906
907		// Add second reserved node at runtime (this currently calls `alloc_slots` internally).
908		controller.on_add_reserved_peer(reserved2);
909
910		// Initiate connections (currently, `alloc_slots` is also called internally in
911		// `on_add_reserved_peer` above).
912		controller.alloc_slots();
913
914		let mut messages = Vec::new();
915		while let Some(message) = rx.try_recv().ok() {
916			messages.push(message);
917		}
918		assert_eq!(messages.len(), 2);
919		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
920		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
921
922		// Reserved peers do not occupy slots.
923		assert_eq!(controller.num_out, 0);
924		assert_eq!(controller.num_in, 0);
925
926		// Drop connections to be able to accept reserved nodes.
927		controller.on_peer_dropped(reserved1);
928		controller.on_peer_dropped(reserved2);
929
930		// Incoming connection from `reserved1`.
931		let incoming1 = IncomingIndex(1);
932		controller.on_incoming_connection(reserved1, incoming1);
933		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming1));
934		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
935
936		// Incoming connection from `reserved2`.
937		let incoming2 = IncomingIndex(2);
938		controller.on_incoming_connection(reserved2, incoming2);
939		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming2));
940		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
941
942		// Reserved peers do not occupy slots.
943		assert_eq!(controller.num_out, 0);
944		assert_eq!(controller.num_in, 0);
945	}
946
947	#[test]
948	fn banned_reserved_nodes_are_not_connected_and_not_accepted() {
949		let reserved1 = PeerId::random();
950		let reserved2 = PeerId::random();
951
952		// Add first reserved node via config.
953		let config = ProtoSetConfig {
954			in_peers: 0,
955			out_peers: 0,
956			reserved_nodes: std::iter::once(reserved1).collect(),
957			reserved_only: true,
958		};
959		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
960
961		let mut peer_store = MockPeerStoreHandle::new();
962		peer_store.expect_register_protocol().once().return_const(());
963		peer_store.expect_is_banned().times(6).return_const(true);
964
965		let (_handle, mut controller) =
966			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
967
968		// Add second reserved node at runtime (this currently calls `alloc_slots` internally).
969		controller.on_add_reserved_peer(reserved2);
970
971		// Initiate connections.
972		controller.alloc_slots();
973
974		// No slots occupied.
975		assert_eq!(controller.num_out, 0);
976		assert_eq!(controller.num_in, 0);
977
978		// No commands are generated.
979		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
980
981		// Incoming connection from `reserved1`.
982		let incoming1 = IncomingIndex(1);
983		controller.on_incoming_connection(reserved1, incoming1);
984		assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming1));
985		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
986
987		// Incoming connection from `reserved2`.
988		let incoming2 = IncomingIndex(2);
989		controller.on_incoming_connection(reserved2, incoming2);
990		assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming2));
991		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
992
993		// No slots occupied.
994		assert_eq!(controller.num_out, 0);
995		assert_eq!(controller.num_in, 0);
996	}
997
998	#[test]
999	fn we_try_to_reconnect_to_dropped_reserved_nodes() {
1000		let reserved1 = PeerId::random();
1001		let reserved2 = PeerId::random();
1002
1003		// Add first reserved node via config.
1004		let config = ProtoSetConfig {
1005			in_peers: 0,
1006			out_peers: 0,
1007			reserved_nodes: std::iter::once(reserved1).collect(),
1008			reserved_only: true,
1009		};
1010		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1011
1012		let mut peer_store = MockPeerStoreHandle::new();
1013		peer_store.expect_register_protocol().once().return_const(());
1014		peer_store.expect_is_banned().times(4).return_const(false);
1015		peer_store.expect_report_disconnect().times(2).return_const(());
1016
1017		let (_handle, mut controller) =
1018			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1019
1020		// Add second reserved node at runtime (this calls `alloc_slots` internally).
1021		controller.on_add_reserved_peer(reserved2);
1022
1023		// Initiate connections (actually redundant, see previous comment).
1024		controller.alloc_slots();
1025
1026		let mut messages = Vec::new();
1027		while let Some(message) = rx.try_recv().ok() {
1028			messages.push(message);
1029		}
1030
1031		assert_eq!(messages.len(), 2);
1032		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1033		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1034
1035		// Drop both reserved nodes.
1036		controller.on_peer_dropped(reserved1);
1037		controller.on_peer_dropped(reserved2);
1038
1039		// Initiate connections.
1040		controller.alloc_slots();
1041
1042		let mut messages = Vec::new();
1043		while let Some(message) = rx.try_recv().ok() {
1044			messages.push(message);
1045		}
1046
1047		assert_eq!(messages.len(), 2);
1048		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1049		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1050
1051		// No slots occupied.
1052		assert_eq!(controller.num_out, 0);
1053		assert_eq!(controller.num_in, 0);
1054	}
1055
1056	#[test]
1057	fn nodes_supplied_by_peer_store_are_connected() {
1058		let peer1 = PeerId::random();
1059		let peer2 = PeerId::random();
1060		let candidates = vec![peer1.into(), peer2.into()];
1061
1062		let config = ProtoSetConfig {
1063			in_peers: 0,
1064			// Less slots than candidates.
1065			out_peers: 2,
1066			reserved_nodes: HashSet::new(),
1067			reserved_only: false,
1068		};
1069		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1070
1071		let mut peer_store = MockPeerStoreHandle::new();
1072		peer_store.expect_register_protocol().once().return_const(());
1073		peer_store.expect_outgoing_candidates().once().return_const(candidates);
1074
1075		let (_handle, mut controller) =
1076			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1077
1078		// Initiate connections.
1079		controller.alloc_slots();
1080
1081		let mut messages = Vec::new();
1082		while let Some(message) = rx.try_recv().ok() {
1083			messages.push(message);
1084		}
1085
1086		// Only first two peers are connected (we only have 2 slots).
1087		assert_eq!(messages.len(), 2);
1088		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1089		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1090
1091		// Outgoing slots occupied.
1092		assert_eq!(controller.num_out, 2);
1093		assert_eq!(controller.num_in, 0);
1094
1095		// No more nodes are connected.
1096		controller.alloc_slots();
1097		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1098
1099		// No more slots occupied.
1100		assert_eq!(controller.num_out, 2);
1101		assert_eq!(controller.num_in, 0);
1102	}
1103
1104	#[test]
1105	fn both_reserved_nodes_and_nodes_supplied_by_peer_store_are_connected() {
1106		let reserved1 = PeerId::random();
1107		let reserved2 = PeerId::random();
1108		let regular1 = PeerId::random();
1109		let regular2 = PeerId::random();
1110		let outgoing_candidates = vec![regular1.into(), regular2.into()];
1111		let reserved_nodes = [reserved1, reserved2].iter().cloned().collect();
1112
1113		let config =
1114			ProtoSetConfig { in_peers: 10, out_peers: 10, reserved_nodes, reserved_only: false };
1115		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1116
1117		let mut peer_store = MockPeerStoreHandle::new();
1118		peer_store.expect_register_protocol().once().return_const(());
1119		peer_store.expect_is_banned().times(2).return_const(false);
1120		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1121
1122		let (_handle, mut controller) =
1123			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1124
1125		// Initiate connections.
1126		controller.alloc_slots();
1127
1128		let mut messages = Vec::new();
1129		while let Some(message) = rx.try_recv().ok() {
1130			messages.push(message);
1131		}
1132		assert_eq!(messages.len(), 4);
1133		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1134		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1135		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1136		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular2 }));
1137		assert_eq!(controller.num_out, 2);
1138		assert_eq!(controller.num_in, 0);
1139	}
1140
1141	#[test]
1142	fn if_slots_are_freed_we_try_to_allocate_them_again() {
1143		let peer1 = PeerId::random();
1144		let peer2 = PeerId::random();
1145		let peer3 = PeerId::random();
1146		let candidates1 = vec![peer1.into(), peer2.into()];
1147		let candidates2 = vec![peer3.into()];
1148
1149		let config = ProtoSetConfig {
1150			in_peers: 0,
1151			// Less slots than candidates.
1152			out_peers: 2,
1153			reserved_nodes: HashSet::new(),
1154			reserved_only: false,
1155		};
1156		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1157
1158		let mut peer_store = MockPeerStoreHandle::new();
1159		peer_store.expect_register_protocol().once().return_const(());
1160		peer_store.expect_outgoing_candidates().once().return_const(candidates1);
1161		peer_store.expect_outgoing_candidates().once().return_const(candidates2);
1162		peer_store.expect_report_disconnect().times(2).return_const(());
1163
1164		let (_handle, mut controller) =
1165			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1166
1167		// Initiate connections.
1168		controller.alloc_slots();
1169
1170		let mut messages = Vec::new();
1171		while let Some(message) = rx.try_recv().ok() {
1172			messages.push(message);
1173		}
1174
1175		// Only first two peers are connected (we only have 2 slots).
1176		assert_eq!(messages.len(), 2);
1177		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1178		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1179
1180		// Outgoing slots occupied.
1181		assert_eq!(controller.num_out, 2);
1182		assert_eq!(controller.num_in, 0);
1183
1184		// No more nodes are connected.
1185		controller.alloc_slots();
1186		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1187
1188		// No more slots occupied.
1189		assert_eq!(controller.num_out, 2);
1190		assert_eq!(controller.num_in, 0);
1191
1192		// Drop peers.
1193		controller.on_peer_dropped(peer1);
1194		controller.on_peer_dropped(peer2);
1195
1196		// Slots are freed.
1197		assert_eq!(controller.num_out, 0);
1198		assert_eq!(controller.num_in, 0);
1199
1200		// Initiate connections.
1201		controller.alloc_slots();
1202
1203		let mut messages = Vec::new();
1204		while let Some(message) = rx.try_recv().ok() {
1205			messages.push(message);
1206		}
1207
1208		// Peers are connected.
1209		assert_eq!(messages.len(), 1);
1210		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer3 }));
1211
1212		// Outgoing slots occupied.
1213		assert_eq!(controller.num_out, 1);
1214		assert_eq!(controller.num_in, 0);
1215	}
1216
1217	#[test]
1218	fn in_reserved_only_mode_no_peers_are_requested_from_peer_store_and_connected() {
1219		let config = ProtoSetConfig {
1220			in_peers: 0,
1221			// Make sure we have slots available.
1222			out_peers: 2,
1223			reserved_nodes: HashSet::new(),
1224			reserved_only: true,
1225		};
1226		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1227
1228		let mut peer_store = MockPeerStoreHandle::new();
1229		peer_store.expect_register_protocol().once().return_const(());
1230
1231		let (_handle, mut controller) =
1232			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1233
1234		// Initiate connections.
1235		controller.alloc_slots();
1236
1237		// No nodes are connected.
1238		assert_eq!(controller.num_out, 0);
1239		assert_eq!(controller.num_in, 0);
1240		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1241	}
1242
1243	#[test]
1244	fn in_reserved_only_mode_no_regular_peers_are_accepted() {
1245		let config = ProtoSetConfig {
1246			// Make sure we have slots available.
1247			in_peers: 2,
1248			out_peers: 0,
1249			reserved_nodes: HashSet::new(),
1250			reserved_only: true,
1251		};
1252		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1253
1254		let mut peer_store = MockPeerStoreHandle::new();
1255		peer_store.expect_register_protocol().once().return_const(());
1256
1257		let (_handle, mut controller) =
1258			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1259
1260		let peer = PeerId::random();
1261		let incoming_index = IncomingIndex(1);
1262		controller.on_incoming_connection(peer, incoming_index);
1263
1264		let mut messages = Vec::new();
1265		while let Some(message) = rx.try_recv().ok() {
1266			messages.push(message);
1267		}
1268
1269		// Peer is rejected.
1270		assert_eq!(messages.len(), 1);
1271		assert!(messages.contains(&Message::Reject(incoming_index)));
1272		assert_eq!(controller.num_out, 0);
1273		assert_eq!(controller.num_in, 0);
1274	}
1275
1276	#[test]
1277	fn disabling_reserved_only_mode_allows_to_connect_to_peers() {
1278		let peer1 = PeerId::random();
1279		let peer2 = PeerId::random();
1280		let candidates = vec![peer1.into(), peer2.into()];
1281
1282		let config = ProtoSetConfig {
1283			in_peers: 0,
1284			// Make sure we have slots available.
1285			out_peers: 10,
1286			reserved_nodes: HashSet::new(),
1287			reserved_only: true,
1288		};
1289		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1290
1291		let mut peer_store = MockPeerStoreHandle::new();
1292		peer_store.expect_register_protocol().once().return_const(());
1293		peer_store.expect_outgoing_candidates().once().return_const(candidates);
1294
1295		let (_handle, mut controller) =
1296			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1297
1298		// Initiate connections.
1299		controller.alloc_slots();
1300
1301		// No nodes are connected.
1302		assert_eq!(controller.num_out, 0);
1303		assert_eq!(controller.num_in, 0);
1304		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1305
1306		// Disable reserved-only mode (this also connects to peers).
1307		controller.on_set_reserved_only(false);
1308
1309		let mut messages = Vec::new();
1310		while let Some(message) = rx.try_recv().ok() {
1311			messages.push(message);
1312		}
1313
1314		assert_eq!(messages.len(), 2);
1315		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1316		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1317		assert_eq!(controller.num_out, 2);
1318		assert_eq!(controller.num_in, 0);
1319	}
1320
1321	#[test]
1322	fn enabling_reserved_only_mode_disconnects_regular_peers() {
1323		let reserved1 = PeerId::random();
1324		let reserved2 = PeerId::random();
1325		let regular1 = PeerId::random();
1326		let regular2 = PeerId::random();
1327		let outgoing_candidates = vec![regular1.into()];
1328
1329		let config = ProtoSetConfig {
1330			in_peers: 10,
1331			out_peers: 10,
1332			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1333			reserved_only: false,
1334		};
1335		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1336
1337		let mut peer_store = MockPeerStoreHandle::new();
1338		peer_store.expect_register_protocol().once().return_const(());
1339		peer_store.expect_is_banned().times(3).return_const(false);
1340		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1341
1342		let (_handle, mut controller) =
1343			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1344		assert_eq!(controller.num_out, 0);
1345		assert_eq!(controller.num_in, 0);
1346
1347		// Connect `regular1` as outbound.
1348		controller.alloc_slots();
1349
1350		let mut messages = Vec::new();
1351		while let Some(message) = rx.try_recv().ok() {
1352			messages.push(message);
1353		}
1354		assert_eq!(messages.len(), 3);
1355		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1356		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1357		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1358		assert_eq!(controller.num_out, 1);
1359		assert_eq!(controller.num_in, 0);
1360
1361		// Connect `regular2` as inbound.
1362		let incoming_index = IncomingIndex(1);
1363		controller.on_incoming_connection(regular2, incoming_index);
1364		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming_index));
1365		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1366		assert_eq!(controller.num_out, 1);
1367		assert_eq!(controller.num_in, 1);
1368
1369		// Switch to reserved-only mode.
1370		controller.on_set_reserved_only(true);
1371
1372		let mut messages = Vec::new();
1373		while let Some(message) = rx.try_recv().ok() {
1374			messages.push(message);
1375		}
1376		assert_eq!(messages.len(), 2);
1377		assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular1 }));
1378		assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular2 }));
1379		assert_eq!(controller.nodes.len(), 0);
1380		assert_eq!(controller.num_out, 0);
1381		assert_eq!(controller.num_in, 0);
1382	}
1383
1384	#[test]
1385	fn removed_disconnected_reserved_node_is_forgotten() {
1386		let reserved1 = PeerId::random();
1387		let reserved2 = PeerId::random();
1388
1389		let config = ProtoSetConfig {
1390			in_peers: 10,
1391			out_peers: 10,
1392			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1393			reserved_only: false,
1394		};
1395		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1396
1397		let mut peer_store = MockPeerStoreHandle::new();
1398		peer_store.expect_register_protocol().once().return_const(());
1399
1400		let (_handle, mut controller) =
1401			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1402		assert_eq!(controller.reserved_nodes.len(), 2);
1403		assert_eq!(controller.nodes.len(), 0);
1404		assert_eq!(controller.num_out, 0);
1405		assert_eq!(controller.num_in, 0);
1406
1407		controller.on_remove_reserved_peer(reserved1);
1408		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1409		assert_eq!(controller.reserved_nodes.len(), 1);
1410		assert!(!controller.reserved_nodes.contains_key(&reserved1));
1411		assert_eq!(controller.nodes.len(), 0);
1412		assert_eq!(controller.num_out, 0);
1413		assert_eq!(controller.num_in, 0);
1414	}
1415
1416	#[test]
1417	fn removed_connected_reserved_node_is_disconnected_in_reserved_only_mode() {
1418		let reserved1 = PeerId::random();
1419		let reserved2 = PeerId::random();
1420
1421		let config = ProtoSetConfig {
1422			in_peers: 10,
1423			out_peers: 10,
1424			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1425			reserved_only: true,
1426		};
1427		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1428
1429		let mut peer_store = MockPeerStoreHandle::new();
1430		peer_store.expect_register_protocol().once().return_const(());
1431		peer_store.expect_is_banned().times(2).return_const(false);
1432
1433		let (_handle, mut controller) =
1434			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1435
1436		// Initiate connections.
1437		controller.alloc_slots();
1438		let mut messages = Vec::new();
1439		while let Some(message) = rx.try_recv().ok() {
1440			messages.push(message);
1441		}
1442		assert_eq!(messages.len(), 2);
1443		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1444		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1445		assert_eq!(controller.reserved_nodes.len(), 2);
1446		assert!(controller.reserved_nodes.contains_key(&reserved1));
1447		assert!(controller.reserved_nodes.contains_key(&reserved2));
1448		assert!(controller.nodes.is_empty());
1449
1450		// Remove reserved node
1451		controller.on_remove_reserved_peer(reserved1);
1452		assert_eq!(
1453			rx.try_recv().unwrap(),
1454			Message::Drop { set_id: SetId::from(0), peer_id: reserved1 }
1455		);
1456		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1457		assert_eq!(controller.reserved_nodes.len(), 1);
1458		assert!(controller.reserved_nodes.contains_key(&reserved2));
1459		assert!(controller.nodes.is_empty());
1460	}
1461
1462	#[test]
1463	fn removed_connected_reserved_nodes_become_regular_in_non_reserved_mode() {
1464		let peer1 = PeerId::random();
1465		let peer2 = PeerId::random();
1466
1467		let config = ProtoSetConfig {
1468			in_peers: 10,
1469			out_peers: 10,
1470			reserved_nodes: [peer1, peer2].iter().cloned().collect(),
1471			reserved_only: false,
1472		};
1473		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1474
1475		let mut peer_store = MockPeerStoreHandle::new();
1476		peer_store.expect_register_protocol().once().return_const(());
1477		peer_store.expect_is_banned().times(2).return_const(false);
1478		peer_store
1479			.expect_outgoing_candidates()
1480			.once()
1481			.return_const(Vec::<sc_network_types::PeerId>::new());
1482
1483		let (_handle, mut controller) =
1484			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1485
1486		// Connect `peer1` as inbound, `peer2` as outbound.
1487		controller.on_incoming_connection(peer1, IncomingIndex(1));
1488		controller.alloc_slots();
1489		let mut messages = Vec::new();
1490		while let Some(message) = rx.try_recv().ok() {
1491			messages.push(message);
1492		}
1493		assert_eq!(messages.len(), 2);
1494		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1495		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1496		assert_eq!(controller.num_out, 0);
1497		assert_eq!(controller.num_in, 0);
1498
1499		// Remove reserved nodes (and make them regular)
1500		controller.on_remove_reserved_peer(peer1);
1501		controller.on_remove_reserved_peer(peer2);
1502		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1503		assert_eq!(controller.nodes.len(), 2);
1504		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Inbound)));
1505		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Outbound)));
1506		assert_eq!(controller.num_out, 1);
1507		assert_eq!(controller.num_in, 1);
1508	}
1509
1510	#[test]
1511	fn regular_nodes_stop_occupying_slots_when_become_reserved() {
1512		let peer1 = PeerId::random();
1513		let peer2 = PeerId::random();
1514		let outgoing_candidates = vec![peer1.into()];
1515
1516		let config = ProtoSetConfig {
1517			in_peers: 10,
1518			out_peers: 10,
1519			reserved_nodes: HashSet::new(),
1520			reserved_only: false,
1521		};
1522		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1523
1524		let mut peer_store = MockPeerStoreHandle::new();
1525		peer_store.expect_register_protocol().once().return_const(());
1526		peer_store.expect_is_banned().once().return_const(false);
1527		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1528
1529		let (_handle, mut controller) =
1530			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1531
1532		// Connect `peer1` as outbound & `peer2` as inbound.
1533		controller.alloc_slots();
1534		controller.on_incoming_connection(peer2, IncomingIndex(1));
1535		let mut messages = Vec::new();
1536		while let Some(message) = rx.try_recv().ok() {
1537			messages.push(message);
1538		}
1539		assert_eq!(messages.len(), 2);
1540		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1541		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1542		assert_eq!(controller.num_in, 1);
1543		assert_eq!(controller.num_out, 1);
1544
1545		controller.on_add_reserved_peer(peer1);
1546		controller.on_add_reserved_peer(peer2);
1547		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1548		assert_eq!(controller.num_in, 0);
1549		assert_eq!(controller.num_out, 0);
1550	}
1551
1552	#[test]
1553	fn disconnecting_regular_peers_work() {
1554		let peer1 = PeerId::random();
1555		let peer2 = PeerId::random();
1556		let outgoing_candidates = vec![peer1.into()];
1557
1558		let config = ProtoSetConfig {
1559			in_peers: 10,
1560			out_peers: 10,
1561			reserved_nodes: HashSet::new(),
1562			reserved_only: false,
1563		};
1564		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1565
1566		let mut peer_store = MockPeerStoreHandle::new();
1567		peer_store.expect_register_protocol().once().return_const(());
1568		peer_store.expect_is_banned().once().return_const(false);
1569		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1570
1571		let (_handle, mut controller) =
1572			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1573
1574		// Connect `peer1` as outbound & `peer2` as inbound.
1575		controller.alloc_slots();
1576		controller.on_incoming_connection(peer2, IncomingIndex(1));
1577		let mut messages = Vec::new();
1578		while let Some(message) = rx.try_recv().ok() {
1579			messages.push(message);
1580		}
1581		assert_eq!(messages.len(), 2);
1582		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1583		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1584		assert_eq!(controller.nodes.len(), 2);
1585		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
1586		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
1587		assert_eq!(controller.num_in, 1);
1588		assert_eq!(controller.num_out, 1);
1589
1590		controller.on_disconnect_peer(peer1);
1591		assert_eq!(
1592			rx.try_recv().unwrap(),
1593			Message::Drop { set_id: SetId::from(0), peer_id: peer1 }
1594		);
1595		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1596		assert_eq!(controller.nodes.len(), 1);
1597		assert!(!controller.nodes.contains_key(&peer1));
1598		assert_eq!(controller.num_in, 1);
1599		assert_eq!(controller.num_out, 0);
1600
1601		controller.on_disconnect_peer(peer2);
1602		assert_eq!(
1603			rx.try_recv().unwrap(),
1604			Message::Drop { set_id: SetId::from(0), peer_id: peer2 }
1605		);
1606		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1607		assert_eq!(controller.nodes.len(), 0);
1608		assert_eq!(controller.num_in, 0);
1609		assert_eq!(controller.num_out, 0);
1610	}
1611
1612	#[test]
1613	fn disconnecting_reserved_peers_is_a_noop() {
1614		let reserved1 = PeerId::random();
1615		let reserved2 = PeerId::random();
1616
1617		let config = ProtoSetConfig {
1618			in_peers: 10,
1619			out_peers: 10,
1620			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1621			reserved_only: false,
1622		};
1623		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1624
1625		let mut peer_store = MockPeerStoreHandle::new();
1626		peer_store.expect_register_protocol().once().return_const(());
1627		peer_store.expect_is_banned().times(2).return_const(false);
1628		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
1629
1630		let (_handle, mut controller) =
1631			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1632
1633		// Connect `reserved1` as inbound & `reserved2` as outbound.
1634		controller.on_incoming_connection(reserved1, IncomingIndex(1));
1635		controller.alloc_slots();
1636		let mut messages = Vec::new();
1637		while let Some(message) = rx.try_recv().ok() {
1638			messages.push(message);
1639		}
1640		assert_eq!(messages.len(), 2);
1641		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1642		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1643		assert!(matches!(
1644			controller.reserved_nodes.get(&reserved1),
1645			Some(PeerState::Connected(Direction::Inbound))
1646		));
1647		assert!(matches!(
1648			controller.reserved_nodes.get(&reserved2),
1649			Some(PeerState::Connected(Direction::Outbound))
1650		));
1651
1652		controller.on_disconnect_peer(reserved1);
1653		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1654		assert!(matches!(
1655			controller.reserved_nodes.get(&reserved1),
1656			Some(PeerState::Connected(Direction::Inbound))
1657		));
1658
1659		controller.on_disconnect_peer(reserved2);
1660		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1661		assert!(matches!(
1662			controller.reserved_nodes.get(&reserved2),
1663			Some(PeerState::Connected(Direction::Outbound))
1664		));
1665	}
1666
1667	#[test]
1668	fn dropping_regular_peers_work() {
1669		let peer1 = PeerId::random();
1670		let peer2 = PeerId::random();
1671		let outgoing_candidates = vec![peer1.into()];
1672
1673		let config = ProtoSetConfig {
1674			in_peers: 10,
1675			out_peers: 10,
1676			reserved_nodes: HashSet::new(),
1677			reserved_only: false,
1678		};
1679		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1680
1681		let mut peer_store = MockPeerStoreHandle::new();
1682		peer_store.expect_register_protocol().once().return_const(());
1683		peer_store.expect_is_banned().once().return_const(false);
1684		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1685		peer_store.expect_report_disconnect().times(2).return_const(());
1686
1687		let (_handle, mut controller) =
1688			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1689
1690		// Connect `peer1` as outbound & `peer2` as inbound.
1691		controller.alloc_slots();
1692		controller.on_incoming_connection(peer2, IncomingIndex(1));
1693		let mut messages = Vec::new();
1694		while let Some(message) = rx.try_recv().ok() {
1695			messages.push(message);
1696		}
1697		assert_eq!(messages.len(), 2);
1698		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1699		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1700		assert_eq!(controller.nodes.len(), 2);
1701		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
1702		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
1703		assert_eq!(controller.num_in, 1);
1704		assert_eq!(controller.num_out, 1);
1705
1706		controller.on_peer_dropped(peer1);
1707		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1708		assert_eq!(controller.nodes.len(), 1);
1709		assert!(!controller.nodes.contains_key(&peer1));
1710		assert_eq!(controller.num_in, 1);
1711		assert_eq!(controller.num_out, 0);
1712
1713		controller.on_peer_dropped(peer2);
1714		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1715		assert_eq!(controller.nodes.len(), 0);
1716		assert_eq!(controller.num_in, 0);
1717		assert_eq!(controller.num_out, 0);
1718	}
1719
1720	#[test]
1721	fn incoming_request_for_connected_reserved_node_switches_it_to_inbound() {
1722		let reserved1 = PeerId::random();
1723		let reserved2 = PeerId::random();
1724
1725		let config = ProtoSetConfig {
1726			in_peers: 10,
1727			out_peers: 10,
1728			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1729			reserved_only: false,
1730		};
1731		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1732
1733		let mut peer_store = MockPeerStoreHandle::new();
1734		peer_store.expect_register_protocol().once().return_const(());
1735		peer_store.expect_is_banned().times(2).return_const(false);
1736		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
1737
1738		let (_handle, mut controller) =
1739			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1740
1741		// Connect `reserved1` as inbound & `reserved2` as outbound.
1742		controller.on_incoming_connection(reserved1, IncomingIndex(1));
1743		controller.alloc_slots();
1744		let mut messages = Vec::new();
1745		while let Some(message) = rx.try_recv().ok() {
1746			messages.push(message);
1747		}
1748		assert_eq!(messages.len(), 2);
1749		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1750		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1751		assert!(matches!(
1752			controller.reserved_nodes.get(&reserved1),
1753			Some(PeerState::Connected(Direction::Inbound))
1754		));
1755		assert!(matches!(
1756			controller.reserved_nodes.get(&reserved2),
1757			Some(PeerState::Connected(Direction::Outbound))
1758		));
1759
1760		// Incoming request for `reserved1`.
1761		controller.on_incoming_connection(reserved1, IncomingIndex(2));
1762		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(2)));
1763		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1764		assert!(matches!(
1765			controller.reserved_nodes.get(&reserved1),
1766			Some(PeerState::Connected(Direction::Inbound))
1767		));
1768
1769		// Incoming request for `reserved2`.
1770		controller.on_incoming_connection(reserved2, IncomingIndex(3));
1771		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(3)));
1772		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1773		assert!(matches!(
1774			controller.reserved_nodes.get(&reserved2),
1775			Some(PeerState::Connected(Direction::Inbound))
1776		));
1777	}
1778
1779	#[test]
1780	fn incoming_request_for_connected_regular_node_switches_it_to_inbound() {
1781		let regular1 = PeerId::random();
1782		let regular2 = PeerId::random();
1783		let outgoing_candidates = vec![regular1.into()];
1784
1785		let config = ProtoSetConfig {
1786			in_peers: 10,
1787			out_peers: 10,
1788			reserved_nodes: HashSet::new(),
1789			reserved_only: false,
1790		};
1791		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1792
1793		let mut peer_store = MockPeerStoreHandle::new();
1794		peer_store.expect_register_protocol().once().return_const(());
1795		peer_store.expect_is_banned().times(3).return_const(false);
1796		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1797
1798		let (_handle, mut controller) =
1799			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1800		assert_eq!(controller.num_out, 0);
1801		assert_eq!(controller.num_in, 0);
1802
1803		// Connect `regular1` as outbound.
1804		controller.alloc_slots();
1805		assert_eq!(
1806			rx.try_recv().ok().unwrap(),
1807			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1808		);
1809		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1810		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
1811
1812		// Connect `regular2` as inbound.
1813		controller.on_incoming_connection(regular2, IncomingIndex(0));
1814		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1815		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1816		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1817
1818		// Incoming request for `regular1`.
1819		controller.on_incoming_connection(regular1, IncomingIndex(1));
1820		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(1)));
1821		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1822		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Inbound,));
1823
1824		// Incoming request for `regular2`.
1825		controller.on_incoming_connection(regular2, IncomingIndex(2));
1826		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(2)));
1827		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1828		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1829	}
1830
1831	#[test]
1832	fn incoming_request_for_connected_node_is_rejected_if_its_banned() {
1833		let regular1 = PeerId::random();
1834		let regular2 = PeerId::random();
1835		let outgoing_candidates = vec![regular1.into()];
1836
1837		let config = ProtoSetConfig {
1838			in_peers: 10,
1839			out_peers: 10,
1840			reserved_nodes: HashSet::new(),
1841			reserved_only: false,
1842		};
1843		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1844
1845		let mut peer_store = MockPeerStoreHandle::new();
1846		peer_store.expect_register_protocol().once().return_const(());
1847		peer_store.expect_is_banned().once().return_const(false);
1848		peer_store.expect_is_banned().times(2).return_const(true);
1849		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1850
1851		let (_handle, mut controller) =
1852			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1853		assert_eq!(controller.num_out, 0);
1854		assert_eq!(controller.num_in, 0);
1855
1856		// Connect `regular1` as outbound.
1857		controller.alloc_slots();
1858		assert_eq!(
1859			rx.try_recv().ok().unwrap(),
1860			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1861		);
1862		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1863		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
1864
1865		// Connect `regular2` as inbound.
1866		controller.on_incoming_connection(regular2, IncomingIndex(0));
1867		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1868		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1869		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1870
1871		// Incoming request for `regular1`.
1872		controller.on_incoming_connection(regular1, IncomingIndex(1));
1873		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(1)));
1874		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1875		assert!(!controller.nodes.contains_key(&regular1));
1876
1877		// Incoming request for `regular2`.
1878		controller.on_incoming_connection(regular2, IncomingIndex(2));
1879		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(2)));
1880		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1881		assert!(!controller.nodes.contains_key(&regular2));
1882	}
1883
1884	#[test]
1885	fn incoming_request_for_connected_node_is_rejected_if_no_slots_available() {
1886		let regular1 = PeerId::random();
1887		let regular2 = PeerId::random();
1888		let outgoing_candidates = vec![regular1.into()];
1889
1890		let config = ProtoSetConfig {
1891			in_peers: 1,
1892			out_peers: 1,
1893			reserved_nodes: HashSet::new(),
1894			reserved_only: false,
1895		};
1896		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1897
1898		let mut peer_store = MockPeerStoreHandle::new();
1899		peer_store.expect_register_protocol().once().return_const(());
1900		peer_store.expect_is_banned().once().return_const(false);
1901		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1902
1903		let (_handle, mut controller) =
1904			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1905		assert_eq!(controller.num_out, 0);
1906		assert_eq!(controller.num_in, 0);
1907
1908		// Connect `regular1` as outbound.
1909		controller.alloc_slots();
1910		assert_eq!(
1911			rx.try_recv().ok().unwrap(),
1912			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1913		);
1914		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1915		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
1916
1917		// Connect `regular2` as inbound.
1918		controller.on_incoming_connection(regular2, IncomingIndex(0));
1919		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1920		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1921		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1922
1923		controller.max_in = 0;
1924
1925		// Incoming request for `regular1`.
1926		controller.on_incoming_connection(regular1, IncomingIndex(1));
1927		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(1)));
1928		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1929		assert!(!controller.nodes.contains_key(&regular1));
1930
1931		// Incoming request for `regular2`.
1932		controller.on_incoming_connection(regular2, IncomingIndex(2));
1933		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(2)));
1934		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1935		assert!(!controller.nodes.contains_key(&regular2));
1936	}
1937
1938	#[test]
1939	fn incoming_peers_that_exceed_slots_are_rejected() {
1940		let peer1 = PeerId::random();
1941		let peer2 = PeerId::random();
1942
1943		let config = ProtoSetConfig {
1944			in_peers: 1,
1945			out_peers: 10,
1946			reserved_nodes: HashSet::new(),
1947			reserved_only: false,
1948		};
1949		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1950
1951		let mut peer_store = MockPeerStoreHandle::new();
1952		peer_store.expect_register_protocol().once().return_const(());
1953		peer_store.expect_is_banned().once().return_const(false);
1954
1955		let (_handle, mut controller) =
1956			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1957
1958		// Connect `peer1` as inbound.
1959		controller.on_incoming_connection(peer1, IncomingIndex(1));
1960		assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(1)));
1961		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1962
1963		// Incoming requests for `peer2`.
1964		controller.on_incoming_connection(peer2, IncomingIndex(2));
1965		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
1966		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1967	}
1968
1969	#[test]
1970	fn banned_regular_incoming_node_is_rejected() {
1971		let peer1 = PeerId::random();
1972
1973		let config = ProtoSetConfig {
1974			in_peers: 10,
1975			out_peers: 10,
1976			reserved_nodes: HashSet::new(),
1977			reserved_only: false,
1978		};
1979		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1980
1981		let mut peer_store = MockPeerStoreHandle::new();
1982		peer_store.expect_register_protocol().once().return_const(());
1983		peer_store.expect_is_banned().once().return_const(true);
1984
1985		let (_handle, mut controller) =
1986			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1987
1988		// Incoming request.
1989		controller.on_incoming_connection(peer1, IncomingIndex(1));
1990		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
1991		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1992	}
1993
1994	#[test]
1995	fn banned_reserved_incoming_node_is_rejected() {
1996		let reserved1 = PeerId::random();
1997
1998		let config = ProtoSetConfig {
1999			in_peers: 10,
2000			out_peers: 10,
2001			reserved_nodes: std::iter::once(reserved1).collect(),
2002			reserved_only: false,
2003		};
2004		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
2005
2006		let mut peer_store = MockPeerStoreHandle::new();
2007		peer_store.expect_register_protocol().once().return_const(());
2008		peer_store.expect_is_banned().once().return_const(true);
2009
2010		let (_handle, mut controller) =
2011			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
2012		assert!(controller.reserved_nodes.contains_key(&reserved1));
2013
2014		// Incoming request.
2015		controller.on_incoming_connection(reserved1, IncomingIndex(1));
2016		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
2017		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
2018	}
2019
2020	#[test]
2021	fn we_dont_connect_to_banned_reserved_node() {
2022		let reserved1 = PeerId::random();
2023
2024		let config = ProtoSetConfig {
2025			in_peers: 10,
2026			out_peers: 10,
2027			reserved_nodes: std::iter::once(reserved1).collect(),
2028			reserved_only: false,
2029		};
2030		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
2031
2032		let mut peer_store = MockPeerStoreHandle::new();
2033		peer_store.expect_register_protocol().once().return_const(());
2034		peer_store.expect_is_banned().once().return_const(true);
2035		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
2036
2037		let (_handle, mut controller) =
2038			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
2039		assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
2040
2041		// Initiate connections
2042		controller.alloc_slots();
2043		assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
2044		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
2045	}
2046}