Trait stream_cancel::StreamExt
source · pub trait StreamExt: Stream {
// Provided method
fn take_until_if<U>(self, until: U) -> TakeUntilIf<Self, U>
where U: Future<Output = bool>,
Self: Sized { ... }
}
Expand description
This `Stream` extension trait provides a `take_until_if` method that terminates the stream once
the given future resolves with `true`.
Provided Methods§
fn take_until_if<U>(self, until: U) -> TakeUntilIf<Self, U>
Take elements from this stream until the given future resolves.
This function takes elements from this stream until the given future resolves with `true`.
Once it does, the stream yields `None` and produces no further elements.
If the future resolves with `false`, the stream is allowed to continue indefinitely.
This method is essentially a wrapper around futures_util::stream::StreamExt::take_until
that ascribes particular semantics to the output of the provided future.
use stream_cancel::StreamExt;
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;
#[tokio::main]
async fn main() {
    // `listener` is only moved into the stream wrapper below and never mutated,
    // so the binding does not need to be `mut`.
    let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
    // Sending on `tx` resolves `rx`, which is the signal that ends the accept loop.
    let (tx, rx) = tokio::sync::oneshot::channel();

    tokio::spawn(async move {
        // `rx.map(|_| true)` turns whatever the receiver resolves with into `true`,
        // so the stream of incoming connections ends as soon as `rx` resolves.
        let mut incoming = TcpListenerStream::new(listener).take_until_if(rx.map(|_| true));
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                // copy everything read from the connection back into the same connection
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    // tell the listener to stop accepting new connections
    tx.send(()).unwrap();
    // the spawned async block will terminate cleanly, allowing main to return
}