// radicle_ci_broker/node_event_source.rs
use std::{fmt, path::PathBuf, time};
use radicle::{
node::{Event, Handle},
Profile,
};
use crate::logger;
/// Source of events from a Radicle node.
///
/// Built by [`NodeEventSource::new`], which subscribes to the node via its
/// control socket; events are then pulled one at a time with
/// [`NodeEventSource::node_event`].
pub struct NodeEventSource {
    /// Path taken from `profile.home.path()` when the source was created.
    /// Only used for diagnostics (it is what the `Debug` output shows).
    profile_path: PathBuf,
    /// Boxed iterator over the subscription returned by the node; each item
    /// is either an event or the node error hit while reading it.
    events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
}
impl NodeEventSource {
pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
let socket = profile.socket();
if !socket.exists() {
return Err(NodeEventError::NoControlSocket(socket));
}
let node = radicle::Node::new(socket.clone());
let source = match node.subscribe(time::Duration::MAX) {
Ok(events) => Ok(Self {
profile_path: profile.home.path().into(),
events: Box::new(events.into_iter()),
}),
Err(err) => {
logger::error("failed to subscribe to node events", &err);
Err(NodeEventError::CannotSubscribe(socket.clone(), err))
}
}?;
logger::node_event_source_created(&source);
Ok(source)
}
pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
logger::debug("node_event: try to get an event");
if let Some(event) = self.events.next() {
match event {
Ok(event) => {
logger::node_event_source_got_event(&event);
Ok(Some(event))
}
Err(radicle::node::Error::Io(err))
if err.kind() == std::io::ErrorKind::ConnectionReset =>
{
logger::event_disconnected();
Ok(None)
}
Err(err) => {
logger::error("error reading event from node", &err);
Err(NodeEventError::Node(err))
}
}
} else {
logger::node_event_source_eof(self);
Ok(None)
}
}
}
/// Compact diagnostic representation of a [`NodeEventSource`].
///
/// Only the profile path is shown; the event iterator is a boxed trait
/// object and has no useful textual form.
impl fmt::Debug for NodeEventSource {
    /// Writes `NodeEventSource<path=…>` to `f`, where `…` is the profile path.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The closing `>` keeps the output balanced; it was previously
        // missing from the format string.
        write!(f, "NodeEventSource<path={}>", self.profile_path.display())
    }
}
#[derive(Debug, thiserror::Error)]
pub enum NodeEventError {
#[error("programming error in regular expression {0:?}")]
Regex(&'static str, regex::Error),
#[error("node control socket does not exist: {0}")]
NoControlSocket(PathBuf),
#[error("failed to subscribe to node events on socket {0}")]
CannotSubscribe(PathBuf, #[source] radicle::node::Error),
#[error(transparent)]
Node(#[from] radicle::node::Error),
#[error("connection to the node control socket broke")]
BrokenConnection,
#[error(transparent)]
Id(#[from] radicle::identity::IdError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error("failed to read filter file: {0}")]
ReadFilterFile(PathBuf, #[source] std::io::Error),
#[error("failed to parser filters file: {0}")]
FiltersJsonFile(PathBuf, #[source] serde_json::Error),
#[error("failed to parser filters file: {0}")]
FiltersYamlFile(PathBuf, #[source] serde_yml::Error),
#[error("failed to parser filters as JSON")]
FiltersJsonString(#[source] serde_json::Error),
#[error("failed to parse string as a Git object id: {0:?}")]
ParseOid(String, #[source] radicle::git::raw::Error),
}