1use crate::{
61 protocol::notifications::upgrade::{
62 NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutSubstream,
63 UpgradeCollec,
64 },
65 service::metrics::NotificationMetrics,
66 types::ProtocolName,
67};
68
69use bytes::BytesMut;
70use futures::{
71 channel::mpsc,
72 lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
73 prelude::*,
74};
75use libp2p::{
76 swarm::{
77 handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream,
78 SubstreamProtocol,
79 },
80 PeerId,
81};
82use log::error;
83use parking_lot::{Mutex, RwLock};
84use std::{
85 collections::VecDeque,
86 mem,
87 pin::Pin,
88 sync::Arc,
89 task::{Context, Poll},
90 time::{Duration, Instant},
91};
92
/// Capacity passed to `mpsc::channel` when creating the asynchronous notifications channel
/// of a [`NotificationsSink`].
pub(crate) const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;

/// Capacity passed to `mpsc::channel` when creating the synchronous notifications channel
/// of a [`NotificationsSink`].
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;

/// Timeout attached to every outbound substream request queued by the handler.
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);

/// While every protocol is in the `Closed` state, the connection is kept alive until this
/// long after the handler was constructed.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
108
/// Connection handler that tracks, for one connection, the substream state of every
/// configured notifications protocol.
pub struct NotifsHandler {
    /// One entry per configured protocol. The position in this list is the `protocol_index`
    /// carried by [`NotifsHandlerIn`] and [`NotifsHandlerOut`].
    protocols: Vec<Protocol>,

    /// Instant at which the handler was constructed. Used as the base of the initial
    /// keep-alive window.
    when_connection_open: Instant,

    /// Peer identity copied into every [`NotificationsSink`] this handler creates.
    peer_id: PeerId,

    /// Events waiting to be returned from `poll`, in FIFO order.
    events_queue: VecDeque<
        ConnectionHandlerEvent<NotificationsOut, usize, NotifsHandlerOut, NotifsHandlerError>,
    >,

    /// Optional metrics handle, cloned into every [`NotificationsSink`] this handler creates.
    metrics: Option<Arc<NotificationMetrics>>,
}
130
131impl NotifsHandler {
132 pub fn new(
134 peer_id: PeerId,
135 protocols: Vec<ProtocolConfig>,
136 metrics: Option<NotificationMetrics>,
137 ) -> Self {
138 Self {
139 protocols: protocols
140 .into_iter()
141 .map(|config| {
142 let in_upgrade = NotificationsIn::new(
143 config.name.clone(),
144 config.fallback_names.clone(),
145 config.max_notification_size,
146 );
147
148 Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } }
149 })
150 .collect(),
151 peer_id,
152 when_connection_open: Instant::now(),
153 events_queue: VecDeque::with_capacity(16),
154 metrics: metrics.map_or(None, |metrics| Some(Arc::new(metrics))),
155 }
156 }
157}
158
/// Configuration of a single notifications protocol handled by [`NotifsHandler`].
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
    /// Name of the protocol.
    pub name: ProtocolName,
    /// Fallback names, passed alongside `name` to both the inbound and the outbound upgrade.
    pub fallback_names: Vec<ProtocolName>,
    /// Handshake bytes of the local node. The lock is read, and the bytes cloned, each time a
    /// handshake has to be sent, so the value can be changed while the handler is running.
    pub handshake: Arc<RwLock<Vec<u8>>>,
    /// Maximum notification size, passed to both the inbound and the outbound upgrade.
    pub max_notification_size: u64,
}
171
/// Everything the handler tracks for one notifications protocol.
struct Protocol {
    /// Configuration the slot was created from.
    config: ProtocolConfig,

    /// Inbound upgrade built from `config` at construction time; cloned into the listen
    /// protocol each time `listen_protocol` is called.
    in_upgrade: NotificationsIn,

    /// Current substream state of this protocol.
    state: State,
}
183
/// Substream state of one notifications protocol.
enum State {
    /// No substream is held for this protocol.
    Closed {
        /// `true` while an outbound substream request issued earlier has not been answered
        /// yet. It is cleared when that request succeeds or fails, and while it is set a new
        /// `Open` command does not queue another request.
        pending_opening: bool,
    },

    /// The remote has opened an inbound substream and the behaviour has not answered yet.
    OpenDesiredByRemote {
        /// Inbound substream received from the remote. The local handshake is only sent on
        /// it once the behaviour asks to open.
        in_substream: NotificationsInSubstream<Stream>,

        /// Same meaning as in [`State::Closed`].
        pending_opening: bool,
    },

    /// The behaviour asked to open and the outbound substream is not negotiated yet.
    Opening {
        /// Inbound substream, if one exists.
        in_substream: Option<NotificationsInSubstream<Stream>>,
        /// `true` when this state was entered from [`State::OpenDesiredByRemote`], `false`
        /// when it was entered from [`State::Closed`]. Reported back in `OpenResultOk`.
        inbound: bool,
    },

    /// The outbound substream has been negotiated.
    Open {
        /// Merged stream of the asynchronous and synchronous receivers of the
        /// `NotificationsSink` handed to the behaviour. Peekable so that `poll` can look at
        /// the next message without removing it.
        notifications_sink_rx: stream::Peekable<
            stream::Select<
                stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
                stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
            >,
        >,

        /// Outbound substream. Set to `None` by `poll` after a flush error.
        out_substream: Option<NotificationsOutSubstream<Stream>>,

        /// Inbound substream. `None` if there never was one, or after `poll` saw it end or
        /// fail.
        in_substream: Option<NotificationsInSubstream<Stream>>,
    },
}
243
/// Commands sent from the behaviour to the handler.
#[derive(Debug, Clone)]
pub enum NotifsHandlerIn {
    /// Asks the handler to open the given protocol.
    ///
    /// Sending this while the protocol is already opening or open is logged as an error and
    /// trips a `debug_assert!`.
    Open {
        /// Index of the protocol in the list passed to `NotifsHandler::new`.
        protocol_index: usize,
    },

