1use crate::{
38 peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39 service::traits::RequestResponseConfig as RequestResponseConfigT,
40 types::ProtocolName,
41 ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46 core::{Endpoint, Multiaddr},
47 request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48 swarm::{
49 behaviour::{ConnectionClosed, FromSwarm},
50 handler::multi::MultiHandler,
51 ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, THandler,
52 THandlerInEvent, THandlerOutEvent, ToSwarm,
53 },
54 PeerId,
55};
56
57use std::{
58 collections::{hash_map::Entry, HashMap},
59 io, iter,
60 ops::Deref,
61 pin::Pin,
62 sync::Arc,
63 task::{Context, Poll},
64 time::{Duration, Instant},
65};
66
67pub use libp2p::request_response::{Config, RequestId};
68
69const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
71
72#[derive(Debug, thiserror::Error)]
75pub enum OutboundFailure {
76 #[error("Failed to dial the requested peer")]
78 DialFailure,
79 #[error("Timeout while waiting for a response")]
81 Timeout,
82 #[error("Connection was closed before a response was received")]
84 ConnectionClosed,
85 #[error("The remote supports none of the requested protocols")]
87 UnsupportedProtocols,
88}
89
90impl From<request_response::OutboundFailure> for OutboundFailure {
91 fn from(out: request_response::OutboundFailure) -> Self {
92 match out {
93 request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
94 request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
95 request_response::OutboundFailure::ConnectionClosed =>
96 OutboundFailure::ConnectionClosed,
97 request_response::OutboundFailure::UnsupportedProtocols =>
98 OutboundFailure::UnsupportedProtocols,
99 }
100 }
101}
102
103#[derive(Debug, thiserror::Error)]
106pub enum InboundFailure {
107 #[error("Timeout while receiving request or sending response")]
110 Timeout,
111 #[error("Connection was closed before a response could be sent")]
113 ConnectionClosed,
114 #[error("The local peer supports none of the protocols requested by the remote")]
116 UnsupportedProtocols,
117 #[error("The response channel was dropped without sending a response to the remote")]
119 ResponseOmission,
120}
121
122impl From<request_response::InboundFailure> for InboundFailure {
123 fn from(out: request_response::InboundFailure) -> Self {
124 match out {
125 request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
126 request_response::InboundFailure::Timeout => InboundFailure::Timeout,
127 request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
128 request_response::InboundFailure::UnsupportedProtocols =>
129 InboundFailure::UnsupportedProtocols,
130 }
131 }
132}
133
134#[derive(Debug, thiserror::Error)]
136#[allow(missing_docs)]
137pub enum RequestFailure {
138 #[error("We are not currently connected to the requested peer.")]
139 NotConnected,
140 #[error("Given protocol hasn't been registered.")]
141 UnknownProtocol,
142 #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
143 Refused,
144 #[error("The remote replied, but the local node is no longer interested in the response.")]
145 Obsolete,
146 #[error("Problem on the network: {0}")]
147 Network(OutboundFailure),
148}
149
150#[derive(Debug, Clone)]
152pub struct ProtocolConfig {
153 pub name: ProtocolName,
155
156 pub fallback_names: Vec<ProtocolName>,
158
159 pub max_request_size: u64,
164
165 pub max_response_size: u64,
170
171 pub request_timeout: Duration,
175
176 pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
197}
198
199impl RequestResponseConfigT for ProtocolConfig {
200 fn protocol_name(&self) -> &ProtocolName {
201 &self.name
202 }
203}
204
205#[derive(Debug)]
207pub struct IncomingRequest {
208 pub peer: sc_network_types::PeerId,
210
211 pub payload: Vec<u8>,
214
215 pub pending_response: oneshot::Sender<OutgoingResponse>,
224}
225
226#[derive(Debug)]
228pub struct OutgoingResponse {
229 pub result: Result<Vec<u8>, ()>,
233
234 pub reputation_changes: Vec<ReputationChange>,
237
238 pub sent_feedback: Option<oneshot::Sender<()>>,
247}
248
249struct PendingRequest {
251 started_at: Instant,
253 response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
258 fallback_request: Option<(Vec<u8>, ProtocolName)>,
260}
261
262#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
264pub enum IfDisconnected {
265 TryConnect,
267 ImmediateError,
269}
270
271impl IfDisconnected {
273 pub fn should_connect(self) -> bool {
275 match self {
276 Self::TryConnect => true,
277 Self::ImmediateError => false,
278 }
279 }
280}
281
282#[derive(Debug)]
284pub enum Event {
285 InboundRequest {
289 peer: PeerId,
291 protocol: ProtocolName,
293 result: Result<Duration, ResponseFailure>,
298 },
299
300 RequestFinished {
305 peer: PeerId,
307 protocol: ProtocolName,
309 duration: Duration,
311 result: Result<(), RequestFailure>,
313 },
314
315 ReputationChanges {
317 peer: PeerId,
319 changes: Vec<ReputationChange>,
321 },
322}
323
324#[derive(Debug, Clone, PartialEq, Eq, Hash)]
331struct ProtocolRequestId {
332 protocol: ProtocolName,
333 request_id: RequestId,
334}
335
336impl From<(ProtocolName, RequestId)> for ProtocolRequestId {
337 fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
338 Self { protocol, request_id }
339 }
340}
341
342struct ProtocolDetails {
344 behaviour: Behaviour<GenericCodec>,
345 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
346 request_timeout: Duration,
347}
348
349pub struct RequestResponsesBehaviour {
351 protocols: HashMap<ProtocolName, ProtocolDetails>,
356
357 pending_requests: HashMap<ProtocolRequestId, PendingRequest>,
359
360 pending_responses: stream::FuturesUnordered<
363 Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
364 >,
365
366 pending_responses_arrival_time: HashMap<ProtocolRequestId, Instant>,
368
369 send_feedback: HashMap<ProtocolRequestId, oneshot::Sender<()>>,
372
373 peer_store: Arc<dyn PeerStoreProvider>,
375
376 periodic_request_check: tokio::time::Interval,
383}
384
385struct RequestProcessingOutcome {
387 peer: PeerId,
388 request_id: RequestId,
389 protocol: ProtocolName,
390 inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
391 response: OutgoingResponse,
392}
393
394impl RequestResponsesBehaviour {
395 pub fn new(
398 list: impl Iterator<Item = ProtocolConfig>,
399 peer_store: Arc<dyn PeerStoreProvider>,
400 ) -> Result<Self, RegisterError> {
401 let mut protocols = HashMap::new();
402 for protocol in list {
403 let mut cfg = Config::default();
404 cfg.set_request_timeout(protocol.request_timeout);
405
406 let protocol_support = if protocol.inbound_queue.is_some() {
407 ProtocolSupport::Full
408 } else {
409 ProtocolSupport::Outbound
410 };
411
412 let behaviour = Behaviour::with_codec(
413 GenericCodec {
414 max_request_size: protocol.max_request_size,
415 max_response_size: protocol.max_response_size,
416 },
417 iter::once(protocol.name.clone())
418 .chain(protocol.fallback_names)
419 .zip(iter::repeat(protocol_support)),
420 cfg,
421 );
422
423 match protocols.entry(protocol.name) {
424 Entry::Vacant(e) => e.insert(ProtocolDetails {
425 behaviour,
426 inbound_queue: protocol.inbound_queue,
427 request_timeout: protocol.request_timeout,
428 }),
429 Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())),
430 };
431 }
432
433 Ok(Self {
434 protocols,
435 pending_requests: Default::default(),
436 pending_responses: Default::default(),
437 pending_responses_arrival_time: Default::default(),
438 send_feedback: Default::default(),
439 peer_store,
440 periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
441 })
442 }
443
444 pub fn send_request(
451 &mut self,
452 target: &PeerId,
453 protocol_name: ProtocolName,
454 request: Vec<u8>,
455 fallback_request: Option<(Vec<u8>, ProtocolName)>,
456 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
457 connect: IfDisconnected,
458 ) {
459 log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len());
460
461 if let Some(ProtocolDetails { behaviour, .. }) =
462 self.protocols.get_mut(protocol_name.deref())
463 {
464 Self::send_request_inner(
465 behaviour,
466 &mut self.pending_requests,
467 target,
468 protocol_name,
469 request,
470 fallback_request,
471 pending_response,
472 connect,
473 )
474 } else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
475 log::debug!(
476 target: "sub-libp2p",
477 "Unknown protocol {:?}. At the same time local \
478 node is no longer interested in the result.",
479 protocol_name,
480 );
481 }
482 }
483
484 fn send_request_inner(
485 behaviour: &mut Behaviour<GenericCodec>,
486 pending_requests: &mut HashMap<ProtocolRequestId, PendingRequest>,
487 target: &PeerId,
488 protocol_name: ProtocolName,
489 request: Vec<u8>,
490 fallback_request: Option<(Vec<u8>, ProtocolName)>,
491 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
492 connect: IfDisconnected,
493 ) {
494 if behaviour.is_connected(target) || connect.should_connect() {
495 let request_id = behaviour.send_request(target, request);
496 let prev_req_id = pending_requests.insert(
497 (protocol_name.to_string().into(), request_id).into(),
498 PendingRequest {
499 started_at: Instant::now(),
500 response_tx: Some(pending_response),
501 fallback_request,
502 },
503 );
504 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
505 } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
506 log::debug!(
507 target: "sub-libp2p",
508 "Not connected to peer {:?}. At the same time local \
509 node is no longer interested in the result.",
510 target,
511 );
512 }
513 }
514}
515
516impl NetworkBehaviour for RequestResponsesBehaviour {
517 type ConnectionHandler =
518 MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
519 type ToSwarm = Event;
520
521 fn handle_pending_inbound_connection(
522 &mut self,
523 _connection_id: ConnectionId,
524 _local_addr: &Multiaddr,
525 _remote_addr: &Multiaddr,
526 ) -> Result<(), ConnectionDenied> {
527 Ok(())
528 }
529
530 fn handle_pending_outbound_connection(
531 &mut self,
532 _connection_id: ConnectionId,
533 _maybe_peer: Option<PeerId>,
534 _addresses: &[Multiaddr],
535 _effective_role: Endpoint,
536 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
537 Ok(Vec::new())
538 }
539
540 fn handle_established_inbound_connection(
541 &mut self,
542 connection_id: ConnectionId,
543 peer: PeerId,
544 local_addr: &Multiaddr,
545 remote_addr: &Multiaddr,
546 ) -> Result<THandler<Self>, ConnectionDenied> {
547 let iter =
548 self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
549 if let Ok(handler) = behaviour.handle_established_inbound_connection(
550 connection_id,
551 peer,
552 local_addr,
553 remote_addr,
554 ) {
555 Some((p.to_string(), handler))
556 } else {
557 None
558 }
559 });
560
561 Ok(MultiHandler::try_from_iter(iter).expect(
562 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
563 which is the only possible error; qed",
564 ))
565 }
566
567 fn handle_established_outbound_connection(
568 &mut self,
569 connection_id: ConnectionId,
570 peer: PeerId,
571 addr: &Multiaddr,
572 role_override: Endpoint,
573 ) -> Result<THandler<Self>, ConnectionDenied> {
574 let iter =
575 self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
576 if let Ok(handler) = behaviour.handle_established_outbound_connection(
577 connection_id,
578 peer,
579 addr,
580 role_override,
581 ) {
582 Some((p.to_string(), handler))
583 } else {
584 None
585 }
586 });
587
588 Ok(MultiHandler::try_from_iter(iter).expect(
589 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
590 which is the only possible error; qed",
591 ))
592 }
593
594 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
595 match event {
596 FromSwarm::ConnectionEstablished(e) =>
597 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
598 NetworkBehaviour::on_swarm_event(
599 behaviour,
600 FromSwarm::ConnectionEstablished(e),
601 );
602 },
603 FromSwarm::ConnectionClosed(ConnectionClosed {
604 peer_id,
605 connection_id,
606 endpoint,
607 handler,
608 remaining_established,
609 }) =>
610 for (p_name, p_handler) in handler.into_iter() {
611 if let Some(ProtocolDetails { behaviour, .. }) =
612 self.protocols.get_mut(p_name.as_str())
613 {
614 behaviour.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
615 peer_id,
616 connection_id,
617 endpoint,
618 handler: p_handler,
619 remaining_established,
620 }));
621 } else {
622 log::error!(
623 target: "sub-libp2p",
624 "on_swarm_event/connection_closed: no request-response instance registered for protocol {:?}",
625 p_name,
626 )
627 }
628 },
629 FromSwarm::DialFailure(e) =>
630 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
631 NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::DialFailure(e));
632 },
633 FromSwarm::ListenerClosed(e) =>
634 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
635 NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenerClosed(e));
636 },
637 FromSwarm::ListenFailure(e) =>
638 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
639 NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenFailure(e));
640 },
641 FromSwarm::ListenerError(e) =>
642 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
643 NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenerError(e));
644 },
645 FromSwarm::ExternalAddrExpired(e) =>
646 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
647 NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ExternalAddrExpired(e));
648 },
649 FromSwarm::NewListener(e) =>
650 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
651 NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::NewListener(e));
652 },
653 FromSwarm::ExpiredListenAddr(e) =>
654 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
655 NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ExpiredListenAddr(e));
656 },
657 FromSwarm::NewExternalAddrCandidate(e) =>
658 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
659 NetworkBehaviour::on_swarm_event(
660 behaviour,
661 FromSwarm::NewExternalAddrCandidate(e),
662 );
663 },
664 FromSwarm::ExternalAddrConfirmed(e) =>
665 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
666 NetworkBehaviour::on_swarm_event(
667 behaviour,
668 FromSwarm::ExternalAddrConfirmed(e),
669 );
670 },
671 FromSwarm::AddressChange(e) =>
672 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
673 NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::AddressChange(e));
674 },
675 FromSwarm::NewListenAddr(e) =>
676 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
677 NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::NewListenAddr(e));
678 },
679 }
680 }
681
682 fn on_connection_handler_event(
683 &mut self,
684 peer_id: PeerId,
685 connection_id: ConnectionId,
686 event: THandlerOutEvent<Self>,
687 ) {
688 let p_name = event.0;
689 if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
690 return behaviour.on_connection_handler_event(peer_id, connection_id, event.1)
691 } else {
692 log::warn!(
693 target: "sub-libp2p",
694 "on_connection_handler_event: no request-response instance registered for protocol {:?}",
695 p_name
696 );
697 }
698 }
699
700 fn poll(
701 &mut self,
702 cx: &mut Context,
703 params: &mut impl PollParameters,
704 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
705 'poll_all: loop {
706 if self.periodic_request_check.poll_tick(cx).is_ready() {
708 self.pending_requests.retain(|id, req| {
709 let Some(ProtocolDetails { request_timeout, .. }) =
710 self.protocols.get(&id.protocol)
711 else {
712 log::warn!(
713 target: "sub-libp2p",
714 "Request {id:?} has no protocol registered.",
715 );
716
717 if let Some(response_tx) = req.response_tx.take() {
718 if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
719 log::debug!(
720 target: "sub-libp2p",
721 "Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
722 );
723 }
724 }
725 return false
726 };
727
728 let elapsed = req.started_at.elapsed();
729 if elapsed > *request_timeout {
730 log::debug!(
731 target: "sub-libp2p",
732 "Request {id:?} force detected as timeout.",
733 );
734
735 if let Some(response_tx) = req.response_tx.take() {
736 if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
737 log::debug!(
738 target: "sub-libp2p",
739 "Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
740 );
741 }
742 }
743
744 false
745 } else {
746 true
747 }
748 });
749 }
750
751 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
753 let RequestProcessingOutcome {
754 peer,
755 request_id,
756 protocol: protocol_name,
757 inner_channel,
758 response: OutgoingResponse { result, reputation_changes, sent_feedback },
759 } = match outcome {
760 Some(outcome) => outcome,
761 None => continue,
764 };
765
766 if let Ok(payload) = result {
767 if let Some(ProtocolDetails { behaviour, .. }) =
768 self.protocols.get_mut(&*protocol_name)
769 {
770 log::trace!(target: "sub-libp2p", "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
771
772 if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
773 log::debug!(
776 target: "sub-libp2p",
777 "Failed to send response for {:?} on protocol {:?} due to a \
778 timeout or due to the connection to the peer being closed. \
779 Dropping response",
780 request_id, protocol_name,
781 );
782 } else if let Some(sent_feedback) = sent_feedback {
783 self.send_feedback
784 .insert((protocol_name, request_id).into(), sent_feedback);
785 }
786 }
787 }
788
789 if !reputation_changes.is_empty() {
790 return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
791 peer,
792 changes: reputation_changes,
793 }))
794 }
795 }
796
797 let mut fallback_requests = vec![];
798
799 for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
801 {
802 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) {
803 let ev = match ev {
804 ToSwarm::GenerateEvent(ev) => ev,
806
807 ToSwarm::Dial { opts } => {
810 if opts.get_peer_id().is_none() {
811 log::error!(
812 "The request-response isn't supposed to start dialing addresses"
813 );
814 }
815 return Poll::Ready(ToSwarm::Dial { opts })
816 },
817 ToSwarm::NotifyHandler { peer_id, handler, event } =>
818 return Poll::Ready(ToSwarm::NotifyHandler {
819 peer_id,
820 handler,
821 event: ((*protocol).to_string(), event),
822 }),
823 ToSwarm::CloseConnection { peer_id, connection } =>
824 return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
825 ToSwarm::NewExternalAddrCandidate(observed) =>
826 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
827 ToSwarm::ExternalAddrConfirmed(addr) =>
828 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
829 ToSwarm::ExternalAddrExpired(addr) =>
830 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
831 ToSwarm::ListenOn { opts } =>
832 return Poll::Ready(ToSwarm::ListenOn { opts }),
833 ToSwarm::RemoveListener { id } =>
834 return Poll::Ready(ToSwarm::RemoveListener { id }),
835 };
836
837 match ev {
838 request_response::Event::Message {
840 peer,
841 message: Message::Request { request_id, request, channel, .. },
842 } => {
843 self.pending_responses_arrival_time
844 .insert((protocol.clone(), request_id).into(), Instant::now());
845
846 let reputation = self.peer_store.peer_reputation(&peer.into());
847
848 if reputation < BANNED_THRESHOLD {
849 log::debug!(
850 target: "sub-libp2p",
851 "Cannot handle requests from a node with a low reputation {}: {}",
852 peer,
853 reputation,
854 );
855 continue 'poll_protocol
856 }
857
858 let (tx, rx) = oneshot::channel();
859
860 if let Some(resp_builder) = inbound_queue {
863 let _ = resp_builder.try_send(IncomingRequest {
870 peer: peer.into(),
871 payload: request,
872 pending_response: tx,
873 });
874 } else {
875 debug_assert!(false, "Received message on outbound-only protocol.");
876 }
877
878 let protocol = protocol.clone();
879
880 self.pending_responses.push(Box::pin(async move {
881 rx.await.map_or(None, |response| {
885 Some(RequestProcessingOutcome {
886 peer,
887 request_id,
888 protocol,
889 inner_channel: channel,
890 response,
891 })
892 })
893 }));
894
895 continue 'poll_all
898 },
899
900 request_response::Event::Message {
902 peer,
903 message: Message::Response { request_id, response },
904 ..
905 } => {
906 let (started, delivered) = match self
907 .pending_requests
908 .remove(&(protocol.clone(), request_id).into())
909 {
910 Some(PendingRequest {
911 started_at,
912 response_tx: Some(response_tx),
913 ..
914 }) => {
915 log::trace!(
916 target: "sub-libp2p",
917 "received response from {peer} ({protocol:?}), {} bytes",
918 response.as_ref().map_or(0usize, |response| response.len()),
919 );
920
921 let delivered = response_tx
922 .send(
923 response
924 .map_err(|()| RequestFailure::Refused)
925 .map(|resp| (resp, protocol.clone())),
926 )
927 .map_err(|_| RequestFailure::Obsolete);
928 (started_at, delivered)
929 },
930 _ => {
931 log::debug!(
932 target: "sub-libp2p",
933 "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
934 request_id,
935 peer,
936 );
937 continue
938 },
939 };
940
941 let out = Event::RequestFinished {
942 peer,
943 protocol: protocol.clone(),
944 duration: started.elapsed(),
945 result: delivered,
946 };
947
948 return Poll::Ready(ToSwarm::GenerateEvent(out))
949 },
950
951 request_response::Event::OutboundFailure {
953 peer,
954 request_id,
955 error,
956 ..
957 } => {
958 let started = match self
959 .pending_requests
960 .remove(&(protocol.clone(), request_id).into())
961 {
962 Some(PendingRequest {
963 started_at,
964 response_tx: Some(response_tx),
965 fallback_request,
966 }) => {
967 if let request_response::OutboundFailure::UnsupportedProtocols =
970 error
971 {
972 if let Some((fallback_request, fallback_protocol)) =
973 fallback_request
974 {
975 log::trace!(
976 target: "sub-libp2p",
977 "Request with id {:?} failed. Trying the fallback protocol. {}",
978 request_id,
979 fallback_protocol.deref()
980 );
981 fallback_requests.push((
982 peer,
983 fallback_protocol,
984 fallback_request,
985 response_tx,
986 ));
987 continue
988 }
989 }
990
991 if response_tx
992 .send(Err(RequestFailure::Network(error.clone().into())))
993 .is_err()
994 {
995 log::debug!(
996 target: "sub-libp2p",
997 "Request with id {:?} failed. At the same time local \
998 node is no longer interested in the result.",
999 request_id,
1000 );
1001 }
1002 started_at
1003 },
1004 _ => {
1005 log::debug!(
1006 target: "sub-libp2p",
1007 "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
1008 request_id,
1009 error,
1010 peer
1011 );
1012 continue
1013 },
1014 };
1015
1016 let out = Event::RequestFinished {
1017 peer,
1018 protocol: protocol.clone(),
1019 duration: started.elapsed(),
1020 result: Err(RequestFailure::Network(error.into())),
1021 };
1022
1023 return Poll::Ready(ToSwarm::GenerateEvent(out))
1024 },
1025
1026 request_response::Event::InboundFailure {
1029 request_id, peer, error, ..
1030 } => {
1031 self.pending_responses_arrival_time
1032 .remove(&(protocol.clone(), request_id).into());
1033 self.send_feedback.remove(&(protocol.clone(), request_id).into());
1034 let out = Event::InboundRequest {
1035 peer,
1036 protocol: protocol.clone(),
1037 result: Err(ResponseFailure::Network(error.into())),
1038 };
1039 return Poll::Ready(ToSwarm::GenerateEvent(out))
1040 },
1041
1042 request_response::Event::ResponseSent { request_id, peer } => {
1044 let arrival_time = self
1045 .pending_responses_arrival_time
1046 .remove(&(protocol.clone(), request_id).into())
1047 .map(|t| t.elapsed())
1048 .expect(
1049 "Time is added for each inbound request on arrival and only \
1050 removed on success (`ResponseSent`) or failure \
1051 (`InboundFailure`). One can not receive a success event for a \
1052 request that either never arrived, or that has previously \
1053 failed; qed.",
1054 );
1055
1056 if let Some(send_feedback) =
1057 self.send_feedback.remove(&(protocol.clone(), request_id).into())
1058 {
1059 let _ = send_feedback.send(());
1060 }
1061
1062 let out = Event::InboundRequest {
1063 peer,
1064 protocol: protocol.clone(),
1065 result: Ok(arrival_time),
1066 };
1067
1068 return Poll::Ready(ToSwarm::GenerateEvent(out))
1069 },
1070 };
1071 }
1072 }
1073
1074 for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
1076 if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
1077 Self::send_request_inner(
1078 behaviour,
1079 &mut self.pending_requests,
1080 &peer,
1081 protocol,
1082 request,
1083 None,
1084 pending_response,
1085 IfDisconnected::ImmediateError,
1089 );
1090 }
1091 }
1092
1093 break Poll::Pending
1094 }
1095 }
1096}
1097
1098#[derive(Debug, thiserror::Error)]
1100pub enum RegisterError {
1101 #[error("{0}")]
1103 DuplicateProtocol(ProtocolName),
1104}
1105
1106#[derive(Debug, thiserror::Error)]
1108pub enum ResponseFailure {
1109 #[error("Problem on the network: {0}")]
1111 Network(InboundFailure),
1112}
1113
1114#[derive(Debug, Clone)]
1117#[doc(hidden)] pub struct GenericCodec {
1119 max_request_size: u64,
1120 max_response_size: u64,
1121}
1122
1123#[async_trait::async_trait]
1124impl Codec for GenericCodec {
1125 type Protocol = ProtocolName;
1126 type Request = Vec<u8>;
1127 type Response = Result<Vec<u8>, ()>;
1128
1129 async fn read_request<T>(
1130 &mut self,
1131 _: &Self::Protocol,
1132 mut io: &mut T,
1133 ) -> io::Result<Self::Request>
1134 where
1135 T: AsyncRead + Unpin + Send,
1136 {
1137 let length = unsigned_varint::aio::read_usize(&mut io)
1139 .await
1140 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1141 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1142 return Err(io::Error::new(
1143 io::ErrorKind::InvalidInput,
1144 format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1145 ))
1146 }
1147
1148 let mut buffer = vec![0; length];
1150 io.read_exact(&mut buffer).await?;
1151 Ok(buffer)
1152 }
1153
1154 async fn read_response<T>(
1155 &mut self,
1156 _: &Self::Protocol,
1157 mut io: &mut T,
1158 ) -> io::Result<Self::Response>
1159 where
1160 T: AsyncRead + Unpin + Send,
1161 {
1162 let length = match unsigned_varint::aio::read_usize(&mut io).await {
1169 Ok(l) => l,
1170 Err(unsigned_varint::io::ReadError::Io(err))
1171 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1172 return Ok(Err(())),
1173 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1174 };
1175
1176 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1177 return Err(io::Error::new(
1178 io::ErrorKind::InvalidInput,
1179 format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1180 ))
1181 }
1182
1183 let mut buffer = vec![0; length];
1185 io.read_exact(&mut buffer).await?;
1186 Ok(Ok(buffer))
1187 }
1188
1189 async fn write_request<T>(
1190 &mut self,
1191 _: &Self::Protocol,
1192 io: &mut T,
1193 req: Self::Request,
1194 ) -> io::Result<()>
1195 where
1196 T: AsyncWrite + Unpin + Send,
1197 {
1198 {
1201 let mut buffer = unsigned_varint::encode::usize_buffer();
1202 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1203 }
1204
1205 io.write_all(&req).await?;
1207
1208 io.close().await?;
1209 Ok(())
1210 }
1211
1212 async fn write_response<T>(
1213 &mut self,
1214 _: &Self::Protocol,
1215 io: &mut T,
1216 res: Self::Response,
1217 ) -> io::Result<()>
1218 where
1219 T: AsyncWrite + Unpin + Send,
1220 {
1221 if let Ok(res) = res {
1223 {
1226 let mut buffer = unsigned_varint::encode::usize_buffer();
1227 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1228 }
1229
1230 io.write_all(&res).await?;
1232 }
1233
1234 io.close().await?;
1235 Ok(())
1236 }
1237}
1238
1239#[cfg(test)]
1240mod tests {
1241 use super::*;
1242
1243 use crate::mock::MockPeerStore;
1244 use assert_matches::assert_matches;
1245 use futures::channel::oneshot;
1246 use libp2p::{
1247 core::{
1248 transport::{MemoryTransport, Transport},
1249 upgrade,
1250 },
1251 identity::Keypair,
1252 noise,
1253 swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1254 Multiaddr,
1255 };
1256 use std::{iter, time::Duration};
1257
1258 struct TokioExecutor;
1259 impl Executor for TokioExecutor {
1260 fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1261 tokio::spawn(f);
1262 }
1263 }
1264
1265 fn build_swarm(
1266 list: impl Iterator<Item = ProtocolConfig>,
1267 ) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1268 let keypair = Keypair::generate_ed25519();
1269
1270 let transport = MemoryTransport::new()
1271 .upgrade(upgrade::Version::V1)
1272 .authenticate(noise::Config::new(&keypair).unwrap())
1273 .multiplex(libp2p::yamux::Config::default())
1274 .boxed();
1275
1276 let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1277
1278 let mut swarm = Swarm::new(
1279 transport,
1280 behaviour,
1281 keypair.public().to_peer_id(),
1282 SwarmConfig::with_executor(TokioExecutor {})
1283 .with_idle_connection_timeout(Duration::from_secs(10)),
1286 );
1287
1288 let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1289
1290 swarm.listen_on(listen_addr.clone()).unwrap();
1291
1292 (swarm, listen_addr)
1293 }
1294
1295 #[tokio::test]
1296 async fn basic_request_response_works() {
1297 let protocol_name = ProtocolName::from("/test/req-resp/1");
1298
1299 let mut swarms = (0..2)
1301 .map(|_| {
1302 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1303
1304 tokio::spawn(async move {
1305 while let Some(rq) = rx.next().await {
1306 let (fb_tx, fb_rx) = oneshot::channel();
1307 assert_eq!(rq.payload, b"this is a request");
1308 let _ = rq.pending_response.send(super::OutgoingResponse {
1309 result: Ok(b"this is a response".to_vec()),
1310 reputation_changes: Vec::new(),
1311 sent_feedback: Some(fb_tx),
1312 });
1313 fb_rx.await.unwrap();
1314 }
1315 });
1316
1317 let protocol_config = ProtocolConfig {
1318 name: protocol_name.clone(),
1319 fallback_names: Vec::new(),
1320 max_request_size: 1024,
1321 max_response_size: 1024 * 1024,
1322 request_timeout: Duration::from_secs(30),
1323 inbound_queue: Some(tx),
1324 };
1325
1326 build_swarm(iter::once(protocol_config))
1327 })
1328 .collect::<Vec<_>>();
1329
1330 {
1333 let dial_addr = swarms[1].1.clone();
1334 Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1335 }
1336
1337 let (mut swarm, _) = swarms.remove(0);
1338 tokio::spawn(async move {
1340 loop {
1341 match swarm.select_next_some().await {
1342 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1343 result.unwrap();
1344 },
1345 _ => {},
1346 }
1347 }
1348 });
1349
1350 let (mut swarm, _) = swarms.remove(0);
1352 let mut response_receiver = None;
1353
1354 loop {
1355 match swarm.select_next_some().await {
1356 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1357 let (sender, receiver) = oneshot::channel();
1358 swarm.behaviour_mut().send_request(
1359 &peer_id,
1360 protocol_name.clone(),
1361 b"this is a request".to_vec(),
1362 None,
1363 sender,
1364 IfDisconnected::ImmediateError,
1365 );
1366 assert!(response_receiver.is_none());
1367 response_receiver = Some(receiver);
1368 },
1369 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1370 result.unwrap();
1371 break
1372 },
1373 _ => {},
1374 }
1375 }
1376
1377 assert_eq!(
1378 response_receiver.unwrap().await.unwrap().unwrap(),
1379 (b"this is a response".to_vec(), protocol_name)
1380 );
1381 }
1382
1383 #[tokio::test]
1384 async fn max_response_size_exceeded() {
1385 let protocol_name = ProtocolName::from("/test/req-resp/1");
1386
1387 let mut swarms = (0..2)
1389 .map(|_| {
1390 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1391
1392 tokio::spawn(async move {
1393 while let Some(rq) = rx.next().await {
1394 assert_eq!(rq.payload, b"this is a request");
1395 let _ = rq.pending_response.send(super::OutgoingResponse {
1396 result: Ok(b"this response exceeds the limit".to_vec()),
1397 reputation_changes: Vec::new(),
1398 sent_feedback: None,
1399 });
1400 }
1401 });
1402
1403 let protocol_config = ProtocolConfig {
1404 name: protocol_name.clone(),
1405 fallback_names: Vec::new(),
1406 max_request_size: 1024,
1407 max_response_size: 8, request_timeout: Duration::from_secs(30),
1409 inbound_queue: Some(tx),
1410 };
1411
1412 build_swarm(iter::once(protocol_config))
1413 })
1414 .collect::<Vec<_>>();
1415
1416 {
1419 let dial_addr = swarms[1].1.clone();
1420 Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1421 }
1422
1423 let (mut swarm, _) = swarms.remove(0);
1426 tokio::spawn(async move {
1427 loop {
1428 match swarm.select_next_some().await {
1429 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1430 assert!(result.is_ok());
1431 break;
1432 },
1433 _ => {},
1434 }
1435 }
1436 });
1437
1438 let (mut swarm, _) = swarms.remove(0);
1440
1441 let mut response_receiver = None;
1442
1443 loop {
1444 match swarm.select_next_some().await {
1445 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1446 let (sender, receiver) = oneshot::channel();
1447 swarm.behaviour_mut().send_request(
1448 &peer_id,
1449 protocol_name.clone(),
1450 b"this is a request".to_vec(),
1451 None,
1452 sender,
1453 IfDisconnected::ImmediateError,
1454 );
1455 assert!(response_receiver.is_none());
1456 response_receiver = Some(receiver);
1457 },
1458 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1459 assert!(result.is_err());
1460 break
1461 },
1462 _ => {},
1463 }
1464 }
1465
1466 match response_receiver.unwrap().await.unwrap().unwrap_err() {
1467 RequestFailure::Network(OutboundFailure::ConnectionClosed) => {},
1468 request_failure => panic!("Unexpected failure: {request_failure:?}"),
1469 }
1470 }
1471
1472 #[tokio::test]
1483 async fn request_id_collision() {
1484 let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1485 let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1486
1487 let mut swarm_1 = {
1488 let protocol_configs = vec![
1489 ProtocolConfig {
1490 name: protocol_name_1.clone(),
1491 fallback_names: Vec::new(),
1492 max_request_size: 1024,
1493 max_response_size: 1024 * 1024,
1494 request_timeout: Duration::from_secs(30),
1495 inbound_queue: None,
1496 },
1497 ProtocolConfig {
1498 name: protocol_name_2.clone(),
1499 fallback_names: Vec::new(),
1500 max_request_size: 1024,
1501 max_response_size: 1024 * 1024,
1502 request_timeout: Duration::from_secs(30),
1503 inbound_queue: None,
1504 },
1505 ];
1506
1507 build_swarm(protocol_configs.into_iter()).0
1508 };
1509
1510 let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1511 let (tx_1, rx_1) = async_channel::bounded(64);
1512 let (tx_2, rx_2) = async_channel::bounded(64);
1513
1514 let protocol_configs = vec![
1515 ProtocolConfig {
1516 name: protocol_name_1.clone(),
1517 fallback_names: Vec::new(),
1518 max_request_size: 1024,
1519 max_response_size: 1024 * 1024,
1520 request_timeout: Duration::from_secs(30),
1521 inbound_queue: Some(tx_1),
1522 },
1523 ProtocolConfig {
1524 name: protocol_name_2.clone(),
1525 fallback_names: Vec::new(),
1526 max_request_size: 1024,
1527 max_response_size: 1024 * 1024,
1528 request_timeout: Duration::from_secs(30),
1529 inbound_queue: Some(tx_2),
1530 },
1531 ];
1532
1533 let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1534
1535 (swarm, rx_1, rx_2, listen_addr)
1536 };
1537
1538 swarm_1.dial(listen_add_2).unwrap();
1541
1542 tokio::spawn(async move {
1544 loop {
1545 match swarm_2.select_next_some().await {
1546 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1547 result.unwrap();
1548 },
1549 _ => {},
1550 }
1551 }
1552 });
1553
1554 tokio::spawn(async move {
1559 let protocol_1_request = swarm_2_handler_1.next().await;
1560 let protocol_2_request = swarm_2_handler_2.next().await;
1561
1562 protocol_1_request
1563 .unwrap()
1564 .pending_response
1565 .send(OutgoingResponse {
1566 result: Ok(b"this is a response".to_vec()),
1567 reputation_changes: Vec::new(),
1568 sent_feedback: None,
1569 })
1570 .unwrap();
1571 protocol_2_request
1572 .unwrap()
1573 .pending_response
1574 .send(OutgoingResponse {
1575 result: Ok(b"this is a response".to_vec()),
1576 reputation_changes: Vec::new(),
1577 sent_feedback: None,
1578 })
1579 .unwrap();
1580 });
1581
1582 let mut response_receivers = None;
1585 let mut num_responses = 0;
1586
1587 loop {
1588 match swarm_1.select_next_some().await {
1589 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1590 let (sender_1, receiver_1) = oneshot::channel();
1591 let (sender_2, receiver_2) = oneshot::channel();
1592 swarm_1.behaviour_mut().send_request(
1593 &peer_id,
1594 protocol_name_1.clone(),
1595 b"this is a request".to_vec(),
1596 None,
1597 sender_1,
1598 IfDisconnected::ImmediateError,
1599 );
1600 swarm_1.behaviour_mut().send_request(
1601 &peer_id,
1602 protocol_name_2.clone(),
1603 b"this is a request".to_vec(),
1604 None,
1605 sender_2,
1606 IfDisconnected::ImmediateError,
1607 );
1608 assert!(response_receivers.is_none());
1609 response_receivers = Some((receiver_1, receiver_2));
1610 },
1611 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1612 num_responses += 1;
1613 result.unwrap();
1614 if num_responses == 2 {
1615 break
1616 }
1617 },
1618 _ => {},
1619 }
1620 }
1621 let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1622 assert_eq!(
1623 response_receiver_1.await.unwrap().unwrap(),
1624 (b"this is a response".to_vec(), protocol_name_1)
1625 );
1626 assert_eq!(
1627 response_receiver_2.await.unwrap().unwrap(),
1628 (b"this is a response".to_vec(), protocol_name_2)
1629 );
1630 }
1631
1632 #[tokio::test]
1633 async fn request_fallback() {
1634 let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1635 let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1636 let protocol_name_2 = ProtocolName::from("/test/another");
1637
1638 let protocol_config_1 = ProtocolConfig {
1639 name: protocol_name_1.clone(),
1640 fallback_names: Vec::new(),
1641 max_request_size: 1024,
1642 max_response_size: 1024 * 1024,
1643 request_timeout: Duration::from_secs(30),
1644 inbound_queue: None,
1645 };
1646 let protocol_config_1_fallback = ProtocolConfig {
1647 name: protocol_name_1_fallback.clone(),
1648 fallback_names: Vec::new(),
1649 max_request_size: 1024,
1650 max_response_size: 1024 * 1024,
1651 request_timeout: Duration::from_secs(30),
1652 inbound_queue: None,
1653 };
1654 let protocol_config_2 = ProtocolConfig {
1655 name: protocol_name_2.clone(),
1656 fallback_names: Vec::new(),
1657 max_request_size: 1024,
1658 max_response_size: 1024 * 1024,
1659 request_timeout: Duration::from_secs(30),
1660 inbound_queue: None,
1661 };
1662
1663 let mut older_swarm = {
1666 let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1667 let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1668 let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1669 protocol_config_1_fallback.inbound_queue = Some(tx_1);
1670
1671 let mut protocol_config_2 = protocol_config_2.clone();
1672 protocol_config_2.inbound_queue = Some(tx_2);
1673
1674 tokio::spawn(async move {
1675 for _ in 0..2 {
1676 if let Some(rq) = rx_1.next().await {
1677 let (fb_tx, fb_rx) = oneshot::channel();
1678 assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1679 let _ = rq.pending_response.send(super::OutgoingResponse {
1680 result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1681 reputation_changes: Vec::new(),
1682 sent_feedback: Some(fb_tx),
1683 });
1684 fb_rx.await.unwrap();
1685 }
1686 }
1687
1688 if let Some(rq) = rx_2.next().await {
1689 let (fb_tx, fb_rx) = oneshot::channel();
1690 assert_eq!(rq.payload, b"request on protocol /test/other");
1691 let _ = rq.pending_response.send(super::OutgoingResponse {
1692 result: Ok(b"this is a response on protocol /test/other".to_vec()),
1693 reputation_changes: Vec::new(),
1694 sent_feedback: Some(fb_tx),
1695 });
1696 fb_rx.await.unwrap();
1697 }
1698 });
1699
1700 build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1701 };
1702
1703 let mut new_swarm = build_swarm(
1705 vec![
1706 protocol_config_1.clone(),
1707 protocol_config_1_fallback.clone(),
1708 protocol_config_2.clone(),
1709 ]
1710 .into_iter(),
1711 );
1712
1713 {
1714 let dial_addr = older_swarm.1.clone();
1715 Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1716 }
1717
1718 tokio::spawn(async move {
1720 loop {
1721 _ = older_swarm.0.select_next_some().await;
1722 }
1723 });
1724
1725 let (mut swarm, _) = new_swarm;
1727 let mut older_peer_id = None;
1728
1729 let mut response_receiver = None;
1730 loop {
1732 match swarm.select_next_some().await {
1733 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1734 older_peer_id = Some(peer_id);
1735 let (sender, receiver) = oneshot::channel();
1736 swarm.behaviour_mut().send_request(
1737 &peer_id,
1738 protocol_name_1.clone(),
1739 b"request on protocol /test/req-resp/2".to_vec(),
1740 Some((
1741 b"request on protocol /test/req-resp/1".to_vec(),
1742 protocol_config_1_fallback.name.clone(),
1743 )),
1744 sender,
1745 IfDisconnected::ImmediateError,
1746 );
1747 response_receiver = Some(receiver);
1748 },
1749 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1750 result.unwrap();
1751 break
1752 },
1753 _ => {},
1754 }
1755 }
1756 assert_eq!(
1757 response_receiver.unwrap().await.unwrap().unwrap(),
1758 (
1759 b"this is a response on protocol /test/req-resp/1".to_vec(),
1760 protocol_name_1_fallback.clone()
1761 )
1762 );
1763 let (sender, response_receiver) = oneshot::channel();
1765 swarm.behaviour_mut().send_request(
1766 older_peer_id.as_ref().unwrap(),
1767 protocol_name_1_fallback.clone(),
1768 b"request on protocol /test/req-resp/1".to_vec(),
1769 Some((
1770 b"dummy request, will fail if processed".to_vec(),
1771 protocol_config_1_fallback.name.clone(),
1772 )),
1773 sender,
1774 IfDisconnected::ImmediateError,
1775 );
1776 loop {
1777 match swarm.select_next_some().await {
1778 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1779 result.unwrap();
1780 break
1781 },
1782 _ => {},
1783 }
1784 }
1785 assert_eq!(
1786 response_receiver.await.unwrap().unwrap(),
1787 (
1788 b"this is a response on protocol /test/req-resp/1".to_vec(),
1789 protocol_name_1_fallback.clone()
1790 )
1791 );
1792 let (sender, response_receiver) = oneshot::channel();
1794 swarm.behaviour_mut().send_request(
1795 older_peer_id.as_ref().unwrap(),
1796 protocol_name_1.clone(),
1797 b"request on protocol /test/req-resp-2".to_vec(),
1798 None,
1799 sender,
1800 IfDisconnected::ImmediateError,
1801 );
1802 loop {
1803 match swarm.select_next_some().await {
1804 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1805 assert_matches!(
1806 result.unwrap_err(),
1807 RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1808 );
1809 break
1810 },
1811 _ => {},
1812 }
1813 }
1814 assert!(response_receiver.await.unwrap().is_err());
1815 let (sender, response_receiver) = oneshot::channel();
1817 swarm.behaviour_mut().send_request(
1818 older_peer_id.as_ref().unwrap(),
1819 protocol_name_2.clone(),
1820 b"request on protocol /test/other".to_vec(),
1821 None,
1822 sender,
1823 IfDisconnected::ImmediateError,
1824 );
1825 loop {
1826 match swarm.select_next_some().await {
1827 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1828 result.unwrap();
1829 break
1830 },
1831 _ => {},
1832 }
1833 }
1834 assert_eq!(
1835 response_receiver.await.unwrap().unwrap(),
1836 (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1837 );
1838 }
1839
1840 #[tokio::test]
1854 async fn enforce_outbound_timeouts() {
1855 const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1856 const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1857
1858 let protocol_name = ProtocolName::from("/test/req-resp/1");
1860
1861 let protocol_config = ProtocolConfig {
1862 name: protocol_name.clone(),
1863 fallback_names: Vec::new(),
1864 max_request_size: 1024,
1865 max_response_size: 1024 * 1024,
1866 request_timeout: REQUEST_TIMEOUT, inbound_queue: None,
1868 };
1869
1870 let (mut first_swarm, _) = {
1872 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1873
1874 tokio::spawn(async move {
1875 if let Some(rq) = rx.next().await {
1876 assert_eq!(rq.payload, b"this is a request");
1877
1878 tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1881
1882 let _ = rq.pending_response.send(super::OutgoingResponse {
1885 result: Ok(b"Second swarm already timedout".to_vec()),
1886 reputation_changes: Vec::new(),
1887 sent_feedback: None,
1888 });
1889 }
1890 });
1891
1892 let mut protocol_config = protocol_config.clone();
1893 protocol_config.inbound_queue = Some(tx);
1894
1895 build_swarm(iter::once(protocol_config))
1896 };
1897
1898 let (mut second_swarm, second_address) = {
1899 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1900
1901 tokio::spawn(async move {
1902 while let Some(rq) = rx.next().await {
1903 let _ = rq.pending_response.send(super::OutgoingResponse {
1904 result: Ok(b"This is the response".to_vec()),
1905 reputation_changes: Vec::new(),
1906 sent_feedback: None,
1907 });
1908 }
1909 });
1910 let mut protocol_config = protocol_config.clone();
1911 protocol_config.inbound_queue = Some(tx);
1912
1913 build_swarm(iter::once(protocol_config.clone()))
1914 };
1915 second_swarm
1917 .behaviour_mut()
1918 .protocols
1919 .get_mut(&protocol_name)
1920 .unwrap()
1921 .request_timeout = REQUEST_TIMEOUT_SHORT;
1922
1923 {
1925 Swarm::dial(&mut first_swarm, second_address).unwrap();
1926 }
1927
1928 tokio::spawn(async move {
1931 loop {
1932 let event = first_swarm.select_next_some().await;
1933 match event {
1934 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1935 assert!(result.is_ok());
1936 break;
1937 },
1938 SwarmEvent::ConnectionClosed { .. } => {
1939 break;
1940 },
1941 _ => {},
1942 }
1943 }
1944 });
1945
1946 let mut response_receiver = None;
1950 loop {
1951 let event = second_swarm.select_next_some().await;
1952
1953 match event {
1954 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1955 let (sender, receiver) = oneshot::channel();
1956 second_swarm.behaviour_mut().send_request(
1957 &peer_id,
1958 protocol_name.clone(),
1959 b"this is a request".to_vec(),
1960 None,
1961 sender,
1962 IfDisconnected::ImmediateError,
1963 );
1964 assert!(response_receiver.is_none());
1965 response_receiver = Some(receiver);
1966 },
1967 SwarmEvent::ConnectionClosed { .. } => {
1968 break;
1969 },
1970 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1971 assert!(result.is_err());
1972 break
1973 },
1974 _ => {},
1975 }
1976 }
1977
1978 match response_receiver.unwrap().await.unwrap().unwrap_err() {
1980 RequestFailure::Network(OutboundFailure::Timeout) => {},
1981 request_failure => panic!("Unexpected failure: {request_failure:?}"),
1982 }
1983 }
1984}