Crate stream_cancel
source ·Expand description
This crate provides multiple mechanisms for interrupting a Stream
.
Stream combinator
The extension trait StreamExt
provides a single new Stream
combinator: take_until_if
.
StreamExt::take_until_if
continues yielding elements from the underlying Stream
until a
Future
resolves, and at that moment immediately yields None
and stops producing further
elements.
For convenience, the crate also includes the Tripwire
type, which produces a cloneable
Future
that can then be passed to take_until_if
. When a new Tripwire
is created, an
associated Trigger
is also returned, which interrupts the Stream
when it is dropped.
use stream_cancel::{StreamExt, Tripwire};
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;
#[tokio::main]
async fn main() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let (trigger, tripwire) = Tripwire::new();
tokio::spawn(async move {
let mut incoming = TcpListenerStream::new(listener).take_until_if(tripwire);
while let Some(mut s) = incoming.next().await.transpose().unwrap() {
tokio::spawn(async move {
let (mut r, mut w) = s.split();
println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
});
}
});
// tell the listener to stop accepting new connections
drop(trigger);
// the spawned async block will terminate cleanly, allowing main to return
}
Stream wrapper
Any stream can be wrapped in a Valved
, which enables it to be remotely terminated through
an associated Trigger
. This can be useful to implement graceful shutdown on “infinite”
streams like a TcpListener
. Once Trigger::cancel
is called on the handle for a given
stream’s Valved
, the stream will yield None
to indicate that it has terminated.
use stream_cancel::Valved;
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;
use std::thread;
#[tokio::main]
async fn main() {
let (exit_tx, exit_rx) = tokio::sync::oneshot::channel();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
tokio::spawn(async move {
let (exit, mut incoming) = Valved::new(TcpListenerStream::new(listener));
exit_tx.send(exit).unwrap();
while let Some(mut s) = incoming.next().await.transpose().unwrap() {
tokio::spawn(async move {
let (mut r, mut w) = s.split();
println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
});
}
});
let exit = exit_rx.await;
// the server thread will normally never exit, since more connections
// can always arrive. however, with a Valved, we can turn off the
// stream of incoming connections to initiate a graceful shutdown
drop(exit);
}
You can share the same Trigger
between multiple streams by first creating a Valve
,
and then wrapping multiple streams using [Valve::Wrap
]:
use stream_cancel::Valve;
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;
#[tokio::main]
async fn main() {
let (exit, valve) = Valve::new();
let listener1 = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let listener2 = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
tokio::spawn(async move {
let incoming1 = valve.wrap(TcpListenerStream::new(listener1));
let incoming2 = valve.wrap(TcpListenerStream::new(listener2));
use futures_util::stream::select;
let mut incoming = select(incoming1, incoming2);
while let Some(mut s) = incoming.next().await.transpose().unwrap() {
tokio::spawn(async move {
let (mut r, mut w) = s.split();
println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
});
}
});
// the runtime will not become idle until both incoming1 and incoming2 have stopped
// (due to the select). this checks that they are indeed both interrupted when the
// valve is closed.
drop(exit);
}
Structs
- A stream combinator which takes elements from a stream until a future resolves.
- A handle to a set of cancellable streams.
- A
Tripwire
is a convenient mechanism for implementing graceful shutdown over many asynchronous streams. ATripwire
is aFuture
that isClone
, and that can be passed toStreamExt::take_until_if
. AllTripwire
clones are associated with a singleTrigger
, which is then used to signal that all the associated streams should be terminated. - A
Valve
is associated with aTrigger
, and can be used to wrap one or more asynchronous streams. All streams wrapped by a givenValve
(or its clones) will be interrupted when [Trigger::close
] is called on the valve’s associated handle. - A
Valved
is wrapper around aStream
that enables the stream to be turned off remotely to initiate a graceful shutdown. When a newValved
is created withValved::new
, a handle to thatValved
is also produced; when [Trigger::close
] is called on that handle, the wrapped stream will immediately yieldNone
to indicate that it has completed.
Traits
- This
Stream
extension trait provides atake_until_if
method that terminates the stream once the given future resolves.