    /// Asks the handler to close the given protocol. Always answered with
    /// `NotifsHandlerOut::CloseResult`, whatever the current state.
    Close {
        /// Index of the protocol in the list passed to `NotifsHandler::new`.
        protocol_index: usize,
    },
}
268
/// Events reported by the handler to the behaviour.
#[derive(Debug)]
pub enum NotifsHandlerOut {
    /// The outbound substream was negotiated while the protocol was opening.
    OpenResultOk {
        /// Index of the protocol in the list passed to `NotifsHandler::new`.
        protocol_index: usize,
        /// Fallback name reported by the outbound upgrade, if any.
        negotiated_fallback: Option<ProtocolName>,
        /// Handshake reported by the outbound upgrade.
        received_handshake: Vec<u8>,
        /// Sink through which notifications can be queued for this substream.
        notifications_sink: NotificationsSink,
        /// `true` if the opening was started while the remote already desired it.
        inbound: bool,
    },

    /// Opening failed: either the outbound upgrade failed, or a `Close` command arrived
    /// while the protocol was still opening.
    OpenResultErr {
        /// Index of the protocol in the list passed to `NotifsHandler::new`.
        protocol_index: usize,
    },

    /// Acknowledges a `NotifsHandlerIn::Close` command.
    CloseResult {
        /// Index of the protocol in the list passed to `NotifsHandler::new`.
        protocol_index: usize,
    },

    /// The remote opened an inbound substream while the protocol was closed.
    OpenDesiredByRemote {
        /// Index of the protocol in the list passed to `NotifsHandler::new`.
        protocol_index: usize,
        /// Handshake that came with the inbound substream.
        handshake: Vec<u8>,
    },

    /// Emitted when flushing the outbound substream of an open protocol fails, or when the
    /// inbound substream fails while the remote's open request is still unanswered.
    CloseDesired {
        /// Index of the protocol in the list passed to `NotifsHandler::new`.
        protocol_index: usize,
    },

    /// A message was read from the inbound substream of an open protocol.
    Notification {
        /// Index of the protocol in the list passed to `NotifsHandler::new`.
        protocol_index: usize,
        /// The message as yielded by the inbound substream.
        message: BytesMut,
    },
}
331
/// Cloneable handle used to queue notifications for one open substream.
///
/// Clones share the same underlying channels through the inner `Arc`.
#[derive(Debug, Clone)]
pub struct NotificationsSink {
    /// Shared channel senders and peer identity.
    inner: Arc<NotificationsSinkInner>,
    /// Optional metrics handle; `None` for sinks created through `NotificationsSink::new`.
    metrics: Option<Arc<NotificationMetrics>>,
}
340
341impl NotificationsSink {
342 pub fn new(
345 peer_id: PeerId,
346 ) -> (Self, mpsc::Receiver<NotificationsSinkMessage>, mpsc::Receiver<NotificationsSinkMessage>)
347 {
348 let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
349 let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
350 (
351 NotificationsSink {
352 inner: Arc::new(NotificationsSinkInner {
353 peer_id,
354 async_channel: FuturesMutex::new(async_tx),
355 sync_channel: Mutex::new(Some(sync_tx)),
356 }),
357 metrics: None,
358 },
359 async_rx,
360 sync_rx,
361 )
362 }
363
364 pub fn metrics(&self) -> &Option<Arc<NotificationMetrics>> {
366 &self.metrics
367 }
368}
369
/// State shared by all clones of a [`NotificationsSink`].
#[derive(Debug)]
struct NotificationsSinkInner {
    /// Peer the sink was created for.
    peer_id: PeerId,
    /// Sender used by `reserve_notification`. It sits behind an async mutex, and the guard is
    /// handed out inside `Ready` until the notification is sent.
    async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
    /// Sender used by `send_sync_notification`. Replaced by `None` after a failed send, which
    /// turns every later synchronous send into a no-op.
    sync_channel: Mutex<Option<mpsc::Sender<NotificationsSinkMessage>>>,
}
384
/// Message travelling from a [`NotificationsSink`] to the handler.
#[derive(Debug, PartialEq, Eq)]
pub enum NotificationsSinkMessage {
    /// A notification to write on the outbound substream.
    Notification { message: Vec<u8> },

