tokio_stream/wrappers/
mpsc_unbounded.rs

1use crate::Stream;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use tokio::sync::mpsc::UnboundedReceiver;
5
6/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
7///
8/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
9/// [`Stream`]: trait@crate::Stream
10#[derive(Debug)]
11pub struct UnboundedReceiverStream<T> {
12    inner: UnboundedReceiver<T>,
13}
14
15impl<T> UnboundedReceiverStream<T> {
16    /// Create a new `UnboundedReceiverStream`.
17    pub fn new(recv: UnboundedReceiver<T>) -> Self {
18        Self { inner: recv }
19    }
20
21    /// Get back the inner `UnboundedReceiver`.
22    pub fn into_inner(self) -> UnboundedReceiver<T> {
23        self.inner
24    }
25
26    /// Closes the receiving half of a channel without dropping it.
27    ///
28    /// This prevents any further messages from being sent on the channel while
29    /// still enabling the receiver to drain messages that are buffered.
30    pub fn close(&mut self) {
31        self.inner.close();
32    }
33}
34
35impl<T> Stream for UnboundedReceiverStream<T> {
36    type Item = T;
37
38    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39        self.inner.poll_recv(cx)
40    }
41}
42
43impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
44    fn as_ref(&self) -> &UnboundedReceiver<T> {
45        &self.inner
46    }
47}
48
49impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
50    fn as_mut(&mut self) -> &mut UnboundedReceiver<T> {
51        &mut self.inner
52    }
53}
54
55impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
56    fn from(recv: UnboundedReceiver<T>) -> Self {
57        Self::new(recv)
58    }
59}