pub trait StreamExt: Stream {
    // Provided method
    fn take_until_if<U>(self, until: U) -> TakeUntilIf<Self, U>
       where U: Future<Output = bool>,
             Self: Sized { ... }
}

This Stream extension trait provides a take_until_if method that terminates the stream once the given future resolves to true.

Provided Methods

fn take_until_if<U>(self, until: U) -> TakeUntilIf<Self, U>
where U: Future<Output = bool>, Self: Sized,

Take elements from this stream until the given future resolves.

This function takes elements from this stream until the given future resolves with true. Once it does, the stream yields None and produces no further elements.

If the future resolves with false, the stream is allowed to continue indefinitely.

This method is essentially a wrapper around futures_util::stream::StreamExt::take_until that ascribes particular semantics to the output of the provided future.
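The true/false semantics described above can be modelled with the standard library alone. The following is a minimal sketch, not the crate's actual implementation: `TakeUntilIfModel`, `NoopWake`, and `drain` are hypothetical names introduced for illustration, and the "stream" is assumed to be a plain iterator whose items are always ready. It also assumes the future is checked before each item is taken.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `TakeUntilIf`: wraps an iterator (modelling a
/// stream whose items are always ready) together with a `bool` future.
struct TakeUntilIfModel<I, U> {
    items: I,
    /// `None` once the future has resolved with `false`.
    until: Option<Pin<Box<U>>>,
    /// Set once the future has resolved with `true`.
    done: bool,
}

impl<I: Iterator, U: Future<Output = bool>> TakeUntilIfModel<I, U> {
    fn new(items: I, until: U) -> Self {
        Self { items, until: Some(Box::pin(until)), done: false }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        if self.done {
            return Poll::Ready(None);
        }
        // Check the future first; if it was already dropped, treat it as
        // never resolving.
        let resolved = match self.until.as_mut() {
            Some(fut) => fut.as_mut().poll(cx),
            None => Poll::Pending,
        };
        match resolved {
            // Resolved with `true`: end the stream for good.
            Poll::Ready(true) => {
                self.done = true;
                return Poll::Ready(None);
            }
            // Resolved with `false`: drop the future and keep going.
            Poll::Ready(false) => self.until = None,
            Poll::Pending => {}
        }
        Poll::Ready(self.items.next())
    }
}

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the model until it stops yielding items and collects them.
fn drain<I: Iterator, U: Future<Output = bool>>(
    mut stream: TakeUntilIfModel<I, U>,
) -> Vec<I::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = stream.poll_next(&mut cx) {
        out.push(item);
    }
    out
}
```

In this model, draining `TakeUntilIfModel::new(1..=3, std::future::ready(true))` yields no items, while passing `std::future::ready(false)` or `std::future::pending::<bool>()` yields all three.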

use stream_cancel::StreamExt;
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;

#[tokio::main]
async fn main() {
    let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
    let (tx, rx) = tokio::sync::oneshot::channel();

    tokio::spawn(async move {
        let mut incoming = TcpListenerStream::new(listener).take_until_if(rx.map(|_| true));
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    // tell the listener to stop accepting new connections
    tx.send(()).unwrap();
    // the spawned async block will terminate cleanly, allowing main to return
}

Implementors

impl<S> StreamExt for S
where S: Stream,