    /// Queued by `send_sync_notification` when it cannot enqueue a notification; the handler
    /// reacts by closing the connection.
    ForceClose,
}
396
397impl NotificationsSink {
398 pub fn peer_id(&self) -> &PeerId {
400 &self.inner.peer_id
401 }
402
403 pub fn send_sync_notification(&self, message: impl Into<Vec<u8>>) {
413 let mut lock = self.inner.sync_channel.lock();
414
415 if let Some(tx) = lock.as_mut() {
416 let message = message.into();
417 let result = tx.try_send(NotificationsSinkMessage::Notification { message });
418
419 if result.is_err() {
420 let _result2 = tx.clone().try_send(NotificationsSinkMessage::ForceClose);
423 debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
424
425 *lock = None;
427 }
428 }
429 }
430
431 pub async fn reserve_notification(&self) -> Result<Ready<'_>, ()> {
438 let mut lock = self.inner.async_channel.lock().await;
439
440 let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
441 if poll_ready.is_ok() {
442 Ok(Ready { lock })
443 } else {
444 Err(())
445 }
446 }
447}
448
/// Proof that the asynchronous channel of a [`NotificationsSink`] reported itself ready for
/// one message. Obtained from `NotificationsSink::reserve_notification`.
#[must_use]
#[derive(Debug)]
pub struct Ready<'a> {
    /// Guard over the asynchronous sender; holding it keeps the async mutex locked.
    lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
}
456
457impl<'a> Ready<'a> {
458 pub fn send(mut self, notification: impl Into<Vec<u8>>) -> Result<(), ()> {
462 self.lock
463 .start_send(NotificationsSinkMessage::Notification { message: notification.into() })
464 .map_err(|_| ())
465 }
466}
467
/// Error with which the handler closes the connection.
#[derive(Debug, thiserror::Error)]
pub enum NotifsHandlerError {
    /// The handler found a `NotificationsSinkMessage::ForceClose` in a sink's queue, which
    /// `send_sync_notification` enqueues when it fails to enqueue a notification.
    #[error("Channel of synchronous notifications is full.")]
    SyncNotificationsClogged,
}
474
475impl ConnectionHandler for NotifsHandler {
476 type FromBehaviour = NotifsHandlerIn;
477 type ToBehaviour = NotifsHandlerOut;
478 type Error = NotifsHandlerError;
479 type InboundProtocol = UpgradeCollec<NotificationsIn>;
480 type OutboundProtocol = NotificationsOut;
481 type OutboundOpenInfo = usize;
483 type InboundOpenInfo = ();
484
485 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
486 let protocols = self
487 .protocols
488 .iter()
489 .map(|p| p.in_upgrade.clone())
490 .collect::<UpgradeCollec<_>>();
491
492 SubstreamProtocol::new(protocols, ())
493 }
494
    /// Advances the per-protocol state machine in response to a connection event.
    ///
    /// * `FullyNegotiatedInbound`: stores the new inbound substream, or drops it when one is
    ///   already held for that protocol.
    /// * `FullyNegotiatedOutbound`: completes an opening in progress, or clears the
    ///   `pending_opening` flag when the protocol was closed in the meantime.
    /// * `DialUpgradeError`: fails an opening in progress, or clears `pending_opening`.
    /// * All other events are ignored.
    fn on_connection_event(
        &mut self,
        event: ConnectionEvent<
            '_,
            Self::InboundProtocol,
            Self::OutboundProtocol,
            Self::InboundOpenInfo,
            Self::OutboundOpenInfo,
        >,
    ) {
        match event {
            ConnectionEvent::FullyNegotiatedInbound(inbound) => {
                let (mut in_substream_open, protocol_index) = inbound.protocol;
                let protocol_info = &mut self.protocols[protocol_index];

                match protocol_info.state {
                    State::Closed { pending_opening } => {
                        // Tell the behaviour that the remote wants to open, forwarding the
                        // handshake that came with the substream.
                        self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
                            NotifsHandlerOut::OpenDesiredByRemote {
                                protocol_index,
                                handshake: in_substream_open.handshake,
                            },
                        ));

                        // `pending_opening` is carried over unchanged.
                        protocol_info.state = State::OpenDesiredByRemote {
                            in_substream: in_substream_open.substream,
                            pending_opening,
                        };
                    },
                    State::OpenDesiredByRemote { .. } => {
                        // An inbound substream is already held: the new one is not stored and
                        // is dropped when this function returns.
                        return
                    },
                    State::Opening { ref mut in_substream, .. } |
                    State::Open { ref mut in_substream, .. } => {
                        if in_substream.is_some() {
                            // Same as above: the extra substream is dropped.
                            return
                        }

                        // The handshake is cloned into a local first so that the read lock is
                        // not held while calling into the substream.
                        let handshake_message = protocol_info.config.handshake.read().clone();
                        in_substream_open.substream.send_handshake(handshake_message);
                        *in_substream = Some(in_substream_open.substream);
                    },
                }
            },
            ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
                let (new_open, protocol_index) = (outbound.protocol, outbound.info);

                match self.protocols[protocol_index].state {
                    State::Closed { ref mut pending_opening } |
                    State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
                        // The opening was cancelled while the request was in flight; the
                        // negotiated substream is not stored.
                        debug_assert!(*pending_opening);
                        *pending_opening = false;
                    },
                    State::Open { .. } => {
                        // No outbound request is expected to be in flight while open.
                        error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler");
                        debug_assert!(false);
                    },
                    State::Opening { ref mut in_substream, inbound } => {
                        // Create the sink given to the behaviour and keep the receiving ends.
                        let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
                        let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
                        let notifications_sink = NotificationsSink {
                            inner: Arc::new(NotificationsSinkInner {
                                peer_id: self.peer_id,
                                async_channel: FuturesMutex::new(async_tx),
                                sync_channel: Mutex::new(Some(sync_tx)),
                            }),
                            metrics: self.metrics.clone(),
                        };

                        // Any inbound substream already held is moved into the new state.
                        self.protocols[protocol_index].state = State::Open {
                            notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse())
                                .peekable(),
                            out_substream: Some(new_open.substream),
                            in_substream: in_substream.take(),
                        };

                        self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
                            NotifsHandlerOut::OpenResultOk {
                                protocol_index,
                                negotiated_fallback: new_open.negotiated_fallback,
                                received_handshake: new_open.handshake,
                                notifications_sink,
                                inbound,
                            },
                        ));
                    },
                }
            },
            ConnectionEvent::AddressChange(_address_change) => {},
            ConnectionEvent::LocalProtocolsChange(_) => {},
            ConnectionEvent::RemoteProtocolsChange(_) => {},
            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => match self.protocols
                [dial_upgrade_error.info]
                .state
            {
                State::Closed { ref mut pending_opening } |
                State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
                    // The opening had already been cancelled; only the flag is cleared.
                    debug_assert!(*pending_opening);
                    *pending_opening = false;
                },

                State::Opening { .. } => {
                    // Replacing the state drops any inbound substream held by `Opening`.
                    self.protocols[dial_upgrade_error.info].state =
                        State::Closed { pending_opening: false };

                    self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
                        NotifsHandlerOut::OpenResultErr { protocol_index: dial_upgrade_error.info },
                    ));
                },

                // No outbound request is expected to be in flight while open.
                State::Open { .. } => debug_assert!(false),
            },
            ConnectionEvent::ListenUpgradeError(_listen_upgrade_error) => {},
        }
    }
