// ckb_stop_handler/stop_register.rs
use ckb_channel::TrySendError;
use ckb_logger::{debug, info, trace, warn};
use ckb_util::Mutex;
use std::sync::atomic::AtomicBool;
use tokio_util::sync::CancellationToken;
/// Registry of the threads whose termination is awaited at shutdown.
///
/// One instance lives in the `CKB_HANDLES` static; `register_thread` appends to
/// it and `wait_all_ckb_services_exit` drains and joins its contents.
struct CkbServiceHandles {
/// `(thread name, join handle)` pairs, in the order they were registered.
thread_handles: Vec<(String, std::thread::JoinHandle<()>)>,
}
pub fn wait_all_ckb_services_exit() {
info!("Waiting exit signal...");
let exit_signal = new_crossbeam_exit_rx();
let _ = exit_signal.recv();
let mut handles = CKB_HANDLES.lock();
debug!("wait_all_ckb_services_exit waiting all threads to exit");
for (name, join_handle) in handles.thread_handles.drain(..) {
match join_handle.join() {
Ok(_) => {
info!("Waiting thread {} done.", name);
}
Err(e) => {
warn!("Waiting thread {}: ERROR: {:?}", name, e)
}
}
}
info!("All ckb threads have been stopped");
}
/// Process-wide, mutex-protected registry of thread join handles.
///
/// Lazily initialised with an empty list. `register_thread` pushes onto it and
/// `wait_all_ckb_services_exit` drains it.
static CKB_HANDLES: std::sync::LazyLock<Mutex<CkbServiceHandles>> =
std::sync::LazyLock::new(|| {
Mutex::new(CkbServiceHandles {
thread_handles: vec![],
})
});
/// Flag recording that the exit signal has been broadcast.
///
/// Starts as `AtomicBool::default()` (`false`); `broadcast_exit_signals` stores
/// `true` and `has_received_stop_signal` reads it.
static RECEIVED_STOP_SIGNAL: std::sync::LazyLock<AtomicBool> =
std::sync::LazyLock::new(AtomicBool::default);
/// Shared cancellation token used as the exit signal for async code.
///
/// `new_tokio_exit_rx` hands out clones of it and `broadcast_exit_signals`
/// cancels it.
static TOKIO_EXIT: std::sync::LazyLock<CancellationToken> =
std::sync::LazyLock::new(CancellationToken::new);
/// Sending halves of every channel created by `new_crossbeam_exit_rx`.
///
/// `broadcast_exit_signals` iterates this list and tries to send one `()` on
/// each sender. Entries are only ever appended by the code in this file.
static CROSSBEAM_EXIT_SENDERS: std::sync::LazyLock<Mutex<Vec<ckb_channel::Sender<()>>>> =
std::sync::LazyLock::new(|| Mutex::new(vec![]));
/// Returns a clone of the global `TOKIO_EXIT` cancellation token.
///
/// The global token is cancelled by `broadcast_exit_signals`.
pub fn new_tokio_exit_rx() -> CancellationToken {
    let global_token: &CancellationToken = &TOKIO_EXIT;
    global_token.clone()
}
pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> {
let (tx, rx) = ckb_channel::bounded(1);
CROSSBEAM_EXIT_SENDERS.lock().push(tx);
rx
}
pub fn has_received_stop_signal() -> bool {
RECEIVED_STOP_SIGNAL.load(std::sync::atomic::Ordering::SeqCst)
}
/// Broadcasts the exit signal to every subscriber.
///
/// In order: sets the `RECEIVED_STOP_SIGNAL` flag, cancels the `TOKIO_EXIT`
/// token, then tries to send one `()` on each sender stored in
/// `CROSSBEAM_EXIT_SENDERS`. Send failures are only logged.
pub fn broadcast_exit_signals() {
    use std::sync::atomic::Ordering;

    debug!("Received exit signal; broadcasting exit signal to all threads");
    RECEIVED_STOP_SIGNAL.store(true, Ordering::SeqCst);
    TOKIO_EXIT.cancel();

    let senders = CROSSBEAM_EXIT_SENDERS.lock();
    for tx in senders.iter() {
        // `try_send` never blocks, so one subscriber cannot stall the broadcast.
        if let Err(failure) = tx.try_send(()) {
            match failure {
                // The channel buffer already holds a message for this receiver.
                TrySendError::Full(_) => info!("Ckb process has received exit signal"),
                // The receiving half has been dropped.
                TrySendError::Disconnected(_) => {
                    debug!("broadcast thread: channel is disconnected")
                }
            }
        }
    }
}
/// Records a thread's join handle under `name` in the `CKB_HANDLES` registry.
///
/// `wait_all_ckb_services_exit` later drains the registry and joins each handle.
///
/// * `name` - label used in log messages for this thread.
/// * `thread_handle` - join handle of the thread to wait for at shutdown.
pub fn register_thread(name: &str, thread_handle: std::thread::JoinHandle<()>) {
    trace!("Registering thread {}", name);
    {
        // Scope the guard so the lock is released before the final trace line.
        let mut registry = CKB_HANDLES.lock();
        let entry = (String::from(name), thread_handle);
        registry.thread_handles.push(entry);
    }
    trace!("Thread registration completed {}", name);
}