Crate stream_cancel
This crate provides multiple mechanisms for interrupting a Stream.
Stream combinator
The extension trait StreamExt provides a single new Stream combinator: take_until.
StreamExt::take_until continues yielding elements from the underlying Stream until a Future resolves, and at that moment immediately yields None and stops producing further elements.
For convenience, the crate also includes the Tripwire type, which produces a cloneable Future that can then be passed to take_until. When a new Tripwire is created, an associated Trigger is also returned, which interrupts the Stream when it is dropped.
```rust
use stream_cancel::{StreamExt, Tripwire};
use futures::prelude::*;
use tokio::prelude::*;

#[tokio::main]
async fn main() {
    let mut listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
    let (trigger, tripwire) = Tripwire::new();

    tokio::spawn(async move {
        let mut incoming = listener.incoming().take_until(tripwire);
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    // tell the listener to stop accepting new connections
    drop(trigger);
    // the spawned async block will terminate cleanly, allowing main to return
}
```
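The drop-to-interrupt behaviour described above can be illustrated without an async runtime. The following is only a synchronous, std-only analogy under stated assumptions: it uses an Iterator in place of a Stream and a shared flag in place of a Future, and every name in it (SketchTrigger, SketchTripwire, TakeUntilSketch, take_until_sketch, demo) is hypothetical. It is not how the crate is implemented.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `Trigger`: dropping it sets the shared flag.
pub struct SketchTrigger(Arc<AtomicBool>);

impl Drop for SketchTrigger {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for `Tripwire`: cloneable, and "resolved" once the
/// associated trigger has been dropped.
#[derive(Clone)]
pub struct SketchTripwire(Arc<AtomicBool>);

impl SketchTripwire {
    /// Mirrors the shape of `Tripwire::new`: returns a (trigger, tripwire) pair.
    pub fn new() -> (SketchTrigger, SketchTripwire) {
        let flag = Arc::new(AtomicBool::new(false));
        (SketchTrigger(Arc::clone(&flag)), SketchTripwire(flag))
    }

    pub fn tripped(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Iterator analogue of the `take_until` combinator.
pub struct TakeUntilSketch<I> {
    inner: I,
    tripwire: SketchTripwire,
}

impl<I: Iterator> Iterator for TakeUntilSketch<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Once the tripwire has fired, yield None without polling the source.
        if self.tripwire.tripped() {
            None
        } else {
            self.inner.next()
        }
    }
}

pub fn take_until_sketch<I: Iterator>(inner: I, tripwire: SketchTripwire) -> TakeUntilSketch<I> {
    TakeUntilSketch { inner, tripwire }
}

/// Takes three items from an endless source, drops the trigger, then polls once more.
pub fn demo() -> (Vec<u32>, Option<u32>) {
    let (trigger, tripwire) = SketchTripwire::new();
    let mut numbers = take_until_sketch(0u32.., tripwire);
    let first: Vec<u32> = numbers.by_ref().take(3).collect();
    drop(trigger);
    (first, numbers.next())
}
```

In this sketch, demo() collects 0, 1 and 2 from an endless range and then observes None once the trigger has been dropped, which is the same termination signal a consumer of take_until sees.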
Stream wrapper
Any stream can be wrapped in a Valved, which enables it to be remotely terminated through an associated Trigger. This can be useful to implement graceful shutdown on "infinite" streams like a TcpListener. Once Trigger::cancel is called on the handle for a given stream's Valved (or the handle is simply dropped), the stream will yield None to indicate that it has terminated.
```rust
use stream_cancel::Valved;
use futures::prelude::*;
use tokio::prelude::*;
use std::thread;

#[tokio::main]
async fn main() {
    let (exit_tx, exit_rx) = tokio::sync::oneshot::channel();
    let mut listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();

    tokio::spawn(async move {
        let (exit, mut incoming) = Valved::new(listener.incoming());
        exit_tx.send(exit).unwrap();
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    let exit = exit_rx.await;

    // the server thread will normally never exit, since more connections
    // can always arrive. however, with a Valved, we can turn off the
    // stream of incoming connections to initiate a graceful shutdown
    drop(exit);
}
```
You can share the same Trigger between multiple streams by first creating a Valve, and then wrapping multiple streams using Valve::wrap:
```rust
use stream_cancel::Valve;
use futures::prelude::*;
use tokio::prelude::*;

#[tokio::main]
async fn main() {
    let (exit, valve) = Valve::new();
    let mut listener1 = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
    let mut listener2 = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();

    tokio::spawn(async move {
        let incoming1 = valve.wrap(listener1.incoming());
        let incoming2 = valve.wrap(listener2.incoming());

        use futures_util::stream::select;
        let mut incoming = select(incoming1, incoming2);
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    // the runtime will not become idle until both incoming1 and incoming2 have stopped
    // (due to the select). this checks that they are indeed both interrupted when the
    // valve is closed.
    drop(exit);
}
```
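The idea of one Trigger shutting down several wrapped sources at once can also be sketched with threads and the standard library alone. This is a rough analogy, not the crate's implementation: it assumes an Iterator in place of a Stream, and the names SketchTrigger, SketchValve, SketchValved and shutdown_demo are all hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for `Trigger`: dropping it closes the valve.
pub struct SketchTrigger(Arc<AtomicBool>);

impl Drop for SketchTrigger {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for `Valve`: wraps any number of sources that all
/// observe the same shared flag.
pub struct SketchValve(Arc<AtomicBool>);

impl SketchValve {
    /// Mirrors the shape of `Valve::new`: returns a (trigger, valve) pair.
    pub fn new() -> (SketchTrigger, SketchValve) {
        let flag = Arc::new(AtomicBool::new(false));
        (SketchTrigger(Arc::clone(&flag)), SketchValve(flag))
    }

    pub fn wrap<I: Iterator>(&self, inner: I) -> SketchValved<I> {
        SketchValved { inner, closed: Arc::clone(&self.0) }
    }
}

/// Hypothetical stand-in for `Valved`: yields None once the valve is closed.
pub struct SketchValved<I> {
    inner: I,
    closed: Arc<AtomicBool>,
}

impl<I: Iterator> Iterator for SketchValved<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if self.closed.load(Ordering::SeqCst) {
            None
        } else {
            self.inner.next()
        }
    }
}

/// Spawns two workers that each drain an endless wrapped source, then drops
/// the single trigger and waits for both workers to finish.
pub fn shutdown_demo() -> Vec<usize> {
    let (exit, valve) = SketchValve::new();
    let workers: Vec<thread::JoinHandle<usize>> = (0..2)
        .map(|_| {
            let source = valve.wrap(std::iter::repeat(()));
            // Each worker counts how many items it saw before shutdown.
            thread::spawn(move || source.count())
        })
        .collect();

    // One drop interrupts every source wrapped by the same valve.
    drop(exit);
    workers.into_iter().map(|w| w.join().unwrap()).collect()
}
```

The per-worker counts depend on scheduling, so they are not meaningful in themselves. What the sketch shows is that both joins return after a single drop, even though neither source would ever end on its own.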
Structs
TakeUntil | A stream combinator which takes elements from a stream until a future resolves. |
Trigger | A handle to a set of cancellable streams. |
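Tripwire | A cloneable Future that can be passed to take_until; it interrupts the Stream when its associated Trigger is dropped. |
Valve | A handle for wrapping multiple streams so that they share the same Trigger. |
Valved | A wrapper around a stream that enables it to be remotely terminated through an associated Trigger. |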
Traits
StreamExt | This extension trait provides the take_until combinator for Stream. |