621
    /// Applies an `Open` or `Close` command from the behaviour to the addressed protocol.
    ///
    /// `Open` queues an outbound substream request (unless one is still pending) and moves
    /// the protocol to `Opening`. `Close` moves the protocol to `Closed` from any state and
    /// always queues a `CloseResult`.
    fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
        match message {
            NotifsHandlerIn::Open { protocol_index } => {
                let protocol_info = &mut self.protocols[protocol_index];
                match &mut protocol_info.state {
                    State::Closed { pending_opening } => {
                        // Only request a new outbound substream if no earlier request is
                        // still in flight.
                        if !*pending_opening {
                            let proto = NotificationsOut::new(
                                protocol_info.config.name.clone(),
                                protocol_info.config.fallback_names.clone(),
                                protocol_info.config.handshake.read().clone(),
                                protocol_info.config.max_notification_size,
                            );

                            self.events_queue.push_back(
                                ConnectionHandlerEvent::OutboundSubstreamRequest {
                                    protocol: SubstreamProtocol::new(proto, protocol_index)
                                        .with_timeout(OPEN_TIMEOUT),
                                },
                            );
                        }

                        protocol_info.state = State::Opening { in_substream: None, inbound: false };
                    },
                    State::OpenDesiredByRemote { pending_opening, in_substream } => {
                        let handshake_message = protocol_info.config.handshake.read().clone();

                        // Same rule as above for the outbound request.
                        if !*pending_opening {
                            let proto = NotificationsOut::new(
                                protocol_info.config.name.clone(),
                                protocol_info.config.fallback_names.clone(),
                                handshake_message.clone(),
                                protocol_info.config.max_notification_size,
                            );

                            self.events_queue.push_back(
                                ConnectionHandlerEvent::OutboundSubstreamRequest {
                                    protocol: SubstreamProtocol::new(proto, protocol_index)
                                        .with_timeout(OPEN_TIMEOUT),
                                },
                            );
                        }

                        // Answer the remote on the inbound substream it opened.
                        in_substream.send_handshake(handshake_message);

                        // The inbound substream is moved out of the old state by swapping in
                        // a placeholder `Opening` first, then the final state is written.
                        let in_substream = match mem::replace(
                            &mut protocol_info.state,
                            State::Opening { in_substream: None, inbound: false },
                        ) {
                            State::OpenDesiredByRemote { in_substream, .. } => in_substream,
                            _ => unreachable!(),
                        };
                        protocol_info.state =
                            State::Opening { in_substream: Some(in_substream), inbound: true };
                    },
                    State::Opening { .. } | State::Open { .. } => {
                        // `Open` must not be sent while the protocol is opening or open.
                        error!(target: "sub-libp2p", "opening already-opened handler");
                        debug_assert!(false);
                    },
                }
            },

            NotifsHandlerIn::Close { protocol_index } => {
                match self.protocols[protocol_index].state {
                    State::Open { .. } => {
                        // Replacing the state drops the substreams and the sink receivers.
                        self.protocols[protocol_index].state =
                            State::Closed { pending_opening: false };
                    },
                    State::Opening { .. } => {
                        // The outbound request is still in flight: remember it so that its
                        // eventual outcome is recognised and ignored.
                        self.protocols[protocol_index].state =
                            State::Closed { pending_opening: true };

                        self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
                            NotifsHandlerOut::OpenResultErr { protocol_index },
                        ));
                    },
                    State::OpenDesiredByRemote { pending_opening, .. } => {
                        self.protocols[protocol_index].state = State::Closed { pending_opening };
                    },
                    State::Closed { .. } => {},
                }

                // Acknowledge the command whatever the previous state was.
                self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
                    NotifsHandlerOut::CloseResult { protocol_index },
                ));
            },
        }
    }
713
714 fn connection_keep_alive(&self) -> KeepAlive {
715 if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
717 return KeepAlive::Yes
718 }
719
720 #[allow(deprecated)]
723 KeepAlive::Until(self.when_connection_open + INITIAL_KEEPALIVE_TIME)
724 }
725
    /// Drives the handler and returns the next event, if any.
    ///
    /// In order: returns a queued event if one exists; moves queued notifications from the
    /// sinks onto the outbound substreams; flushes the outbound substreams; polls the
    /// inbound substreams. Returns `Poll::Pending` when none of these steps produced an
    /// event.
    #[allow(deprecated)]
    fn poll(
        &mut self,
        cx: &mut Context,
    ) -> Poll<
        ConnectionHandlerEvent<
            Self::OutboundProtocol,
            Self::OutboundOpenInfo,
            Self::ToBehaviour,
            Self::Error,
        >,
    > {
        // Queued events take priority over any new work.
        if let Some(ev) = self.events_queue.pop_front() {
            return Poll::Ready(ev)
        }

        // Step 1: forward pending notifications to the outbound substreams.
        for protocol_index in 0..self.protocols.len() {
            if let State::Open {
                notifications_sink_rx, out_substream: Some(out_substream), ..
            } = &mut self.protocols[protocol_index].state
            {
                loop {
                    // Peek first: the substream is only asked whether it is ready when there
                    // is actually a message to send.
                    #[allow(deprecated)]
                    match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
                        // A sink gave up on its synchronous channel: close the connection.
                        Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) =>
                            return Poll::Ready(ConnectionHandlerEvent::Close(
                                NotifsHandlerError::SyncNotificationsClogged,
                            )),
                        Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
                        Poll::Ready(None) | Poll::Pending => break,
                    }

                    // The message stays in the queue until the substream can accept it.
                    match out_substream.poll_ready_unpin(cx) {
                        Poll::Ready(_) => {},
                        Poll::Pending => break,
                    }

                    // The peek above reported a notification, so any other outcome here is
                    // unexpected.
                    let message = match notifications_sink_rx.poll_next_unpin(cx) {
                        Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) =>
                            message,
                        Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) |
                        Poll::Ready(None) |
                        Poll::Pending => {
                            debug_assert!(false);
                            break
                        },
                    };

                    // The result is ignored here; flushing happens in step 2.
                    let _ = out_substream.start_send_unpin(message);
                }
            }
        }

        // Step 2: flush every outbound substream.
        for protocol_index in 0..self.protocols.len() {
            match &mut self.protocols[protocol_index].state {
                State::Open { out_substream: out_substream @ Some(_), .. } => {
                    match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
                        Poll::Pending | Poll::Ready(Ok(())) => {},
                        Poll::Ready(Err(_)) => {
                            // Drop the failed substream and let the behaviour decide; the
                            // protocol itself stays in the `Open` state.
                            *out_substream = None;
                            let event = NotifsHandlerOut::CloseDesired { protocol_index };
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event))
                        },
                    };
                },

                State::Closed { .. } |
                State::Opening { .. } |
                State::Open { out_substream: None, .. } |
                State::OpenDesiredByRemote { .. } => {},
            }
        }

        // Step 3: poll the inbound substreams.
        for protocol_index in 0..self.protocols.len() {
            match &mut self.protocols[protocol_index].state {
                State::Closed { .. } |
                State::Open { in_substream: None, .. } |
                State::Opening { in_substream: None, .. } => {},

                State::Open { in_substream: in_substream @ Some(_), .. } =>
                    match futures::prelude::stream::Stream::poll_next(
                        Pin::new(in_substream.as_mut().unwrap()),
                        cx,
                    ) {
                        Poll::Pending => {},
                        Poll::Ready(Some(Ok(message))) => {
                            let event = NotifsHandlerOut::Notification { protocol_index, message };
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event))
                        },
                        // An inbound substream that ends or fails is forgotten without
                        // notifying the behaviour.
                        Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None,
                    },

                State::OpenDesiredByRemote { in_substream, pending_opening } =>
                    match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
                        Poll::Pending => {},
                        Poll::Ready(Ok(())) => {},
                        Poll::Ready(Err(_)) => {
                            // The remote's request is void: go back to `Closed` and tell the
                            // behaviour.
                            self.protocols[protocol_index].state =
                                State::Closed { pending_opening: *pending_opening };
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
                                NotifsHandlerOut::CloseDesired { protocol_index },
                            ))
                        },
                    },

                State::Opening { in_substream: in_substream @ Some(_), .. } =>
                    match NotificationsInSubstream::poll_process(
                        Pin::new(in_substream.as_mut().unwrap()),
                        cx,
                    ) {
                        Poll::Pending => {},
                        Poll::Ready(Ok(())) => {},
                        // The opening continues without an inbound substream.
                        Poll::Ready(Err(_)) => *in_substream = None,
                    },
            }
        }

        Poll::Pending
    }
