tokio_test/
stream_mock.rs

1#![cfg(not(loom))]
2
3//! A mock stream implementing [`Stream`].
4//!
5//! # Overview
6//! This crate provides a `StreamMock` that can be used to test code that interacts with streams.
7//! It allows you to mock the behavior of a stream and control the items it yields and the waiting
8//! intervals between items.
9//!
10//! # Usage
11//! To use the `StreamMock`, you need to create a builder using [`StreamMockBuilder`]. The builder
12//! allows you to enqueue actions such as returning items or waiting for a certain duration.
13//!
14//! # Example
15//! ```rust
16//!
17//! use futures_util::StreamExt;
18//! use std::time::Duration;
19//! use tokio_test::stream_mock::StreamMockBuilder;
20//!
21//! async fn test_stream_mock_wait() {
22//!     let mut stream_mock = StreamMockBuilder::new()
23//!         .next(1)
24//!         .wait(Duration::from_millis(300))
25//!         .next(2)
26//!         .build();
27//!
28//!     assert_eq!(stream_mock.next().await, Some(1));
29//!     let start = std::time::Instant::now();
30//!     assert_eq!(stream_mock.next().await, Some(2));
31//!     let elapsed = start.elapsed();
32//!     assert!(elapsed >= Duration::from_millis(300));
33//!     assert_eq!(stream_mock.next().await, None);
34//! }
35//! ```
36
37use std::collections::VecDeque;
38use std::pin::Pin;
39use std::task::Poll;
40use std::time::Duration;
41
42use futures_core::{ready, Stream};
43use std::future::Future;
44use tokio::time::{sleep_until, Instant, Sleep};
45
/// A single scripted step of a [`StreamMock`].
#[derive(Debug, Clone)]
enum Action<T>
where
    T: Unpin,
{
    /// Yield the contained item from the stream.
    Next(T),
    /// Stay pending for the given duration before moving on to the next step.
    Wait(Duration),
}
51
52/// A builder for [`StreamMock`]
53#[derive(Debug, Clone)]
54pub struct StreamMockBuilder<T: Unpin> {
55    actions: VecDeque<Action<T>>,
56}
57
58impl<T: Unpin> StreamMockBuilder<T> {
59    /// Create a new empty [`StreamMockBuilder`]
60    pub fn new() -> Self {
61        StreamMockBuilder::default()
62    }
63
64    /// Queue an item to be returned by the stream
65    pub fn next(mut self, value: T) -> Self {
66        self.actions.push_back(Action::Next(value));
67        self
68    }
69
70    // Queue an item to be consumed by the sink,
71    // commented out until Sink is implemented.
72    //
73    // pub fn consume(mut self, value: T) -> Self {
74    //    self.actions.push_back(Action::Consume(value));
75    //    self
76    // }
77
78    /// Queue the stream to wait for a duration
79    pub fn wait(mut self, duration: Duration) -> Self {
80        self.actions.push_back(Action::Wait(duration));
81        self
82    }
83
84    /// Build the [`StreamMock`]
85    pub fn build(self) -> StreamMock<T> {
86        StreamMock {
87            actions: self.actions,
88            sleep: None,
89        }
90    }
91}
92
93impl<T: Unpin> Default for StreamMockBuilder<T> {
94    fn default() -> Self {
95        StreamMockBuilder {
96            actions: VecDeque::new(),
97        }
98    }
99}
100
101/// A mock stream implementing [`Stream`]
102///
103/// See [`StreamMockBuilder`] for more information.
104#[derive(Debug)]
105pub struct StreamMock<T: Unpin> {
106    actions: VecDeque<Action<T>>,
107    sleep: Option<Pin<Box<Sleep>>>,
108}
109
110impl<T: Unpin> StreamMock<T> {
111    fn next_action(&mut self) -> Option<Action<T>> {
112        self.actions.pop_front()
113    }
114}
115
116impl<T: Unpin> Stream for StreamMock<T> {
117    type Item = T;
118
119    fn poll_next(
120        mut self: std::pin::Pin<&mut Self>,
121        cx: &mut std::task::Context<'_>,
122    ) -> std::task::Poll<Option<Self::Item>> {
123        // Try polling the sleep future first
124        if let Some(ref mut sleep) = self.sleep {
125            ready!(Pin::new(sleep).poll(cx));
126            // Since we're ready, discard the sleep future
127            self.sleep.take();
128        }
129
130        match self.next_action() {
131            Some(action) => match action {
132                Action::Next(item) => Poll::Ready(Some(item)),
133                Action::Wait(duration) => {
134                    // Set up a sleep future and schedule this future to be polled again for it.
135                    self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration)));
136                    cx.waker().wake_by_ref();
137
138                    Poll::Pending
139                }
140            },
141            None => Poll::Ready(None),
142        }
143    }
144}
145
146impl<T: Unpin> Drop for StreamMock<T> {
147    fn drop(&mut self) {
148        // Avoid double panicking to make debugging easier.
149        if std::thread::panicking() {
150            return;
151        }
152
153        let undropped_count = self
154            .actions
155            .iter()
156            .filter(|action| match action {
157                Action::Next(_) => true,
158                Action::Wait(_) => false,
159            })
160            .count();
161
162        assert!(
163            undropped_count == 0,
164            "StreamMock was dropped before all actions were consumed, {} actions were not consumed",
165            undropped_count
166        );
167    }
168}