lance_core/utils/
tokio.rs1use std::time::Duration;
5
6use crate::Result;
7
8use futures::{Future, FutureExt};
9use tokio::runtime::{Builder, Runtime};
10use tracing::Span;
11
12pub fn get_num_compute_intensive_cpus() -> usize {
13 if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") {
14 return user_specified.parse().unwrap();
15 }
16
17 let cpus = num_cpus::get();
18
19 if cpus <= *IO_CORE_RESERVATION {
20 if cpus > 1 {
22 log::warn!(
23 "Number of CPUs is less than or equal to the number of IO core reservations. \
24 This is not a supported configuration. using 1 CPU for compute intensive tasks."
25 );
26 }
27 return 1;
28 }
29
30 num_cpus::get() - *IO_CORE_RESERVATION
31}
32
33lazy_static::lazy_static! {
34 pub static ref IO_CORE_RESERVATION: usize = std::env::var("LANCE_IO_CORE_RESERVATION").unwrap_or("2".to_string()).parse().unwrap();
35
36 pub static ref CPU_RUNTIME: Runtime = Builder::new_multi_thread()
37 .thread_name("lance-cpu")
38 .max_blocking_threads(get_num_compute_intensive_cpus())
39 .worker_threads(1)
40 .thread_keep_alive(Duration::from_secs(u64::MAX))
42 .build()
43 .unwrap();
44}
45
46pub fn spawn_cpu<F: FnOnce() -> Result<R> + Send + 'static, R: Send + 'static>(
55 func: F,
56) -> impl Future<Output = Result<R>> {
57 let (send, recv) = tokio::sync::oneshot::channel();
58 let span = Span::current();
60 CPU_RUNTIME.spawn_blocking(move || {
61 let _span_guard = span.enter();
62 let result = func();
63 let _ = send.send(result);
64 });
65 recv.map(|res| res.unwrap())
66}