sc_network/
request_responses.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Collection of request-response protocols.
20//!
21//! The [`RequestResponsesBehaviour`] struct defined in this module provides support for zero or
22//! more so-called "request-response" protocols.
23//!
24//! A request-response protocol works in the following way:
25//!
26//! - For every emitted request, a new substream is open and the protocol is negotiated. If the
27//! remote supports the protocol, the size of the request is sent as a LEB128 number, followed
28//! with the request itself. The remote then sends the size of the response as a LEB128 number,
29//! followed with the response.
30//!
31//! - Requests have a certain time limit before they time out. This time includes the time it
32//! takes to send/receive the request and response.
33//!
34//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
35//! is used to handle incoming requests.
36
37use crate::{
38	peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39	service::traits::RequestResponseConfig as RequestResponseConfigT,
40	types::ProtocolName,
41	ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46	core::{Endpoint, Multiaddr},
47	request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48	swarm::{
49		behaviour::{ConnectionClosed, FromSwarm},
50		handler::multi::MultiHandler,
51		ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, THandler,
52		THandlerInEvent, THandlerOutEvent, ToSwarm,
53	},
54	PeerId,
55};
56
57use std::{
58	collections::{hash_map::Entry, HashMap},
59	io, iter,
60	ops::Deref,
61	pin::Pin,
62	sync::Arc,
63	task::{Context, Poll},
64	time::{Duration, Instant},
65};
66
67pub use libp2p::request_response::{Config, RequestId};
68
69/// Periodically check if requests are taking too long.
70const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
71
72/// Possible failures occurring in the context of sending an outbound request and receiving the
73/// response.
74#[derive(Debug, thiserror::Error)]
75pub enum OutboundFailure {
76	/// The request could not be sent because a dialing attempt failed.
77	#[error("Failed to dial the requested peer")]
78	DialFailure,
79	/// The request timed out before a response was received.
80	#[error("Timeout while waiting for a response")]
81	Timeout,
82	/// The connection closed before a response was received.
83	#[error("Connection was closed before a response was received")]
84	ConnectionClosed,
85	/// The remote supports none of the requested protocols.
86	#[error("The remote supports none of the requested protocols")]
87	UnsupportedProtocols,
88}
89
90impl From<request_response::OutboundFailure> for OutboundFailure {
91	fn from(out: request_response::OutboundFailure) -> Self {
92		match out {
93			request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
94			request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
95			request_response::OutboundFailure::ConnectionClosed =>
96				OutboundFailure::ConnectionClosed,
97			request_response::OutboundFailure::UnsupportedProtocols =>
98				OutboundFailure::UnsupportedProtocols,
99		}
100	}
101}
102
103/// Possible failures occurring in the context of receiving an inbound request and sending a
104/// response.
105#[derive(Debug, thiserror::Error)]
106pub enum InboundFailure {
107	/// The inbound request timed out, either while reading the incoming request or before a
108	/// response is sent
109	#[error("Timeout while receiving request or sending response")]
110	Timeout,
111	/// The connection closed before a response could be send.
112	#[error("Connection was closed before a response could be sent")]
113	ConnectionClosed,
114	/// The local peer supports none of the protocols requested by the remote.
115	#[error("The local peer supports none of the protocols requested by the remote")]
116	UnsupportedProtocols,
117	/// The local peer failed to respond to an inbound request
118	#[error("The response channel was dropped without sending a response to the remote")]
119	ResponseOmission,
120}
121
122impl From<request_response::InboundFailure> for InboundFailure {
123	fn from(out: request_response::InboundFailure) -> Self {
124		match out {
125			request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
126			request_response::InboundFailure::Timeout => InboundFailure::Timeout,
127			request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
128			request_response::InboundFailure::UnsupportedProtocols =>
129				InboundFailure::UnsupportedProtocols,
130		}
131	}
132}
133
134/// Error in a request.
135#[derive(Debug, thiserror::Error)]
136#[allow(missing_docs)]
137pub enum RequestFailure {
138	#[error("We are not currently connected to the requested peer.")]
139	NotConnected,
140	#[error("Given protocol hasn't been registered.")]
141	UnknownProtocol,
142	#[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
143	Refused,
144	#[error("The remote replied, but the local node is no longer interested in the response.")]
145	Obsolete,
146	#[error("Problem on the network: {0}")]
147	Network(OutboundFailure),
148}
149
150/// Configuration for a single request-response protocol.
151#[derive(Debug, Clone)]
152pub struct ProtocolConfig {
153	/// Name of the protocol on the wire. Should be something like `/foo/bar`.
154	pub name: ProtocolName,
155
156	/// Fallback on the wire protocol names to support.
157	pub fallback_names: Vec<ProtocolName>,
158
159	/// Maximum allowed size, in bytes, of a request.
160	///
161	/// Any request larger than this value will be declined as a way to avoid allocating too
162	/// much memory for it.
163	pub max_request_size: u64,
164
165	/// Maximum allowed size, in bytes, of a response.
166	///
167	/// Any response larger than this value will be declined as a way to avoid allocating too
168	/// much memory for it.
169	pub max_response_size: u64,
170
171	/// Duration after which emitted requests are considered timed out.
172	///
173	/// If you expect the response to come back quickly, you should set this to a smaller duration.
174	pub request_timeout: Duration,
175
176	/// Channel on which the networking service will send incoming requests.
177	///
178	/// Every time a peer sends a request to the local node using this protocol, the networking
179	/// service will push an element on this channel. The receiving side of this channel then has
180	/// to pull this element, process the request, and send back the response to send back to the
181	/// peer.
182	///
183	/// The size of the channel has to be carefully chosen. If the channel is full, the networking
184	/// service will discard the incoming request send back an error to the peer. Consequently,
185	/// the channel being full is an indicator that the node is overloaded.
186	///
187	/// You can typically set the size of the channel to `T / d`, where `T` is the
188	/// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
189	/// build a response.
190	///
191	/// Can be `None` if the local node does not support answering incoming requests.
192	/// If this is `None`, then the local node will not advertise support for this protocol towards
193	/// other peers. If this is `Some` but the channel is closed, then the local node will
194	/// advertise support for this protocol, but any incoming request will lead to an error being
195	/// sent back.
196	pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
197}
198
199impl RequestResponseConfigT for ProtocolConfig {
200	fn protocol_name(&self) -> &ProtocolName {
201		&self.name
202	}
203}
204
205/// A single request received by a peer on a request-response protocol.
206#[derive(Debug)]
207pub struct IncomingRequest {
208	/// Who sent the request.
209	pub peer: sc_network_types::PeerId,
210
211	/// Request sent by the remote. Will always be smaller than
212	/// [`ProtocolConfig::max_request_size`].
213	pub payload: Vec<u8>,
214
215	/// Channel to send back the response.
216	///
217	/// There are two ways to indicate that handling the request failed:
218	///
219	/// 1. Drop `pending_response` and thus not changing the reputation of the peer.
220	///
221	/// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
222	/// the given peer.
223	pub pending_response: oneshot::Sender<OutgoingResponse>,
224}
225
226/// Response for an incoming request to be send by a request protocol handler.
227#[derive(Debug)]
228pub struct OutgoingResponse {
229	/// The payload of the response.
230	///
231	/// `Err(())` if none is available e.g. due an error while handling the request.
232	pub result: Result<Vec<u8>, ()>,
233
234	/// Reputation changes accrued while handling the request. To be applied to the reputation of
235	/// the peer sending the request.
236	pub reputation_changes: Vec<ReputationChange>,
237
238	/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
239	/// peer.
240	///
241	/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
242	/// >			outgoing data for each TCP socket, and it is not possible for a user
243	/// >			application to inspect this buffer. This channel here is not actually notified
244	/// >			when the response has been fully sent out, but rather when it has fully been
245	/// >			written to the buffer managed by the operating system.
246	pub sent_feedback: Option<oneshot::Sender<()>>,
247}
248
249/// Information stored about a pending request.
250struct PendingRequest {
251	/// The time when the request was sent to the libp2p request-response protocol.
252	started_at: Instant,
253	/// The channel to send the response back to the caller.
254	///
255	/// This is wrapped in an `Option` to allow for the channel to be taken out
256	/// on force-detected timeouts.
257	response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
258	/// Fallback request to send if the primary request fails.
259	fallback_request: Option<(Vec<u8>, ProtocolName)>,
260}
261
262/// When sending a request, what to do on a disconnected recipient.
263#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
264pub enum IfDisconnected {
265	/// Try to connect to the peer.
266	TryConnect,
267	/// Just fail if the destination is not yet connected.
268	ImmediateError,
269}
270
271/// Convenience functions for `IfDisconnected`.
272impl IfDisconnected {
273	/// Shall we connect to a disconnected peer?
274	pub fn should_connect(self) -> bool {
275		match self {
276			Self::TryConnect => true,
277			Self::ImmediateError => false,
278		}
279	}
280}
281
282/// Event generated by the [`RequestResponsesBehaviour`].
283#[derive(Debug)]
284pub enum Event {
285	/// A remote sent a request and either we have successfully answered it or an error happened.
286	///
287	/// This event is generated for statistics purposes.
288	InboundRequest {
289		/// Peer which has emitted the request.
290		peer: PeerId,
291		/// Name of the protocol in question.
292		protocol: ProtocolName,
293		/// Whether handling the request was successful or unsuccessful.
294		///
295		/// When successful contains the time elapsed between when we received the request and when
296		/// we sent back the response. When unsuccessful contains the failure reason.
297		result: Result<Duration, ResponseFailure>,
298	},
299
300	/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
301	/// failed.
302	///
303	/// This event is generated for statistics purposes.
304	RequestFinished {
305		/// Peer that we send a request to.
306		peer: PeerId,
307		/// Name of the protocol in question.
308		protocol: ProtocolName,
309		/// Duration the request took.
310		duration: Duration,
311		/// Result of the request.
312		result: Result<(), RequestFailure>,
313	},
314
315	/// A request protocol handler issued reputation changes for the given peer.
316	ReputationChanges {
317		/// Peer whose reputation needs to be adjust.
318		peer: PeerId,
319		/// Reputation changes.
320		changes: Vec<ReputationChange>,
321	},
322}
323
324/// Combination of a protocol name and a request id.
325///
326/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
327/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
328/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
329/// [`ProtocolRequestId`]s.
330#[derive(Debug, Clone, PartialEq, Eq, Hash)]
331struct ProtocolRequestId {
332	protocol: ProtocolName,
333	request_id: RequestId,
334}
335
336impl From<(ProtocolName, RequestId)> for ProtocolRequestId {
337	fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
338		Self { protocol, request_id }
339	}
340}
341
342/// Details of a request-response protocol.
343struct ProtocolDetails {
344	behaviour: Behaviour<GenericCodec>,
345	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
346	request_timeout: Duration,
347}
348
349/// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
350pub struct RequestResponsesBehaviour {
351	/// The multiple sub-protocols, by name.
352	///
353	/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
354	/// "response builder" used to build responses for incoming requests.
355	protocols: HashMap<ProtocolName, ProtocolDetails>,
356
357	/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
358	pending_requests: HashMap<ProtocolRequestId, PendingRequest>,
359
360	/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
361	/// start time and the response to send back to the remote.
362	pending_responses: stream::FuturesUnordered<
363		Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
364	>,
365
366	/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
367	pending_responses_arrival_time: HashMap<ProtocolRequestId, Instant>,
368
369	/// Whenever a response is received on `pending_responses`, insert a channel to be notified
370	/// when the request has been sent out.
371	send_feedback: HashMap<ProtocolRequestId, oneshot::Sender<()>>,
372
373	/// Primarily used to get a reputation of a node.
374	peer_store: Arc<dyn PeerStoreProvider>,
375
376	/// Interval to check that the requests are not taking too long.
377	///
378	/// We had issues in the past where libp2p did not produce a timeout event in due time.
379	///
380	/// For more details, see:
381	/// - <https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096>
382	periodic_request_check: tokio::time::Interval,
383}
384
385/// Generated by the response builder and waiting to be processed.
386struct RequestProcessingOutcome {
387	peer: PeerId,
388	request_id: RequestId,
389	protocol: ProtocolName,
390	inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
391	response: OutgoingResponse,
392}
393
394impl RequestResponsesBehaviour {
395	/// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
396	/// the same protocol is passed twice.
397	pub fn new(
398		list: impl Iterator<Item = ProtocolConfig>,
399		peer_store: Arc<dyn PeerStoreProvider>,
400	) -> Result<Self, RegisterError> {
401		let mut protocols = HashMap::new();
402		for protocol in list {
403			let mut cfg = Config::default();
404			cfg.set_request_timeout(protocol.request_timeout);
405
406			let protocol_support = if protocol.inbound_queue.is_some() {
407				ProtocolSupport::Full
408			} else {
409				ProtocolSupport::Outbound
410			};
411
412			let behaviour = Behaviour::with_codec(
413				GenericCodec {
414					max_request_size: protocol.max_request_size,
415					max_response_size: protocol.max_response_size,
416				},
417				iter::once(protocol.name.clone())
418					.chain(protocol.fallback_names)
419					.zip(iter::repeat(protocol_support)),
420				cfg,
421			);
422
423			match protocols.entry(protocol.name) {
424				Entry::Vacant(e) => e.insert(ProtocolDetails {
425					behaviour,
426					inbound_queue: protocol.inbound_queue,
427					request_timeout: protocol.request_timeout,
428				}),
429				Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())),
430			};
431		}
432
433		Ok(Self {
434			protocols,
435			pending_requests: Default::default(),
436			pending_responses: Default::default(),
437			pending_responses_arrival_time: Default::default(),
438			send_feedback: Default::default(),
439			peer_store,
440			periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
441		})
442	}
443
444	/// Initiates sending a request.
445	///
446	/// If there is no established connection to the target peer, the behavior is determined by the
447	/// choice of `connect`.
448	///
449	/// An error is returned if the protocol doesn't match one that has been registered.
450	pub fn send_request(
451		&mut self,
452		target: &PeerId,
453		protocol_name: ProtocolName,
454		request: Vec<u8>,
455		fallback_request: Option<(Vec<u8>, ProtocolName)>,
456		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
457		connect: IfDisconnected,
458	) {
459		log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len());
460
461		if let Some(ProtocolDetails { behaviour, .. }) =
462			self.protocols.get_mut(protocol_name.deref())
463		{
464			Self::send_request_inner(
465				behaviour,
466				&mut self.pending_requests,
467				target,
468				protocol_name,
469				request,
470				fallback_request,
471				pending_response,
472				connect,
473			)
474		} else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
475			log::debug!(
476				target: "sub-libp2p",
477				"Unknown protocol {:?}. At the same time local \
478				 node is no longer interested in the result.",
479				protocol_name,
480			);
481		}
482	}
483
484	fn send_request_inner(
485		behaviour: &mut Behaviour<GenericCodec>,
486		pending_requests: &mut HashMap<ProtocolRequestId, PendingRequest>,
487		target: &PeerId,
488		protocol_name: ProtocolName,
489		request: Vec<u8>,
490		fallback_request: Option<(Vec<u8>, ProtocolName)>,
491		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
492		connect: IfDisconnected,
493	) {
494		if behaviour.is_connected(target) || connect.should_connect() {
495			let request_id = behaviour.send_request(target, request);
496			let prev_req_id = pending_requests.insert(
497				(protocol_name.to_string().into(), request_id).into(),
498				PendingRequest {
499					started_at: Instant::now(),
500					response_tx: Some(pending_response),
501					fallback_request,
502				},
503			);
504			debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
505		} else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
506			log::debug!(
507				target: "sub-libp2p",
508				"Not connected to peer {:?}. At the same time local \
509				 node is no longer interested in the result.",
510				target,
511			);
512		}
513	}
514}
515
516impl NetworkBehaviour for RequestResponsesBehaviour {
517	type ConnectionHandler =
518		MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
519	type ToSwarm = Event;
520
521	fn handle_pending_inbound_connection(
522		&mut self,
523		_connection_id: ConnectionId,
524		_local_addr: &Multiaddr,
525		_remote_addr: &Multiaddr,
526	) -> Result<(), ConnectionDenied> {
527		Ok(())
528	}
529
530	fn handle_pending_outbound_connection(
531		&mut self,
532		_connection_id: ConnectionId,
533		_maybe_peer: Option<PeerId>,
534		_addresses: &[Multiaddr],
535		_effective_role: Endpoint,
536	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
537		Ok(Vec::new())
538	}
539
540	fn handle_established_inbound_connection(
541		&mut self,
542		connection_id: ConnectionId,
543		peer: PeerId,
544		local_addr: &Multiaddr,
545		remote_addr: &Multiaddr,
546	) -> Result<THandler<Self>, ConnectionDenied> {
547		let iter =
548			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
549				if let Ok(handler) = behaviour.handle_established_inbound_connection(
550					connection_id,
551					peer,
552					local_addr,
553					remote_addr,
554				) {
555					Some((p.to_string(), handler))
556				} else {
557					None
558				}
559			});
560
561		Ok(MultiHandler::try_from_iter(iter).expect(
562			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
563			 which is the only possible error; qed",
564		))
565	}
566
567	fn handle_established_outbound_connection(
568		&mut self,
569		connection_id: ConnectionId,
570		peer: PeerId,
571		addr: &Multiaddr,
572		role_override: Endpoint,
573	) -> Result<THandler<Self>, ConnectionDenied> {
574		let iter =
575			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
576				if let Ok(handler) = behaviour.handle_established_outbound_connection(
577					connection_id,
578					peer,
579					addr,
580					role_override,
581				) {
582					Some((p.to_string(), handler))
583				} else {
584					None
585				}
586			});
587
588		Ok(MultiHandler::try_from_iter(iter).expect(
589			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
590			 which is the only possible error; qed",
591		))
592	}
593
594	fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
595		match event {
596			FromSwarm::ConnectionEstablished(e) =>
597				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
598					NetworkBehaviour::on_swarm_event(
599						behaviour,
600						FromSwarm::ConnectionEstablished(e),
601					);
602				},
603			FromSwarm::ConnectionClosed(ConnectionClosed {
604				peer_id,
605				connection_id,
606				endpoint,
607				handler,
608				remaining_established,
609			}) =>
610				for (p_name, p_handler) in handler.into_iter() {
611					if let Some(ProtocolDetails { behaviour, .. }) =
612						self.protocols.get_mut(p_name.as_str())
613					{
614						behaviour.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
615							peer_id,
616							connection_id,
617							endpoint,
618							handler: p_handler,
619							remaining_established,
620						}));
621					} else {
622						log::error!(
623						  target: "sub-libp2p",
624						  "on_swarm_event/connection_closed: no request-response instance registered for protocol {:?}",
625						  p_name,
626						)
627					}
628				},
629			FromSwarm::DialFailure(e) =>
630				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
631					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::DialFailure(e));
632				},
633			FromSwarm::ListenerClosed(e) =>
634				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
635					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenerClosed(e));
636				},
637			FromSwarm::ListenFailure(e) =>
638				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
639					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenFailure(e));
640				},
641			FromSwarm::ListenerError(e) =>
642				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
643					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenerError(e));
644				},
645			FromSwarm::ExternalAddrExpired(e) =>
646				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
647					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ExternalAddrExpired(e));
648				},
649			FromSwarm::NewListener(e) =>
650				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
651					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::NewListener(e));
652				},
653			FromSwarm::ExpiredListenAddr(e) =>
654				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
655					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ExpiredListenAddr(e));
656				},
657			FromSwarm::NewExternalAddrCandidate(e) =>
658				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
659					NetworkBehaviour::on_swarm_event(
660						behaviour,
661						FromSwarm::NewExternalAddrCandidate(e),
662					);
663				},
664			FromSwarm::ExternalAddrConfirmed(e) =>
665				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
666					NetworkBehaviour::on_swarm_event(
667						behaviour,
668						FromSwarm::ExternalAddrConfirmed(e),
669					);
670				},
671			FromSwarm::AddressChange(e) =>
672				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
673					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::AddressChange(e));
674				},
675			FromSwarm::NewListenAddr(e) =>
676				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
677					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::NewListenAddr(e));
678				},
679		}
680	}
681
682	fn on_connection_handler_event(
683		&mut self,
684		peer_id: PeerId,
685		connection_id: ConnectionId,
686		event: THandlerOutEvent<Self>,
687	) {
688		let p_name = event.0;
689		if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
690			return behaviour.on_connection_handler_event(peer_id, connection_id, event.1)
691		} else {
692			log::warn!(
693				target: "sub-libp2p",
694				"on_connection_handler_event: no request-response instance registered for protocol {:?}",
695				p_name
696			);
697		}
698	}
699
700	fn poll(
701		&mut self,
702		cx: &mut Context,
703		params: &mut impl PollParameters,
704	) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
705		'poll_all: loop {
706			// Poll the periodic request check.
707			if self.periodic_request_check.poll_tick(cx).is_ready() {
708				self.pending_requests.retain(|id, req| {
709					let Some(ProtocolDetails { request_timeout, .. }) =
710						self.protocols.get(&id.protocol)
711					else {
712						log::warn!(
713							target: "sub-libp2p",
714							"Request {id:?} has no protocol registered.",
715						);
716
717						if let Some(response_tx) = req.response_tx.take() {
718							if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
719								log::debug!(
720									target: "sub-libp2p",
721									"Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
722								);
723							}
724						}
725						return false
726					};
727
728					let elapsed = req.started_at.elapsed();
729					if elapsed > *request_timeout {
730						log::debug!(
731							target: "sub-libp2p",
732							"Request {id:?} force detected as timeout.",
733						);
734
735						if let Some(response_tx) = req.response_tx.take() {
736							if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
737								log::debug!(
738									target: "sub-libp2p",
739									"Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
740								);
741							}
742						}
743
744						false
745					} else {
746						true
747					}
748				});
749			}
750
751			// Poll to see if any response is ready to be sent back.
752			while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
753				let RequestProcessingOutcome {
754					peer,
755					request_id,
756					protocol: protocol_name,
757					inner_channel,
758					response: OutgoingResponse { result, reputation_changes, sent_feedback },
759				} = match outcome {
760					Some(outcome) => outcome,
761					// The response builder was too busy or handling the request failed. This is
762					// later on reported as a `InboundFailure::Omission`.
763					None => continue,
764				};
765
766				if let Ok(payload) = result {
767					if let Some(ProtocolDetails { behaviour, .. }) =
768						self.protocols.get_mut(&*protocol_name)
769					{
770						log::trace!(target: "sub-libp2p", "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
771
772						if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
773							// Note: Failure is handled further below when receiving
774							// `InboundFailure` event from request-response [`Behaviour`].
775							log::debug!(
776								target: "sub-libp2p",
777								"Failed to send response for {:?} on protocol {:?} due to a \
778								 timeout or due to the connection to the peer being closed. \
779								 Dropping response",
780								request_id, protocol_name,
781							);
782						} else if let Some(sent_feedback) = sent_feedback {
783							self.send_feedback
784								.insert((protocol_name, request_id).into(), sent_feedback);
785						}
786					}
787				}
788
789				if !reputation_changes.is_empty() {
790					return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
791						peer,
792						changes: reputation_changes,
793					}))
794				}
795			}
796
797			let mut fallback_requests = vec![];
798
799			// Poll request-responses protocols.
800			for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
801			{
802				'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) {
803					let ev = match ev {
804						// Main events we are interested in.
805						ToSwarm::GenerateEvent(ev) => ev,
806
807						// Other events generated by the underlying behaviour are transparently
808						// passed through.
809						ToSwarm::Dial { opts } => {
810							if opts.get_peer_id().is_none() {
811								log::error!(
812									"The request-response isn't supposed to start dialing addresses"
813								);
814							}
815							return Poll::Ready(ToSwarm::Dial { opts })
816						},
817						ToSwarm::NotifyHandler { peer_id, handler, event } =>
818							return Poll::Ready(ToSwarm::NotifyHandler {
819								peer_id,
820								handler,
821								event: ((*protocol).to_string(), event),
822							}),
823						ToSwarm::CloseConnection { peer_id, connection } =>
824							return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
825						ToSwarm::NewExternalAddrCandidate(observed) =>
826							return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
827						ToSwarm::ExternalAddrConfirmed(addr) =>
828							return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
829						ToSwarm::ExternalAddrExpired(addr) =>
830							return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
831						ToSwarm::ListenOn { opts } =>
832							return Poll::Ready(ToSwarm::ListenOn { opts }),
833						ToSwarm::RemoveListener { id } =>
834							return Poll::Ready(ToSwarm::RemoveListener { id }),
835					};
836
837					match ev {
838						// Received a request from a remote.
839						request_response::Event::Message {
840							peer,
841							message: Message::Request { request_id, request, channel, .. },
842						} => {
843							self.pending_responses_arrival_time
844								.insert((protocol.clone(), request_id).into(), Instant::now());
845
846							let reputation = self.peer_store.peer_reputation(&peer.into());
847
848							if reputation < BANNED_THRESHOLD {
849								log::debug!(
850									target: "sub-libp2p",
851									"Cannot handle requests from a node with a low reputation {}: {}",
852									peer,
853									reputation,
854								);
855								continue 'poll_protocol
856							}
857
858							let (tx, rx) = oneshot::channel();
859
860							// Submit the request to the "response builder" passed by the user at
861							// initialization.
862							if let Some(resp_builder) = inbound_queue {
863								// If the response builder is too busy, silently drop `tx`. This
864								// will be reported by the corresponding request-response
865								// [`Behaviour`] through an `InboundFailure::Omission` event.
866								// Note that we use `async_channel::bounded` and not `mpsc::channel`
867								// because the latter allocates an extra slot for every cloned
868								// sender.
869								let _ = resp_builder.try_send(IncomingRequest {
870									peer: peer.into(),
871									payload: request,
872									pending_response: tx,
873								});
874							} else {
875								debug_assert!(false, "Received message on outbound-only protocol.");
876							}
877
878							let protocol = protocol.clone();
879
880							self.pending_responses.push(Box::pin(async move {
881								// The `tx` created above can be dropped if we are not capable of
882								// processing this request, which is reflected as a
883								// `InboundFailure::Omission` event.
884								rx.await.map_or(None, |response| {
885									Some(RequestProcessingOutcome {
886										peer,
887										request_id,
888										protocol,
889										inner_channel: channel,
890										response,
891									})
892								})
893							}));
894
895							// This `continue` makes sure that `pending_responses` gets polled
896							// after we have added the new element.
897							continue 'poll_all
898						},
899
900						// Received a response from a remote to one of our requests.
901						request_response::Event::Message {
902							peer,
903							message: Message::Response { request_id, response },
904							..
905						} => {
906							let (started, delivered) = match self
907								.pending_requests
908								.remove(&(protocol.clone(), request_id).into())
909							{
910								Some(PendingRequest {
911									started_at,
912									response_tx: Some(response_tx),
913									..
914								}) => {
915									log::trace!(
916										target: "sub-libp2p",
917										"received response from {peer} ({protocol:?}), {} bytes",
918										response.as_ref().map_or(0usize, |response| response.len()),
919									);
920
921									let delivered = response_tx
922										.send(
923											response
924												.map_err(|()| RequestFailure::Refused)
925												.map(|resp| (resp, protocol.clone())),
926										)
927										.map_err(|_| RequestFailure::Obsolete);
928									(started_at, delivered)
929								},
930								_ => {
931									log::debug!(
932										target: "sub-libp2p",
933										"Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
934										request_id,
935										peer,
936									);
937									continue
938								},
939							};
940
941							let out = Event::RequestFinished {
942								peer,
943								protocol: protocol.clone(),
944								duration: started.elapsed(),
945								result: delivered,
946							};
947
948							return Poll::Ready(ToSwarm::GenerateEvent(out))
949						},
950
951						// One of our requests has failed.
952						request_response::Event::OutboundFailure {
953							peer,
954							request_id,
955							error,
956							..
957						} => {
958							let started = match self
959								.pending_requests
960								.remove(&(protocol.clone(), request_id).into())
961							{
962								Some(PendingRequest {
963									started_at,
964									response_tx: Some(response_tx),
965									fallback_request,
966								}) => {
967									// Try using the fallback request if the protocol was not
968									// supported.
969									if let request_response::OutboundFailure::UnsupportedProtocols =
970										error
971									{
972										if let Some((fallback_request, fallback_protocol)) =
973											fallback_request
974										{
975											log::trace!(
976												target: "sub-libp2p",
977												"Request with id {:?} failed. Trying the fallback protocol. {}",
978												request_id,
979												fallback_protocol.deref()
980											);
981											fallback_requests.push((
982												peer,
983												fallback_protocol,
984												fallback_request,
985												response_tx,
986											));
987											continue
988										}
989									}
990
991									if response_tx
992										.send(Err(RequestFailure::Network(error.clone().into())))
993										.is_err()
994									{
995										log::debug!(
996											target: "sub-libp2p",
997											"Request with id {:?} failed. At the same time local \
998											 node is no longer interested in the result.",
999											request_id,
1000										);
1001									}
1002									started_at
1003								},
1004								_ => {
1005									log::debug!(
1006										target: "sub-libp2p",
1007										"Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
1008										request_id,
1009										error,
1010										peer
1011									);
1012									continue
1013								},
1014							};
1015
1016							let out = Event::RequestFinished {
1017								peer,
1018								protocol: protocol.clone(),
1019								duration: started.elapsed(),
1020								result: Err(RequestFailure::Network(error.into())),
1021							};
1022
1023							return Poll::Ready(ToSwarm::GenerateEvent(out))
1024						},
1025
1026						// An inbound request failed, either while reading the request or due to
1027						// failing to send a response.
1028						request_response::Event::InboundFailure {
1029							request_id, peer, error, ..
1030						} => {
1031							self.pending_responses_arrival_time
1032								.remove(&(protocol.clone(), request_id).into());
1033							self.send_feedback.remove(&(protocol.clone(), request_id).into());
1034							let out = Event::InboundRequest {
1035								peer,
1036								protocol: protocol.clone(),
1037								result: Err(ResponseFailure::Network(error.into())),
1038							};
1039							return Poll::Ready(ToSwarm::GenerateEvent(out))
1040						},
1041
1042						// A response to an inbound request has been sent.
1043						request_response::Event::ResponseSent { request_id, peer } => {
1044							let arrival_time = self
1045								.pending_responses_arrival_time
1046								.remove(&(protocol.clone(), request_id).into())
1047								.map(|t| t.elapsed())
1048								.expect(
1049									"Time is added for each inbound request on arrival and only \
1050									 removed on success (`ResponseSent`) or failure \
1051									 (`InboundFailure`). One can not receive a success event for a \
1052									 request that either never arrived, or that has previously \
1053									 failed; qed.",
1054								);
1055
1056							if let Some(send_feedback) =
1057								self.send_feedback.remove(&(protocol.clone(), request_id).into())
1058							{
1059								let _ = send_feedback.send(());
1060							}
1061
1062							let out = Event::InboundRequest {
1063								peer,
1064								protocol: protocol.clone(),
1065								result: Ok(arrival_time),
1066							};
1067
1068							return Poll::Ready(ToSwarm::GenerateEvent(out))
1069						},
1070					};
1071				}
1072			}
1073
1074			// Send out fallback requests.
1075			for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
1076				if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
1077					Self::send_request_inner(
1078						behaviour,
1079						&mut self.pending_requests,
1080						&peer,
1081						protocol,
1082						request,
1083						None,
1084						pending_response,
1085						// We can error if not connected because the
1086						// previous attempt would have tried to establish a
1087						// connection already or errored and we wouldn't have gotten here.
1088						IfDisconnected::ImmediateError,
1089					);
1090				}
1091			}
1092
1093			break Poll::Pending
1094		}
1095	}
1096}
1097
1098/// Error when registering a protocol.
1099#[derive(Debug, thiserror::Error)]
1100pub enum RegisterError {
1101	/// A protocol has been specified multiple times.
1102	#[error("{0}")]
1103	DuplicateProtocol(ProtocolName),
1104}
1105
1106/// Error when processing a request sent by a remote.
1107#[derive(Debug, thiserror::Error)]
1108pub enum ResponseFailure {
1109	/// Problem on the network.
1110	#[error("Problem on the network: {0}")]
1111	Network(InboundFailure),
1112}
1113
1114/// Implements the libp2p [`Codec`] trait. Defines how streams of bytes are turned
1115/// into requests and responses and vice-versa.
1116#[derive(Debug, Clone)]
1117#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
1118pub struct GenericCodec {
1119	max_request_size: u64,
1120	max_response_size: u64,
1121}
1122
1123#[async_trait::async_trait]
1124impl Codec for GenericCodec {
1125	type Protocol = ProtocolName;
1126	type Request = Vec<u8>;
1127	type Response = Result<Vec<u8>, ()>;
1128
1129	async fn read_request<T>(
1130		&mut self,
1131		_: &Self::Protocol,
1132		mut io: &mut T,
1133	) -> io::Result<Self::Request>
1134	where
1135		T: AsyncRead + Unpin + Send,
1136	{
1137		// Read the length.
1138		let length = unsigned_varint::aio::read_usize(&mut io)
1139			.await
1140			.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1141		if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1142			return Err(io::Error::new(
1143				io::ErrorKind::InvalidInput,
1144				format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1145			))
1146		}
1147
1148		// Read the payload.
1149		let mut buffer = vec![0; length];
1150		io.read_exact(&mut buffer).await?;
1151		Ok(buffer)
1152	}
1153
1154	async fn read_response<T>(
1155		&mut self,
1156		_: &Self::Protocol,
1157		mut io: &mut T,
1158	) -> io::Result<Self::Response>
1159	where
1160		T: AsyncRead + Unpin + Send,
1161	{
1162		// Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
1163		// considered as a protocol error and will result in the entire connection being closed.
1164		// Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
1165		// that this response is an error.
1166
1167		// Read the length.
1168		let length = match unsigned_varint::aio::read_usize(&mut io).await {
1169			Ok(l) => l,
1170			Err(unsigned_varint::io::ReadError::Io(err))
1171				if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1172				return Ok(Err(())),
1173			Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1174		};
1175
1176		if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1177			return Err(io::Error::new(
1178				io::ErrorKind::InvalidInput,
1179				format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1180			))
1181		}
1182
1183		// Read the payload.
1184		let mut buffer = vec![0; length];
1185		io.read_exact(&mut buffer).await?;
1186		Ok(Ok(buffer))
1187	}
1188
1189	async fn write_request<T>(
1190		&mut self,
1191		_: &Self::Protocol,
1192		io: &mut T,
1193		req: Self::Request,
1194	) -> io::Result<()>
1195	where
1196		T: AsyncWrite + Unpin + Send,
1197	{
1198		// TODO: check the length?
1199		// Write the length.
1200		{
1201			let mut buffer = unsigned_varint::encode::usize_buffer();
1202			io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1203		}
1204
1205		// Write the payload.
1206		io.write_all(&req).await?;
1207
1208		io.close().await?;
1209		Ok(())
1210	}
1211
1212	async fn write_response<T>(
1213		&mut self,
1214		_: &Self::Protocol,
1215		io: &mut T,
1216		res: Self::Response,
1217	) -> io::Result<()>
1218	where
1219		T: AsyncWrite + Unpin + Send,
1220	{
1221		// If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1222		if let Ok(res) = res {
1223			// TODO: check the length?
1224			// Write the length.
1225			{
1226				let mut buffer = unsigned_varint::encode::usize_buffer();
1227				io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1228			}
1229
1230			// Write the payload.
1231			io.write_all(&res).await?;
1232		}
1233
1234		io.close().await?;
1235		Ok(())
1236	}
1237}
1238
1239#[cfg(test)]
1240mod tests {
1241	use super::*;
1242
1243	use crate::mock::MockPeerStore;
1244	use assert_matches::assert_matches;
1245	use futures::channel::oneshot;
1246	use libp2p::{
1247		core::{
1248			transport::{MemoryTransport, Transport},
1249			upgrade,
1250		},
1251		identity::Keypair,
1252		noise,
1253		swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1254		Multiaddr,
1255	};
1256	use std::{iter, time::Duration};
1257
1258	struct TokioExecutor;
1259	impl Executor for TokioExecutor {
1260		fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1261			tokio::spawn(f);
1262		}
1263	}
1264
1265	fn build_swarm(
1266		list: impl Iterator<Item = ProtocolConfig>,
1267	) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1268		let keypair = Keypair::generate_ed25519();
1269
1270		let transport = MemoryTransport::new()
1271			.upgrade(upgrade::Version::V1)
1272			.authenticate(noise::Config::new(&keypair).unwrap())
1273			.multiplex(libp2p::yamux::Config::default())
1274			.boxed();
1275
1276		let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1277
1278		let mut swarm = Swarm::new(
1279			transport,
1280			behaviour,
1281			keypair.public().to_peer_id(),
1282			SwarmConfig::with_executor(TokioExecutor {})
1283				// This is taken care of by notification protocols in non-test environment
1284				// It is very slow in test environment for some reason, hence larger timeout
1285				.with_idle_connection_timeout(Duration::from_secs(10)),
1286		);
1287
1288		let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1289
1290		swarm.listen_on(listen_addr.clone()).unwrap();
1291
1292		(swarm, listen_addr)
1293	}
1294
1295	#[tokio::test]
1296	async fn basic_request_response_works() {
1297		let protocol_name = ProtocolName::from("/test/req-resp/1");
1298
1299		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1300		let mut swarms = (0..2)
1301			.map(|_| {
1302				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1303
1304				tokio::spawn(async move {
1305					while let Some(rq) = rx.next().await {
1306						let (fb_tx, fb_rx) = oneshot::channel();
1307						assert_eq!(rq.payload, b"this is a request");
1308						let _ = rq.pending_response.send(super::OutgoingResponse {
1309							result: Ok(b"this is a response".to_vec()),
1310							reputation_changes: Vec::new(),
1311							sent_feedback: Some(fb_tx),
1312						});
1313						fb_rx.await.unwrap();
1314					}
1315				});
1316
1317				let protocol_config = ProtocolConfig {
1318					name: protocol_name.clone(),
1319					fallback_names: Vec::new(),
1320					max_request_size: 1024,
1321					max_response_size: 1024 * 1024,
1322					request_timeout: Duration::from_secs(30),
1323					inbound_queue: Some(tx),
1324				};
1325
1326				build_swarm(iter::once(protocol_config))
1327			})
1328			.collect::<Vec<_>>();
1329
1330		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1331		// this test, so they wouldn't connect to each other.
1332		{
1333			let dial_addr = swarms[1].1.clone();
1334			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1335		}
1336
1337		let (mut swarm, _) = swarms.remove(0);
1338		// Running `swarm[0]` in the background.
1339		tokio::spawn(async move {
1340			loop {
1341				match swarm.select_next_some().await {
1342					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1343						result.unwrap();
1344					},
1345					_ => {},
1346				}
1347			}
1348		});
1349
1350		// Remove and run the remaining swarm.
1351		let (mut swarm, _) = swarms.remove(0);
1352		let mut response_receiver = None;
1353
1354		loop {
1355			match swarm.select_next_some().await {
1356				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1357					let (sender, receiver) = oneshot::channel();
1358					swarm.behaviour_mut().send_request(
1359						&peer_id,
1360						protocol_name.clone(),
1361						b"this is a request".to_vec(),
1362						None,
1363						sender,
1364						IfDisconnected::ImmediateError,
1365					);
1366					assert!(response_receiver.is_none());
1367					response_receiver = Some(receiver);
1368				},
1369				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1370					result.unwrap();
1371					break
1372				},
1373				_ => {},
1374			}
1375		}
1376
1377		assert_eq!(
1378			response_receiver.unwrap().await.unwrap().unwrap(),
1379			(b"this is a response".to_vec(), protocol_name)
1380		);
1381	}
1382
1383	#[tokio::test]
1384	async fn max_response_size_exceeded() {
1385		let protocol_name = ProtocolName::from("/test/req-resp/1");
1386
1387		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1388		let mut swarms = (0..2)
1389			.map(|_| {
1390				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1391
1392				tokio::spawn(async move {
1393					while let Some(rq) = rx.next().await {
1394						assert_eq!(rq.payload, b"this is a request");
1395						let _ = rq.pending_response.send(super::OutgoingResponse {
1396							result: Ok(b"this response exceeds the limit".to_vec()),
1397							reputation_changes: Vec::new(),
1398							sent_feedback: None,
1399						});
1400					}
1401				});
1402
1403				let protocol_config = ProtocolConfig {
1404					name: protocol_name.clone(),
1405					fallback_names: Vec::new(),
1406					max_request_size: 1024,
1407					max_response_size: 8, // <-- important for the test
1408					request_timeout: Duration::from_secs(30),
1409					inbound_queue: Some(tx),
1410				};
1411
1412				build_swarm(iter::once(protocol_config))
1413			})
1414			.collect::<Vec<_>>();
1415
1416		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1417		// this test, so they wouldn't connect to each other.
1418		{
1419			let dial_addr = swarms[1].1.clone();
1420			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1421		}
1422
1423		// Running `swarm[0]` in the background until a `InboundRequest` event happens,
1424		// which is a hint about the test having ended.
1425		let (mut swarm, _) = swarms.remove(0);
1426		tokio::spawn(async move {
1427			loop {
1428				match swarm.select_next_some().await {
1429					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1430						assert!(result.is_ok());
1431						break;
1432					},
1433					_ => {},
1434				}
1435			}
1436		});
1437
1438		// Remove and run the remaining swarm.
1439		let (mut swarm, _) = swarms.remove(0);
1440
1441		let mut response_receiver = None;
1442
1443		loop {
1444			match swarm.select_next_some().await {
1445				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1446					let (sender, receiver) = oneshot::channel();
1447					swarm.behaviour_mut().send_request(
1448						&peer_id,
1449						protocol_name.clone(),
1450						b"this is a request".to_vec(),
1451						None,
1452						sender,
1453						IfDisconnected::ImmediateError,
1454					);
1455					assert!(response_receiver.is_none());
1456					response_receiver = Some(receiver);
1457				},
1458				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1459					assert!(result.is_err());
1460					break
1461				},
1462				_ => {},
1463			}
1464		}
1465
1466		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1467			RequestFailure::Network(OutboundFailure::ConnectionClosed) => {},
1468			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1469		}
1470	}
1471
1472	/// A [`RequestId`] is a unique identifier among either all inbound or all outbound requests for
1473	/// a single [`RequestResponsesBehaviour`] behaviour. It is not guaranteed to be unique across
1474	/// multiple [`RequestResponsesBehaviour`] behaviours. Thus when handling [`RequestId`] in the
1475	/// context of multiple [`RequestResponsesBehaviour`] behaviours, one needs to couple the
1476	/// protocol name with the [`RequestId`] to get a unique request identifier.
1477	///
1478	/// This test ensures that two requests on different protocols can be handled concurrently
1479	/// without a [`RequestId`] collision.
1480	///
1481	/// See [`ProtocolRequestId`] for additional information.
1482	#[tokio::test]
1483	async fn request_id_collision() {
1484		let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1485		let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1486
1487		let mut swarm_1 = {
1488			let protocol_configs = vec![
1489				ProtocolConfig {
1490					name: protocol_name_1.clone(),
1491					fallback_names: Vec::new(),
1492					max_request_size: 1024,
1493					max_response_size: 1024 * 1024,
1494					request_timeout: Duration::from_secs(30),
1495					inbound_queue: None,
1496				},
1497				ProtocolConfig {
1498					name: protocol_name_2.clone(),
1499					fallback_names: Vec::new(),
1500					max_request_size: 1024,
1501					max_response_size: 1024 * 1024,
1502					request_timeout: Duration::from_secs(30),
1503					inbound_queue: None,
1504				},
1505			];
1506
1507			build_swarm(protocol_configs.into_iter()).0
1508		};
1509
1510		let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1511			let (tx_1, rx_1) = async_channel::bounded(64);
1512			let (tx_2, rx_2) = async_channel::bounded(64);
1513
1514			let protocol_configs = vec![
1515				ProtocolConfig {
1516					name: protocol_name_1.clone(),
1517					fallback_names: Vec::new(),
1518					max_request_size: 1024,
1519					max_response_size: 1024 * 1024,
1520					request_timeout: Duration::from_secs(30),
1521					inbound_queue: Some(tx_1),
1522				},
1523				ProtocolConfig {
1524					name: protocol_name_2.clone(),
1525					fallback_names: Vec::new(),
1526					max_request_size: 1024,
1527					max_response_size: 1024 * 1024,
1528					request_timeout: Duration::from_secs(30),
1529					inbound_queue: Some(tx_2),
1530				},
1531			];
1532
1533			let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1534
1535			(swarm, rx_1, rx_2, listen_addr)
1536		};
1537
1538		// Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test,
1539		// so they wouldn't connect to each other.
1540		swarm_1.dial(listen_add_2).unwrap();
1541
1542		// Run swarm 2 in the background, receiving two requests.
1543		tokio::spawn(async move {
1544			loop {
1545				match swarm_2.select_next_some().await {
1546					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1547						result.unwrap();
1548					},
1549					_ => {},
1550				}
1551			}
1552		});
1553
1554		// Handle both requests sent by swarm 1 to swarm 2 in the background.
1555		//
1556		// Make sure both requests overlap, by answering the first only after receiving the
1557		// second.
1558		tokio::spawn(async move {
1559			let protocol_1_request = swarm_2_handler_1.next().await;
1560			let protocol_2_request = swarm_2_handler_2.next().await;
1561
1562			protocol_1_request
1563				.unwrap()
1564				.pending_response
1565				.send(OutgoingResponse {
1566					result: Ok(b"this is a response".to_vec()),
1567					reputation_changes: Vec::new(),
1568					sent_feedback: None,
1569				})
1570				.unwrap();
1571			protocol_2_request
1572				.unwrap()
1573				.pending_response
1574				.send(OutgoingResponse {
1575					result: Ok(b"this is a response".to_vec()),
1576					reputation_changes: Vec::new(),
1577					sent_feedback: None,
1578				})
1579				.unwrap();
1580		});
1581
1582		// Have swarm 1 send two requests to swarm 2 and await responses.
1583
1584		let mut response_receivers = None;
1585		let mut num_responses = 0;
1586
1587		loop {
1588			match swarm_1.select_next_some().await {
1589				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1590					let (sender_1, receiver_1) = oneshot::channel();
1591					let (sender_2, receiver_2) = oneshot::channel();
1592					swarm_1.behaviour_mut().send_request(
1593						&peer_id,
1594						protocol_name_1.clone(),
1595						b"this is a request".to_vec(),
1596						None,
1597						sender_1,
1598						IfDisconnected::ImmediateError,
1599					);
1600					swarm_1.behaviour_mut().send_request(
1601						&peer_id,
1602						protocol_name_2.clone(),
1603						b"this is a request".to_vec(),
1604						None,
1605						sender_2,
1606						IfDisconnected::ImmediateError,
1607					);
1608					assert!(response_receivers.is_none());
1609					response_receivers = Some((receiver_1, receiver_2));
1610				},
1611				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1612					num_responses += 1;
1613					result.unwrap();
1614					if num_responses == 2 {
1615						break
1616					}
1617				},
1618				_ => {},
1619			}
1620		}
1621		let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1622		assert_eq!(
1623			response_receiver_1.await.unwrap().unwrap(),
1624			(b"this is a response".to_vec(), protocol_name_1)
1625		);
1626		assert_eq!(
1627			response_receiver_2.await.unwrap().unwrap(),
1628			(b"this is a response".to_vec(), protocol_name_2)
1629		);
1630	}
1631
1632	#[tokio::test]
1633	async fn request_fallback() {
1634		let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1635		let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1636		let protocol_name_2 = ProtocolName::from("/test/another");
1637
1638		let protocol_config_1 = ProtocolConfig {
1639			name: protocol_name_1.clone(),
1640			fallback_names: Vec::new(),
1641			max_request_size: 1024,
1642			max_response_size: 1024 * 1024,
1643			request_timeout: Duration::from_secs(30),
1644			inbound_queue: None,
1645		};
1646		let protocol_config_1_fallback = ProtocolConfig {
1647			name: protocol_name_1_fallback.clone(),
1648			fallback_names: Vec::new(),
1649			max_request_size: 1024,
1650			max_response_size: 1024 * 1024,
1651			request_timeout: Duration::from_secs(30),
1652			inbound_queue: None,
1653		};
1654		let protocol_config_2 = ProtocolConfig {
1655			name: protocol_name_2.clone(),
1656			fallback_names: Vec::new(),
1657			max_request_size: 1024,
1658			max_response_size: 1024 * 1024,
1659			request_timeout: Duration::from_secs(30),
1660			inbound_queue: None,
1661		};
1662
1663		// This swarm only speaks protocol_name_1_fallback and protocol_name_2.
1664		// It only responds to requests.
1665		let mut older_swarm = {
1666			let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1667			let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1668			let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1669			protocol_config_1_fallback.inbound_queue = Some(tx_1);
1670
1671			let mut protocol_config_2 = protocol_config_2.clone();
1672			protocol_config_2.inbound_queue = Some(tx_2);
1673
1674			tokio::spawn(async move {
1675				for _ in 0..2 {
1676					if let Some(rq) = rx_1.next().await {
1677						let (fb_tx, fb_rx) = oneshot::channel();
1678						assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1679						let _ = rq.pending_response.send(super::OutgoingResponse {
1680							result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1681							reputation_changes: Vec::new(),
1682							sent_feedback: Some(fb_tx),
1683						});
1684						fb_rx.await.unwrap();
1685					}
1686				}
1687
1688				if let Some(rq) = rx_2.next().await {
1689					let (fb_tx, fb_rx) = oneshot::channel();
1690					assert_eq!(rq.payload, b"request on protocol /test/other");
1691					let _ = rq.pending_response.send(super::OutgoingResponse {
1692						result: Ok(b"this is a response on protocol /test/other".to_vec()),
1693						reputation_changes: Vec::new(),
1694						sent_feedback: Some(fb_tx),
1695					});
1696					fb_rx.await.unwrap();
1697				}
1698			});
1699
1700			build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1701		};
1702
1703		// This swarm speaks all protocols.
1704		let mut new_swarm = build_swarm(
1705			vec![
1706				protocol_config_1.clone(),
1707				protocol_config_1_fallback.clone(),
1708				protocol_config_2.clone(),
1709			]
1710			.into_iter(),
1711		);
1712
1713		{
1714			let dial_addr = older_swarm.1.clone();
1715			Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1716		}
1717
1718		// Running `older_swarm`` in the background.
1719		tokio::spawn(async move {
1720			loop {
1721				_ = older_swarm.0.select_next_some().await;
1722			}
1723		});
1724
1725		// Run the newer swarm. Attempt to make requests on all protocols.
1726		let (mut swarm, _) = new_swarm;
1727		let mut older_peer_id = None;
1728
1729		let mut response_receiver = None;
1730		// Try the new protocol with a fallback.
1731		loop {
1732			match swarm.select_next_some().await {
1733				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1734					older_peer_id = Some(peer_id);
1735					let (sender, receiver) = oneshot::channel();
1736					swarm.behaviour_mut().send_request(
1737						&peer_id,
1738						protocol_name_1.clone(),
1739						b"request on protocol /test/req-resp/2".to_vec(),
1740						Some((
1741							b"request on protocol /test/req-resp/1".to_vec(),
1742							protocol_config_1_fallback.name.clone(),
1743						)),
1744						sender,
1745						IfDisconnected::ImmediateError,
1746					);
1747					response_receiver = Some(receiver);
1748				},
1749				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1750					result.unwrap();
1751					break
1752				},
1753				_ => {},
1754			}
1755		}
1756		assert_eq!(
1757			response_receiver.unwrap().await.unwrap().unwrap(),
1758			(
1759				b"this is a response on protocol /test/req-resp/1".to_vec(),
1760				protocol_name_1_fallback.clone()
1761			)
1762		);
1763		// Try the old protocol with a useless fallback.
1764		let (sender, response_receiver) = oneshot::channel();
1765		swarm.behaviour_mut().send_request(
1766			older_peer_id.as_ref().unwrap(),
1767			protocol_name_1_fallback.clone(),
1768			b"request on protocol /test/req-resp/1".to_vec(),
1769			Some((
1770				b"dummy request, will fail if processed".to_vec(),
1771				protocol_config_1_fallback.name.clone(),
1772			)),
1773			sender,
1774			IfDisconnected::ImmediateError,
1775		);
1776		loop {
1777			match swarm.select_next_some().await {
1778				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1779					result.unwrap();
1780					break
1781				},
1782				_ => {},
1783			}
1784		}
1785		assert_eq!(
1786			response_receiver.await.unwrap().unwrap(),
1787			(
1788				b"this is a response on protocol /test/req-resp/1".to_vec(),
1789				protocol_name_1_fallback.clone()
1790			)
1791		);
1792		// Try the new protocol with no fallback. Should fail.
1793		let (sender, response_receiver) = oneshot::channel();
1794		swarm.behaviour_mut().send_request(
1795			older_peer_id.as_ref().unwrap(),
1796			protocol_name_1.clone(),
1797			b"request on protocol /test/req-resp-2".to_vec(),
1798			None,
1799			sender,
1800			IfDisconnected::ImmediateError,
1801		);
1802		loop {
1803			match swarm.select_next_some().await {
1804				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1805					assert_matches!(
1806						result.unwrap_err(),
1807						RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1808					);
1809					break
1810				},
1811				_ => {},
1812			}
1813		}
1814		assert!(response_receiver.await.unwrap().is_err());
1815		// Try the other protocol with no fallback.
1816		let (sender, response_receiver) = oneshot::channel();
1817		swarm.behaviour_mut().send_request(
1818			older_peer_id.as_ref().unwrap(),
1819			protocol_name_2.clone(),
1820			b"request on protocol /test/other".to_vec(),
1821			None,
1822			sender,
1823			IfDisconnected::ImmediateError,
1824		);
1825		loop {
1826			match swarm.select_next_some().await {
1827				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1828					result.unwrap();
1829					break
1830				},
1831				_ => {},
1832			}
1833		}
1834		assert_eq!(
1835			response_receiver.await.unwrap().unwrap(),
1836			(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1837		);
1838	}
1839
1840	/// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error
1841	/// even if the libp2p component hangs.
1842	///
1843	/// For testing purposes, the communication happens on the `/test/req-resp/1` protocol.
1844	///
1845	/// This is achieved by:
1846	/// - Two swarms are connected, the first one is slow to respond and has the timeout set to 10
1847	///   seconds. The second swarm is configured with a timeout of 10 seconds in libp2p, however in
1848	///   substrate this is set to 1 second.
1849	///
1850	/// - The first swarm introduces a delay of 2 seconds before responding to the request.
1851	///
1852	/// - The second swarm must enforce the 1 second timeout.
1853	#[tokio::test]
1854	async fn enforce_outbound_timeouts() {
1855		const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1856		const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1857
1858		// These swarms only speaks protocol_name.
1859		let protocol_name = ProtocolName::from("/test/req-resp/1");
1860
1861		let protocol_config = ProtocolConfig {
1862			name: protocol_name.clone(),
1863			fallback_names: Vec::new(),
1864			max_request_size: 1024,
1865			max_response_size: 1024 * 1024,
1866			request_timeout: REQUEST_TIMEOUT, // <-- important for the test
1867			inbound_queue: None,
1868		};
1869
1870		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1871		let (mut first_swarm, _) = {
1872			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1873
1874			tokio::spawn(async move {
1875				if let Some(rq) = rx.next().await {
1876					assert_eq!(rq.payload, b"this is a request");
1877
1878					// Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than
1879					// `REQUEST_TIMEOUT`.
1880					tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1881
1882					// By the time the response is sent back, the second swarm
1883					// received Timeout.
1884					let _ = rq.pending_response.send(super::OutgoingResponse {
1885						result: Ok(b"Second swarm already timedout".to_vec()),
1886						reputation_changes: Vec::new(),
1887						sent_feedback: None,
1888					});
1889				}
1890			});
1891
1892			let mut protocol_config = protocol_config.clone();
1893			protocol_config.inbound_queue = Some(tx);
1894
1895			build_swarm(iter::once(protocol_config))
1896		};
1897
1898		let (mut second_swarm, second_address) = {
1899			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1900
1901			tokio::spawn(async move {
1902				while let Some(rq) = rx.next().await {
1903					let _ = rq.pending_response.send(super::OutgoingResponse {
1904						result: Ok(b"This is the response".to_vec()),
1905						reputation_changes: Vec::new(),
1906						sent_feedback: None,
1907					});
1908				}
1909			});
1910			let mut protocol_config = protocol_config.clone();
1911			protocol_config.inbound_queue = Some(tx);
1912
1913			build_swarm(iter::once(protocol_config.clone()))
1914		};
1915		// Modify the second swarm to have a shorter timeout.
1916		second_swarm
1917			.behaviour_mut()
1918			.protocols
1919			.get_mut(&protocol_name)
1920			.unwrap()
1921			.request_timeout = REQUEST_TIMEOUT_SHORT;
1922
1923		// Ask first swarm to dial the second swarm.
1924		{
1925			Swarm::dial(&mut first_swarm, second_address).unwrap();
1926		}
1927
1928		// Running the first swarm in the background until a `InboundRequest` event happens,
1929		// which is a hint about the test having ended.
1930		tokio::spawn(async move {
1931			loop {
1932				let event = first_swarm.select_next_some().await;
1933				match event {
1934					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1935						assert!(result.is_ok());
1936						break;
1937					},
1938					SwarmEvent::ConnectionClosed { .. } => {
1939						break;
1940					},
1941					_ => {},
1942				}
1943			}
1944		});
1945
1946		// Run the second swarm.
1947		// - on connection established send the request to the first swarm
1948		// - expect to receive a timeout
1949		let mut response_receiver = None;
1950		loop {
1951			let event = second_swarm.select_next_some().await;
1952
1953			match event {
1954				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1955					let (sender, receiver) = oneshot::channel();
1956					second_swarm.behaviour_mut().send_request(
1957						&peer_id,
1958						protocol_name.clone(),
1959						b"this is a request".to_vec(),
1960						None,
1961						sender,
1962						IfDisconnected::ImmediateError,
1963					);
1964					assert!(response_receiver.is_none());
1965					response_receiver = Some(receiver);
1966				},
1967				SwarmEvent::ConnectionClosed { .. } => {
1968					break;
1969				},
1970				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1971					assert!(result.is_err());
1972					break
1973				},
1974				_ => {},
1975			}
1976		}
1977
1978		// Expect the timeout.
1979		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1980			RequestFailure::Network(OutboundFailure::Timeout) => {},
1981			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1982		}
1983	}
1984}