871}
872
873#[cfg(test)]
874pub mod tests {
875 use super::*;
876 use crate::protocol::notifications::upgrade::{
877 NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen,
878 };
879 use asynchronous_codec::Framed;
880 use libp2p::{
881 core::muxing::SubstreamBox,
882 swarm::handler::{self, StreamUpgradeError},
883 };
884 use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version};
885 use std::{
886 collections::HashMap,
887 io::{Error, IoSlice, IoSliceMut},
888 };
889 use tokio::sync::mpsc;
890 use unsigned_varint::codec::UviBytes;
891
    /// Receiving side of a notifications sink created by [`ConnectionYielder`], together with
    /// the mock substreams created alongside it.
    struct OpenSubstream {
        /// Merged asynchronous and synchronous receivers of the sink.
        notifications: stream::Peekable<
            stream::Select<
                stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
                stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
            >,
        >,
        /// Stored but never read.
        _in_substream: MockSubstream,
        /// Stored but never read.
        _out_substream: MockSubstream,
    }
902
    /// Test helper that fabricates `OpenResultOk` events and keeps the receiving side of the
    /// sinks it hands out.
    pub struct ConnectionYielder {
        /// Open substreams keyed by peer and protocol index.
        connections: HashMap<(PeerId, usize), OpenSubstream>,
    }
906
907 impl ConnectionYielder {
908 pub fn new() -> Self {
910 Self { connections: HashMap::new() }
911 }
912
913 pub fn open_substream(
915 &mut self,
916 peer: PeerId,
917 protocol_index: usize,
918 received_handshake: Vec<u8>,
919 ) -> NotifsHandlerOut {
920 let (async_tx, async_rx) =
921 futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
922 let (sync_tx, sync_rx) =
923 futures::channel::mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
924 let notifications_sink = NotificationsSink {
925 inner: Arc::new(NotificationsSinkInner {
926 peer_id: peer,
927 async_channel: FuturesMutex::new(async_tx),
928 sync_channel: Mutex::new(Some(sync_tx)),
929 }),
930 metrics: None,
931 };
932 let (in_substream, out_substream) = MockSubstream::new();
933
934 self.connections.insert(
935 (peer, protocol_index),
936 OpenSubstream {
937 notifications: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
938 _in_substream: in_substream,
939 _out_substream: out_substream,
940 },
941 );
942
943 NotifsHandlerOut::OpenResultOk {
944 protocol_index,
945 negotiated_fallback: None,
946 received_handshake,
947 notifications_sink,
948 inbound: false,
949 }
950 }
951
952 pub async fn get_next_event(&mut self, peer: PeerId, set: usize) -> Option<Vec<u8>> {
954 let substream = if let Some(info) = self.connections.get_mut(&(peer, set)) {
955 info
956 } else {
957 return None
958 };
959
960 futures::future::poll_fn(|cx| match substream.notifications.poll_next_unpin(cx) {
961 Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) =>
962 Poll::Ready(Some(message)),
963 Poll::Pending => Poll::Ready(None),
964 Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) => {
965 panic!("sink closed")
966 },
967 })
968 .await
969 }
970 }
971
    /// In-memory, channel-backed byte stream used in place of a real substream in tests.
    struct MockSubstream {
        /// Chunks written by the peer end.
        pub rx: mpsc::Receiver<Vec<u8>>,
        /// Channel on which every write is delivered to the peer end as one chunk.
        pub tx: mpsc::Sender<Vec<u8>>,
        /// Bytes received from `rx` that have not been handed to a reader yet.
        rx_buffer: BytesMut,
    }
