// lance_core/utils/tokio.rs
use std::time::Duration;
use crate::Result;
use futures::{Future, FutureExt};
use tokio::runtime::{Builder, Runtime};
use tracing::Span;
/// Returns the number of threads to use for compute-intensive work.
///
/// Resolution order:
/// 1. If the `LANCE_CPU_THREADS` environment variable holds a positive integer, that value is
///    returned unchanged.
/// 2. Otherwise the detected CPU count minus [`IO_CORE_RESERVATION`] is returned, with a floor
///    of 1 when the machine has no more CPUs than the reservation.
///
/// A `LANCE_CPU_THREADS` value that is not a positive integer is reported with a warning and
/// ignored instead of panicking. Zero is rejected because the result sizes the blocking pool of
/// [`CPU_RUNTIME`], and tokio's `max_blocking_threads` does not accept 0.
pub fn get_num_compute_intensive_cpus() -> usize {
    if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") {
        match user_specified.parse::<usize>() {
            Ok(threads) if threads > 0 => return threads,
            _ => log::warn!(
                "Ignoring invalid LANCE_CPU_THREADS value {:?}; expected a positive integer. \
                 Falling back to the detected CPU count.",
                user_specified
            ),
        }
    }

    // Read both inputs once so the comparison and the subtraction see the same values.
    let cpus = num_cpus::get();
    let reserved = *IO_CORE_RESERVATION;

    if cpus <= reserved {
        // On a single-CPU machine there is nothing the user can do, so stay quiet there.
        if cpus > 1 {
            log::warn!(
                "Number of CPUs is less than or equal to the number of IO core reservations. \
                This is not a supported configuration. using 1 CPU for compute intensive tasks."
            );
        }
        return 1;
    }

    // `cpus > reserved` holds here, so the subtraction cannot underflow.
    cpus - reserved
}
lazy_static::lazy_static! {
    /// Number of CPU cores subtracted from the detected CPU count when sizing the compute pool.
    ///
    /// Read once from the `LANCE_IO_CORE_RESERVATION` environment variable; defaults to 2 when
    /// the variable is unset. A value that is not a valid `usize` is reported with a warning and
    /// replaced by the default rather than panicking during static initialization.
    pub static ref IO_CORE_RESERVATION: usize = {
        const DEFAULT_IO_CORE_RESERVATION: usize = 2;
        match std::env::var("LANCE_IO_CORE_RESERVATION") {
            Ok(raw) => raw.parse::<usize>().unwrap_or_else(|err| {
                log::warn!(
                    "Ignoring invalid LANCE_IO_CORE_RESERVATION value {:?} ({}); using {}.",
                    raw,
                    err,
                    DEFAULT_IO_CORE_RESERVATION
                );
                DEFAULT_IO_CORE_RESERVATION
            }),
            Err(_) => DEFAULT_IO_CORE_RESERVATION,
        }
    };

    /// Dedicated tokio runtime for CPU-bound work.
    ///
    /// Built with a single async worker thread and a blocking pool capped at
    /// [`get_num_compute_intensive_cpus`] threads named `lance-cpu`.
    pub static ref CPU_RUNTIME: Runtime = Builder::new_multi_thread()
        .thread_name("lance-cpu")
        .max_blocking_threads(get_num_compute_intensive_cpus())
        .worker_threads(1)
        // Largest representable timeout: idle blocking threads are effectively never reaped.
        .thread_keep_alive(Duration::from_secs(u64::MAX))
        .build()
        .expect("failed to build the lance-cpu tokio runtime");
}
/// Runs `func` on the blocking pool of [`CPU_RUNTIME`] and returns a future for its result.
///
/// The tracing span that is current when `spawn_cpu` is called is entered on the pool thread
/// for the duration of `func` and of sending its result back.
///
/// The closure is submitted immediately; it does not wait for the returned future to be polled.
/// If the returned future is dropped, the result is discarded.
///
/// # Panics
///
/// The returned future panics when polled if the result channel was closed without a value
/// being sent (for example because `func` itself panicked).
pub fn spawn_cpu<F, R>(func: F) -> impl Future<Output = Result<R>>
where
    F: FnOnce() -> Result<R> + Send + 'static,
    R: Send + 'static,
{
    let caller_span = Span::current();
    let (tx, rx) = tokio::sync::oneshot::channel();

    CPU_RUNTIME.spawn_blocking(move || {
        caller_span.in_scope(|| {
            // A send error only means the receiver was dropped; nobody is waiting anymore.
            let _ = tx.send(func());
        })
    });

    rx.map(|received| received.unwrap())
}