pub struct LocalSet { /* private fields */ }
Available on crate feature rt only.
A set of tasks which are executed on the same thread.
In some cases, it is necessary to run one or more futures that do not
implement Send and thus are unsafe to send between threads. In these
cases, a local task set may be used to schedule one or more !Send
futures to run together on the same thread.
For example, the following code will not compile:
use std::rc::Rc;

#[tokio::main]
async fn main() {
    // `Rc` does not implement `Send`, and thus may not be sent between
    // threads safely.
    let unsend_data = Rc::new("my unsend data...");

    let unsend_data = unsend_data.clone();
    // Because the `async` block here moves `unsend_data`, the future is `!Send`.
    // Since `tokio::spawn` requires the spawned future to implement `Send`, this
    // will not compile.
    tokio::spawn(async move {
        println!("{}", unsend_data);
        // ...
    }).await.unwrap();
}
Use with run_until
To spawn !Send futures, we can use a local task set to schedule them
on the thread calling Runtime::block_on. When running inside of the
local task set, we can use task::spawn_local, which can spawn
!Send futures. For example:
use std::rc::Rc;
use tokio::task;

#[tokio::main]
async fn main() {
    let unsend_data = Rc::new("my unsend data...");

    // Construct a local task set that can run `!Send` futures.
    let local = task::LocalSet::new();

    // Run the local task set.
    local.run_until(async move {
        let unsend_data = unsend_data.clone();
        // `spawn_local` ensures that the future is spawned on the local
        // task set.
        task::spawn_local(async move {
            println!("{}", unsend_data);
            // ...
        }).await.unwrap();
    }).await;
}
Note: The run_until method can only be used in #[tokio::main],
#[tokio::test] or directly inside a call to Runtime::block_on. It
cannot be used inside a task spawned with tokio::spawn.
Awaiting a LocalSet
Additionally, a LocalSet itself implements Future, completing when
all tasks spawned on the LocalSet complete. This can be used to run
several futures on a LocalSet and drive the whole set until they
complete. For example:
use tokio::{task, time};
use std::rc::Rc;

#[tokio::main]
async fn main() {
    let unsend_data = Rc::new("world");
    let local = task::LocalSet::new();

    let unsend_data2 = unsend_data.clone();
    local.spawn_local(async move {
        // ...
        println!("hello {}", unsend_data2)
    });

    local.spawn_local(async move {
        time::sleep(time::Duration::from_millis(100)).await;
        println!("goodbye {}", unsend_data)
    });

    // ...

    local.await;
}
Note: Awaiting a LocalSet can only be done inside #[tokio::main],
#[tokio::test] or directly inside a call to Runtime::block_on. It
cannot be used inside a task spawned with tokio::spawn.
Use inside tokio::spawn
The two methods mentioned above cannot be used inside tokio::spawn, so
to spawn !Send futures from inside tokio::spawn, we need to do
something else. The solution is to create the LocalSet somewhere else,
and communicate with it using an mpsc channel.
The following example puts the LocalSet inside a new thread.
use tokio::runtime::Builder;
use tokio::sync::{mpsc, oneshot};
use tokio::task::LocalSet;

// This enum describes the task you want to spawn. Here we include
// some simple examples. The oneshot channel allows sending a response
// to the spawner.
#[derive(Debug)]
enum Task {
    PrintNumber(u32),
    AddOne(u32, oneshot::Sender<u32>),
}

#[derive(Clone)]
struct LocalSpawner {
    send: mpsc::UnboundedSender<Task>,
}

impl LocalSpawner {
    pub fn new() -> Self {
        let (send, mut recv) = mpsc::unbounded_channel();

        let rt = Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        std::thread::spawn(move || {
            let local = LocalSet::new();

            local.spawn_local(async move {
                while let Some(new_task) = recv.recv().await {
                    tokio::task::spawn_local(run_task(new_task));
                }
                // If the while loop returns, then all the LocalSpawner
                // objects have been dropped.
            });

            // This will return once all senders are dropped and all
            // spawned tasks have returned.
            rt.block_on(local);
        });

        Self {
            send,
        }
    }

    pub fn spawn(&self, task: Task) {
        self.send.send(task).expect("Thread with LocalSet has shut down.");
    }
}

// This task may do `!Send` work. We use printing a number as an example,
// but it could be anything.
//
// `Task` is an enum so that it can describe many different kinds
// of operations.
async fn run_task(task: Task) {
    match task {
        Task::PrintNumber(n) => {
            println!("{}", n);
        },
        Task::AddOne(n, response) => {
            // We ignore failures to send the response.
            let _ = response.send(n + 1);
        },
    }
}

#[tokio::main]
async fn main() {
    let spawner = LocalSpawner::new();

    let (send, response) = oneshot::channel();
    spawner.spawn(Task::AddOne(10, send));
    let eleven = response.await.unwrap();
    assert_eq!(eleven, 11);
}
Implementations
impl LocalSet
pub fn enter(&self) -> LocalEnterGuard
Enters the context of this LocalSet.
The spawn_local method will spawn tasks on the LocalSet whose
context you are inside.
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where
    F: Future + 'static,
    F::Output: 'static,