977
978 impl MockSubstream {
979 pub fn new() -> (Self, Self) {
981 let (tx1, rx1) = mpsc::channel(32);
982 let (tx2, rx2) = mpsc::channel(32);
983
984 (
985 Self { rx: rx1, tx: tx2, rx_buffer: BytesMut::with_capacity(512) },
986 Self { rx: rx2, tx: tx1, rx_buffer: BytesMut::with_capacity(512) },
987 )
988 }
989
990 pub async fn negotiated() -> (Stream, Stream) {
992 let (socket1, socket2) = Self::new();
993 let socket1 = SubstreamBox::new(socket1);
994 let socket2 = SubstreamBox::new(socket2);
995
996 let protos = vec!["/echo/1.0.0", "/echo/2.5.0"];
997 let (res1, res2) = tokio::join!(
998 dialer_select_proto(socket1, protos.clone(), Version::V1),
999 listener_select_proto(socket2, protos),
1000 );
1001
1002 (Self::stream_new(res1.unwrap().1), Self::stream_new(res2.unwrap().1))
1003 }
1004
1005 fn stream_new(stream: Negotiated<SubstreamBox>) -> Stream {
1007 const _: () = {
1009 assert!(
1010 core::mem::size_of::<Stream>() ==
1011 core::mem::size_of::<Negotiated<SubstreamBox>>()
1012 );
1013 assert!(
1014 core::mem::align_of::<Stream>() ==
1015 core::mem::align_of::<Negotiated<SubstreamBox>>()
1016 );
1017 };
1018
1019 unsafe { core::mem::transmute(stream) }
1020 }
1021 }
1022
1023 impl AsyncWrite for MockSubstream {
1024 fn poll_write<'a>(
1025 self: Pin<&mut Self>,
1026 _cx: &mut Context<'a>,
1027 buf: &[u8],
1028 ) -> Poll<Result<usize, Error>> {
1029 match self.tx.try_send(buf.to_vec()) {
1030 Ok(_) => Poll::Ready(Ok(buf.len())),
1031 Err(_) => Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1032 }
1033 }
1034
1035 fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1036 Poll::Ready(Ok(()))
1037 }
1038
1039 fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1040 Poll::Ready(Ok(()))
1041 }
1042
1043 fn poll_write_vectored<'a, 'b>(
1044 self: Pin<&mut Self>,
1045 _cx: &mut Context<'a>,
1046 _bufs: &[IoSlice<'b>],
1047 ) -> Poll<Result<usize, Error>> {
1048 unimplemented!();
1049 }
1050 }
1051
1052 impl AsyncRead for MockSubstream {
1053 fn poll_read<'a>(
1054 mut self: Pin<&mut Self>,
1055 cx: &mut Context<'a>,
1056 buf: &mut [u8],
1057 ) -> Poll<Result<usize, Error>> {
1058 match self.rx.poll_recv(cx) {
1059 Poll::Ready(Some(data)) => self.rx_buffer.extend_from_slice(&data),
1060 Poll::Ready(None) =>
1061 return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1062 _ => {},
1063 }
1064
1065 let nsize = std::cmp::min(self.rx_buffer.len(), buf.len());
1066 let data = self.rx_buffer.split_to(nsize);
1067 buf[..nsize].copy_from_slice(&data[..]);
1068
1069 if nsize > 0 {
1070 return Poll::Ready(Ok(nsize))
1071 }
1072
1073 Poll::Pending
1074 }
1075
1076 fn poll_read_vectored<'a, 'b>(
1077 self: Pin<&mut Self>,
1078 _cx: &mut Context<'a>,
1079 _bufs: &mut [IoSliceMut<'b>],
1080 ) -> Poll<Result<usize, Error>> {
1081 unimplemented!();
1082 }
1083 }
1084
1085 fn notifs_handler() -> NotifsHandler {
1087 let proto = Protocol {
1088 config: ProtocolConfig {
1089 name: "/foo".into(),
1090 fallback_names: vec![],
1091 handshake: Arc::new(RwLock::new(b"hello, world".to_vec())),
1092 max_notification_size: u64::MAX,
1093 },
1094 in_upgrade: NotificationsIn::new("/foo", Vec::new(), u64::MAX),
1095 state: State::Closed { pending_opening: false },
1096 };
1097
1098 NotifsHandler {
1099 protocols: vec![proto],
1100 when_connection_open: Instant::now(),
1101 peer_id: PeerId::random(),
1102 events_queue: VecDeque::new(),
1103 metrics: None,
1104 }
1105 }
1106
    /// Delivers two inbound substreams in a row while the behaviour has not answered the
    /// first one, and checks the remote end of the second.
    #[tokio::test]
    async fn second_open_desired_by_remote_rejected() {
        let mut handler = notifs_handler();
        let (io, mut io2) = MockSubstream::negotiated().await;
        let mut codec = UviBytes::default();
        codec.set_max_len(usize::MAX);

        let notif_in = NotificationsInOpen {
            handshake: b"hello, world".to_vec(),
            substream: NotificationsInSubstream::new(
                Framed::new(io, codec),
                NotificationsInSubstreamHandshake::NotSent,
            ),
        };

        // First inbound substream: the protocol goes from `Closed` to `OpenDesiredByRemote`.
        handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
            handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
        ));

        assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
        // The remote end of the first substream must have nothing to read.
        // NOTE(review): `buf` has capacity 512 but length 0, so the read is offered an empty
        // slice — confirm this is intended.
        futures::future::poll_fn(|cx| {
            let mut buf = Vec::with_capacity(512);
            assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
            Poll::Ready(())
        })
        .await;

        // Second inbound substream for the same protocol.
        let (io, mut io2) = MockSubstream::negotiated().await;
        let mut codec = UviBytes::default();
        codec.set_max_len(usize::MAX);

        let notif_in = NotificationsInOpen {
            handshake: b"hello, world".to_vec(),
            substream: NotificationsInSubstream::new(
                Framed::new(io, codec),
                NotificationsInSubstreamHandshake::NotSent,
            ),
        };

        handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
            handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
        ));

        // If reading from the remote end of the second substream fails, the failure must be
        // `UnexpectedEof`. Any other poll result is accepted by this check.
        futures::future::poll_fn(|cx| {
            let mut buf = Vec::with_capacity(512);

            if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
                assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
            }

            Poll::Ready(())
        })
        .await;
    }
1166
    /// Checks that a second inbound substream delivered while the protocol is `Opening` with
    /// an inbound substream already held is dropped, and that the state is left unchanged.
    #[tokio::test]
    async fn open_rejected_if_substream_is_opening() {
        let mut handler = notifs_handler();
        let (io, mut io2) = MockSubstream::negotiated().await;
        let mut codec = UviBytes::default();
        codec.set_max_len(usize::MAX);

        let notif_in = NotificationsInOpen {
            handshake: b"hello, world".to_vec(),
            substream: NotificationsInSubstream::new(
                Framed::new(io, codec),
                NotificationsInSubstreamHandshake::NotSent,
            ),
        };

        // First inbound substream: `Closed` -> `OpenDesiredByRemote`.
        handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
            handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
        ));

        assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
        // The remote end of the first substream must have nothing to read.
        futures::future::poll_fn(|cx| {
            let mut buf = Vec::with_capacity(512);
            assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
            Poll::Ready(())
        })
        .await;

        // The behaviour accepts: `OpenDesiredByRemote` -> `Opening`, keeping the substream.
        handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
        assert!(std::matches!(
            handler.protocols[0].state,
            State::Opening { in_substream: Some(_), .. }
        ));

        // Second inbound substream for the same protocol.
        let (io, mut io2) = MockSubstream::negotiated().await;
        let mut codec = UviBytes::default();
        codec.set_max_len(usize::MAX);

        let notif_in = NotificationsInOpen {
            handshake: b"hello, world".to_vec(),
            substream: NotificationsInSubstream::new(
                Framed::new(io, codec),
                NotificationsInSubstreamHandshake::NotSent,
            ),
        };

        handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
            handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
        ));

        // The handler dropped its end, so the remote end must observe `UnexpectedEof`.
        futures::future::poll_fn(|cx| {
            let mut buf = Vec::with_capacity(512);

            if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
                assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
            } else {
                panic!("unexpected result");
            }

            Poll::Ready(())
        })
        .await;
        // The first inbound substream is still in place.
        assert!(std::matches!(
            handler.protocols[0].state,
            State::Opening { in_substream: Some(_), .. }
        ));
    }
