// radicle_ci_broker/notif.rs
use std::{
sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender},
time::Duration,
};
/// Maximum time (one second) a receiver created through
/// `NotificationChannel::new_event` blocks in a single wait.
const EVENT_RECV_TIMEOUT: Duration = Duration::from_secs(1);
/// Maximum time (one second) a receiver created through
/// `NotificationChannel::new_run` blocks in a single wait.
///
/// NOTE(review): the name looks like a typo for `UPDATE_INTERVAL`; the
/// spelling is kept because `new_run` refers to the constant by this name.
const UPDATE_IMTERVAL: Duration = Duration::from_secs(1);
/// Sending half of a notification channel.
///
/// A notification carries no payload: the wrapped channel transports `()`
/// values, so only the fact that a message was sent is conveyed.
pub struct NotificationSender {
    sender: Sender<()>,
}

impl NotificationSender {
    /// Wrap the sending end of a standard library channel.
    fn new(sender: Sender<()>) -> Self {
        NotificationSender { sender }
    }

    /// Send one notification through the channel.
    ///
    /// # Errors
    ///
    /// Returns [`NotificationError::Send`], wrapping the underlying
    /// `SendError`, when the channel refuses the message.
    pub fn notify(&self) -> Result<(), NotificationError> {
        match self.sender.send(()) {
            Ok(()) => Ok(()),
            Err(err) => Err(NotificationError::Send(err)),
        }
    }
}
/// Receiving half of a notification channel.
///
/// Besides the channel end point it remembers the longest time a single
/// call to [`NotificationReceiver::wait_for_notification`] may block.
pub struct NotificationReceiver {
    receiver: Receiver<()>,
    max_wait: Duration,
}

impl NotificationReceiver {
    /// Pair the receiving end of a channel with its per-wait time limit.
    fn new(receiver: Receiver<()>, max_wait: Duration) -> Self {
        NotificationReceiver { receiver, max_wait }
    }

    /// Block until a notification arrives, but for no longer than the
    /// configured maximum wait.
    ///
    /// # Errors
    ///
    /// Passes through the error of `Receiver::recv_timeout`:
    /// `RecvTimeoutError::Timeout` when nothing arrived in time, or
    /// `RecvTimeoutError::Disconnected` when the channel is disconnected.
    pub fn wait_for_notification(&self) -> Result<(), RecvTimeoutError> {
        let Self { receiver, max_wait } = self;
        receiver.recv_timeout(*max_wait)
    }
}
/// A notification channel holding both of its end points until they are
/// handed out.
///
/// Each end point is stored in an `Option` so that it can be moved out of
/// the channel by the `tx` and `rx` accessors.
pub struct NotificationChannel {
    tx: Option<NotificationSender>,
    rx: Option<NotificationReceiver>,
}

impl NotificationChannel {
    /// Create a channel whose receiver waits at most `EVENT_RECV_TIMEOUT`
    /// per call.
    pub fn new_event() -> Self {
        Self::new(EVENT_RECV_TIMEOUT)
    }

    /// Create a channel whose receiver waits at most `UPDATE_IMTERVAL`
    /// per call.
    pub fn new_run() -> Self {
        Self::new(UPDATE_IMTERVAL)
    }

    /// Build a fresh channel and wrap both halves; `max_wait` becomes the
    /// receiver's per-wait time limit.
    fn new(max_wait: Duration) -> Self {
        let (sender, receiver) = channel();
        let tx = Some(NotificationSender::new(sender));
        let rx = Some(NotificationReceiver::new(receiver, max_wait));
        NotificationChannel { tx, rx }
    }
}
impl NotificationChannel {
    /// Move the sending end point out of the channel.
    ///
    /// # Errors
    ///
    /// Returns [`NotificationError::Sender`] when the sender has already
    /// been taken by an earlier call.
    pub fn tx(&mut self) -> Result<NotificationSender, NotificationError> {
        match self.tx.take() {
            Some(sender) => Ok(sender),
            None => Err(NotificationError::Sender),
        }
    }

    /// Move the receiving end point out of the channel.
    ///
    /// # Errors
    ///
    /// Returns [`NotificationError::Receiver`] when the receiver has
    /// already been taken by an earlier call.
    pub fn rx(&mut self) -> Result<NotificationReceiver, NotificationError> {
        match self.rx.take() {
            Some(receiver) => Ok(receiver),
            None => Err(NotificationError::Receiver),
        }
    }
}
/// Errors reported by the notification channel types in this module.
#[derive(Debug, thiserror::Error)]
pub enum NotificationError {
/// `NotificationChannel::rx` was called after the receiver had already
/// been taken out of the channel.
#[error("receiver end point already extracted")]
Receiver,
/// `NotificationChannel::tx` was called after the sender had already
/// been taken out of the channel.
#[error("sender end point already extracted")]
Sender,
/// `NotificationSender::notify` could not send; the underlying
/// `SendError` is kept as the error source.
#[error("error sending to channel")]
Send(#[source] std::sync::mpsc::SendError<()>),
}