Spawns a !Send task onto the local task set.
This task is guaranteed to be run on the current thread.
Unlike the free function spawn_local, this method may be used to
spawn local tasks when the LocalSet is not running. The provided
future will start running once the LocalSet is next started, even if
you don’t await the returned JoinHandle.
Examples
use tokio::task;

#[tokio::main]
async fn main() {
    let local = task::LocalSet::new();

    // Spawn a future on the local set. This future will be run when
    // we call `run_until` to drive the task set.
    local.spawn_local(async {
        // ...
    });

    // Run the local task set.
    local.run_until(async move {
        // ...
    }).await;

    // When `run_until` finishes, we can spawn _more_ futures, which will
    // run in subsequent calls to `run_until`.
    local.spawn_local(async {
        // ...
    });

    local.run_until(async move {
        // ...
    }).await;
}
pub fn block_on<F>(&self, rt: &Runtime, future: F) -> F::Output
where
    F: Future,
Runs a future to completion on the provided runtime, driving any local
futures spawned on this task set on the current thread.
This runs the given future on the runtime, blocking until it is
complete, and yielding its resolved result. Any tasks or timers which
the future spawns internally will be executed on the runtime. The future
may also call spawn_local to spawn additional local futures on the
current thread.
This method should not be called from an asynchronous context.
Panics
This function panics if the executor is at capacity, if the provided future panics, or if called within an asynchronous execution context.
Notes
Since this function internally calls Runtime::block_on, and drives
futures in the local task set inside that call to block_on, the local
futures may not use in-place blocking. If a blocking call needs to be
issued from a local task, the spawn_blocking API may be used instead.
For example, this will panic:
use tokio::runtime::Runtime;
use tokio::task;

let rt = Runtime::new().unwrap();
let local = task::LocalSet::new();
local.block_on(&rt, async {
    let join = task::spawn_local(async {
        let blocking_result = task::block_in_place(|| {
            // ...
        });
        // ...
    });
    join.await.unwrap();
})
This, however, will not panic:
use tokio::runtime::Runtime;
use tokio::task;

let rt = Runtime::new().unwrap();
let local = task::LocalSet::new();
local.block_on(&rt, async {
    let join = task::spawn_local(async {
        let blocking_result = task::spawn_blocking(|| {
            // ...
        }).await;
        // ...
    });
    join.await.unwrap();
})
pub async fn run_until<F>(&self, future: F) -> F::Output
where
    F: Future,
Runs a future to completion on the local set, returning its output.
This returns a future that runs the given future with a local set,
allowing it to call spawn_local to spawn additional !Send futures.
Any local futures spawned on the local set will be driven in the
background until the future passed to run_until completes. When the future
passed to run_until finishes, any local futures which have not completed
will remain on the local set, and will be driven on subsequent calls to
run_until or when awaiting the local set itself.
Examples
use tokio::task;

#[tokio::main]
async fn main() {
    task::LocalSet::new().run_until(async {
        task::spawn_local(async move {
            // ...
        }).await.unwrap();
        // ...
    }).await;
}
impl LocalSet
pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self
Available on tokio_unstable only.
Configure how the LocalSet responds to an unhandled panic on a
spawned task.
By default, an unhandled panic (i.e. a panic not caught by
std::panic::catch_unwind) has no impact on the LocalSet’s
execution. The panic’s error value is forwarded to the task’s
JoinHandle and all other spawned tasks continue running.
The unhandled_panic option enables configuring this behavior.
UnhandledPanic::Ignore is the default behavior. Panics on spawned tasks have no impact on the LocalSet’s execution.
UnhandledPanic::ShutdownRuntime will force the LocalSet to shut down immediately when a spawned task panics, even if that task’s JoinHandle has not been dropped. All other spawned tasks will immediately terminate, and further calls to LocalSet::block_on and LocalSet::run_until will panic.
Panics
This method panics if called after the LocalSet has started
running.
Unstable
This option is currently unstable and its implementation is incomplete. The API may change or be removed in the future. See tokio-rs/tokio#4516 for more details.
Examples
The following demonstrates a LocalSet configured to shut down on
panic. The first spawned task panics and results in the LocalSet
shutting down. The second spawned task never has a chance to
execute. The call to run_until will panic due to the runtime being
forcibly shut down.
use tokio::runtime::UnhandledPanic;

tokio::task::LocalSet::new()
    .unhandled_panic(UnhandledPanic::ShutdownRuntime)
    .run_until(async {
        tokio::task::spawn_local(async { panic!("boom"); });
        tokio::task::spawn_local(async {
            // This task never completes
        });

        // Do some work, but `run_until` will panic before it completes
    })
    .await;