1238
    /// Checks that a second inbound substream delivered while the protocol is `Open` with an
    /// inbound substream already held is dropped, and that the state is left unchanged.
    #[tokio::test]
    async fn open_rejected_if_substream_already_open() {
        let mut handler = notifs_handler();
        let (io, mut io2) = MockSubstream::negotiated().await;
        let mut codec = UviBytes::default();
        codec.set_max_len(usize::MAX);

        let notif_in = NotificationsInOpen {
            handshake: b"hello, world".to_vec(),
            substream: NotificationsInSubstream::new(
                Framed::new(io, codec),
                NotificationsInSubstreamHandshake::NotSent,
            ),
        };
        // First inbound substream: `Closed` -> `OpenDesiredByRemote`.
        handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
            handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
        ));

        assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
        // The remote end of the first substream must have nothing to read.
        futures::future::poll_fn(|cx| {
            let mut buf = Vec::with_capacity(512);
            assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
            Poll::Ready(())
        })
        .await;

        // The behaviour accepts: `OpenDesiredByRemote` -> `Opening`.
        handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
        assert!(std::matches!(
            handler.protocols[0].state,
            State::Opening { in_substream: Some(_), .. }
        ));

        // Deliver the negotiated outbound substream: `Opening` -> `Open`.
        let (io, _io2) = MockSubstream::negotiated().await;
        let mut codec = UviBytes::default();
        codec.set_max_len(usize::MAX);

        let notif_out = NotificationsOutOpen {
            handshake: b"hello, world".to_vec(),
            negotiated_fallback: None,
            substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
        };
        handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
            handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
        ));

        assert!(std::matches!(
            handler.protocols[0].state,
            State::Open { in_substream: Some(_), .. }
        ));

        // Second inbound substream for the same, now open, protocol.
        let (io, mut io2) = MockSubstream::negotiated().await;
        let mut codec = UviBytes::default();
        codec.set_max_len(usize::MAX);
        let notif_in = NotificationsInOpen {
            handshake: b"hello, world".to_vec(),
            substream: NotificationsInSubstream::new(
                Framed::new(io, codec),
                NotificationsInSubstreamHandshake::NotSent,
            ),
        };
        handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
            handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
        ));

        // The handler dropped its end, so the remote end must observe `UnexpectedEof`.
        futures::future::poll_fn(|cx| {
            let mut buf = Vec::with_capacity(512);

            if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
                assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
            } else {
                panic!("unexpected result");
            }

            Poll::Ready(())
        })
        .await;
        // The first inbound substream is still in place.
        assert!(std::matches!(
            handler.protocols[0].state,
            State::Open { in_substream: Some(_), .. }
        ));
    }
1326
    /// Checks that an outbound substream negotiated after the protocol was closed again only
    /// clears the `pending_opening` flag and leaves the protocol `Closed`.
    #[tokio::test]
    async fn fully_negotiated_resets_state_for_closed_substream() {
        let mut handler = notifs_handler();
        let (io, mut io2) = MockSubstream::negotiated().await;
        let mut codec = UviBytes::default();
        codec.set_max_len(usize::MAX);

        let notif_in = NotificationsInOpen {
            handshake: b"hello, world".to_vec(),
            substream: NotificationsInSubstream::new(
                Framed::new(io, codec),
                NotificationsInSubstreamHandshake::NotSent,
            ),
        };
        // Inbound substream: `Closed` -> `OpenDesiredByRemote`.
        handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
            handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
        ));

        assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
        // The remote end must have nothing to read.
        futures::future::poll_fn(|cx| {
            let mut buf = Vec::with_capacity(512);
            assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
            Poll::Ready(())
        })
        .await;

        // The behaviour accepts, which queues an outbound substream request.
        handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
        assert!(std::matches!(
            handler.protocols[0].state,
            State::Opening { in_substream: Some(_), .. }
        ));

        // Closing while the outbound request is in flight leaves `pending_opening` set.
        handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
        assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));

        // The outbound substream is negotiated only now, after the close.
        let (io, _io2) = MockSubstream::negotiated().await;
        let mut codec = UviBytes::default();
        codec.set_max_len(usize::MAX);

        let notif_out = NotificationsOutOpen {
            handshake: b"hello, world".to_vec(),
            negotiated_fallback: None,
            substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
        };
        handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
            handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
        ));

        // The late substream is ignored and the flag is cleared.
        assert!(std::matches!(
            handler.protocols[0].state,
            State::Closed { pending_opening: false }
        ));
    }
