await_tree/
spawn.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// TODO: should we consider exposing `current_registry`
16// so that users can not only spawn tasks but also get and collect trees?
17
18// TODO: should we support "global registry" for users to quick start?
19
20use std::future::Future;
21
22use tokio::task::JoinHandle;
23
24use crate::root::current_registry;
25use crate::{Key, Span};
26
27/// Spawns a new asynchronous task instrumented with the given root [`Span`], returning a
28/// [`JoinHandle`] for it.
29///
30/// The spawned task will be registered in the current [`Registry`](crate::Registry) with the given
31/// [`Key`], if it exists. Otherwise, this is equivalent to [`tokio::spawn`].
32pub fn spawn<T>(key: impl Key, root_span: impl Into<Span>, future: T) -> JoinHandle<T::Output>
33where
34    T: Future + Send + 'static,
35    T::Output: Send + 'static,
36{
37    if let Some(registry) = current_registry() {
38        tokio::spawn(registry.register(key, root_span).instrument(future))
39    } else {
40        tokio::spawn(future)
41    }
42}
43
44/// Spawns a new asynchronous task instrumented with the given root [`Span`], returning a
45/// [`JoinHandle`] for it.
46///
47/// The spawned task will be registered in the current [`Registry`](crate::Registry) anonymously, if
48/// it exists. Otherwise, this is equivalent to [`tokio::spawn`].
49pub fn spawn_anonymous<T>(root_span: impl Into<Span>, future: T) -> JoinHandle<T::Output>
50where
51    T: Future + Send + 'static,
52    T::Output: Send + 'static,
53{
54    if let Some(registry) = current_registry() {
55        tokio::spawn(registry.register_anonymous(root_span).instrument(future))
56    } else {
57        tokio::spawn(future)
58    }
59}