1385
1386 #[tokio::test]
1387 async fn fully_negotiated_resets_state_for_open_desired_substream() {
1388 let mut handler = notifs_handler();
1389 let (io, mut io2) = MockSubstream::negotiated().await;
1390 let mut codec = UviBytes::default();
1391 codec.set_max_len(usize::MAX);
1392
1393 let notif_in = NotificationsInOpen {
1394 handshake: b"hello, world".to_vec(),
1395 substream: NotificationsInSubstream::new(
1396 Framed::new(io, codec),
1397 NotificationsInSubstreamHandshake::NotSent,
1398 ),
1399 };
1400 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1401 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1402 ));
1403
1404 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1406 futures::future::poll_fn(|cx| {
1407 let mut buf = Vec::with_capacity(512);
1408 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1409 Poll::Ready(())
1410 })
1411 .await;
1412
1413 handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
1416 assert!(std::matches!(
1417 handler.protocols[0].state,
1418 State::Opening { in_substream: Some(_), .. }
1419 ));
1420
1421 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1422 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1423
1424 let (io, _io2) = MockSubstream::negotiated().await;
1426 let mut codec = UviBytes::default();
1427 codec.set_max_len(usize::MAX);
1428
1429 let notif_in = NotificationsInOpen {
1430 handshake: b"hello, world".to_vec(),
1431 substream: NotificationsInSubstream::new(
1432 Framed::new(io, codec),
1433 NotificationsInSubstreamHandshake::NotSent,
1434 ),
1435 };
1436 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1437 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1438 ));
1439
1440 assert!(std::matches!(
1441 handler.protocols[0].state,
1442 State::OpenDesiredByRemote { pending_opening: true, .. }
1443 ));
1444
1445 let (io, _io2) = MockSubstream::negotiated().await;
1448 let mut codec = UviBytes::default();
1449 codec.set_max_len(usize::MAX);
1450
1451 let notif_out = NotificationsOutOpen {
1452 handshake: b"hello, world".to_vec(),
1453 negotiated_fallback: None,
1454 substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1455 };
1456
1457 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1458 handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1459 ));
1460
1461 assert!(std::matches!(
1462 handler.protocols[0].state,
1463 State::OpenDesiredByRemote { pending_opening: false, .. }
1464 ));
1465 }
1466
1467 #[tokio::test]
1468 async fn dial_upgrade_error_resets_closed_outbound_state() {
1469 let mut handler = notifs_handler();
1470 let (io, mut io2) = MockSubstream::negotiated().await;
1471 let mut codec = UviBytes::default();
1472 codec.set_max_len(usize::MAX);
1473
1474 let notif_in = NotificationsInOpen {
1475 handshake: b"hello, world".to_vec(),
1476 substream: NotificationsInSubstream::new(
1477 Framed::new(io, codec),
1478 NotificationsInSubstreamHandshake::NotSent,
1479 ),
1480 };
1481 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1482 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1483 ));
1484
1485 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1487 futures::future::poll_fn(|cx| {
1488 let mut buf = Vec::with_capacity(512);
1489 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1490 Poll::Ready(())
1491 })
1492 .await;
1493
1494 handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
1497 assert!(std::matches!(
1498 handler.protocols[0].state,
1499 State::Opening { in_substream: Some(_), .. }
1500 ));
1501
1502 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1503 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1504
1505 handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1507 handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1508 ));
1509 assert!(std::matches!(
1510 handler.protocols[0].state,
1511 State::Closed { pending_opening: false }
1512 ));
1513 }
1514
1515 #[tokio::test]
1516 async fn dial_upgrade_error_resets_open_desired_state() {
1517 let mut handler = notifs_handler();
1518 let (io, mut io2) = MockSubstream::negotiated().await;
1519 let mut codec = UviBytes::default();
1520 codec.set_max_len(usize::MAX);
1521
1522 let notif_in = NotificationsInOpen {
1523 handshake: b"hello, world".to_vec(),
1524 substream: NotificationsInSubstream::new(
1525 Framed::new(io, codec),
1526 NotificationsInSubstreamHandshake::NotSent,
1527 ),
1528 };
1529 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1530 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1531 ));
1532
1533 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1535 futures::future::poll_fn(|cx| {
1536 let mut buf = Vec::with_capacity(512);
1537 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1538 Poll::Ready(())
1539 })
1540 .await;
1541
1542 handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
1545 assert!(std::matches!(
1546 handler.protocols[0].state,
1547 State::Opening { in_substream: Some(_), .. }
1548 ));
1549
1550 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1551 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1552
1553 let (io, _io2) = MockSubstream::negotiated().await;
1554 let mut codec = UviBytes::default();
1555 codec.set_max_len(usize::MAX);
1556
1557 let notif_in = NotificationsInOpen {
1558 handshake: b"hello, world".to_vec(),
1559 substream: NotificationsInSubstream::new(
1560 Framed::new(io, codec),
1561 NotificationsInSubstreamHandshake::NotSent,
1562 ),
1563 };
1564 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1565 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1566 ));
1567
1568 assert!(std::matches!(
1569 handler.protocols[0].state,
1570 State::OpenDesiredByRemote { pending_opening: true, .. }
1571 ));
1572
1573 handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1575 handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1576 ));
1577 assert!(std::matches!(
1578 handler.protocols[0].state,
1579 State::OpenDesiredByRemote { pending_opening: false, .. }
1580 ));
1581 }
1582
1583 #[tokio::test]
1584 async fn sync_notifications_clogged() {
1585 let mut handler = notifs_handler();
1586 let (io, _) = MockSubstream::negotiated().await;
1587 let codec = UviBytes::default();
1588
1589 let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
1590 let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1);
1591 let notifications_sink = NotificationsSink {
1592 inner: Arc::new(NotificationsSinkInner {
1593 peer_id: PeerId::random(),
1594 async_channel: FuturesMutex::new(async_tx),
1595 sync_channel: Mutex::new(Some(sync_tx)),
1596 }),
1597 metrics: None,
1598 };
1599
1600 handler.protocols[0].state = State::Open {
1601 notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
1602 out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))),
1603 in_substream: None,
1604 };
1605
1606 notifications_sink.send_sync_notification(vec![1, 3, 3, 7]);
1607 notifications_sink.send_sync_notification(vec![1, 3, 3, 8]);
1608 notifications_sink.send_sync_notification(vec![1, 3, 3, 9]);
1609 notifications_sink.send_sync_notification(vec![1, 3, 4, 0]);
1610
1611 #[allow(deprecated)]
1612 futures::future::poll_fn(|cx| {
1613 assert!(std::matches!(
1614 handler.poll(cx),
1615 Poll::Ready(ConnectionHandlerEvent::Close(
1616 NotifsHandlerError::SyncNotificationsClogged,
1617 ))
1618 ));
1619 Poll::Ready(())
1620 })
1621 .await;
1622 }
1623
1624 #[tokio::test]
1625 async fn close_desired_by_remote() {
1626 let mut handler = notifs_handler();
1627 let (io, io2) = MockSubstream::negotiated().await;
1628 let mut codec = UviBytes::default();
1629 codec.set_max_len(usize::MAX);
1630
1631 let notif_in = NotificationsInOpen {
1632 handshake: b"hello, world".to_vec(),
1633 substream: NotificationsInSubstream::new(
1634 Framed::new(io, codec),
1635 NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]),
1636 ),
1637 };
1638
1639 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1642 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1643 ));
1644 drop(io2);
1645
1646 futures::future::poll_fn(|cx| {
1647 assert!(std::matches!(
1648 handler.poll(cx),
1649 Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1650 NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0, .. },
1651 ))
1652 ));
1653 assert!(std::matches!(
1654 handler.poll(cx),
1655 Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1656 NotifsHandlerOut::CloseDesired { protocol_index: 0 },
1657 ))
1658 ));
1659 Poll::Ready(())
1660 })
1661 .await;
1662 }
1663}