use std::{
collections::HashMap,
ffi::OsStr,
path::{Path, PathBuf},
process::Command,
time::Duration,
};
use crate::{
db::{Db, DbError},
logger,
msg::{MessageError, Request, Response},
notif::NotificationSender,
run::{Run, RunState},
sensitive::Sensitive,
timeoutcmd::{TimeoutCommand, TimeoutError},
};
const NOT_EXITED: i32 = 999;
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Adapter {
bin: PathBuf,
env: HashMap<String, String>,
}
impl Adapter {
pub fn new(bin: &Path) -> Self {
Self {
bin: bin.into(),
env: HashMap::new(),
}
}
pub fn with_environment(mut self, env: &HashMap<String, String>) -> Self {
for (key, value) in env.iter() {
self.env.insert(key.into(), value.into());
}
self
}
pub fn with_sensitive_environment(mut self, env: &HashMap<String, Sensitive>) -> Self {
for (key, value) in env.iter() {
self.env.insert(key.into(), value.as_str().into());
}
self
}
fn envs(&self) -> impl Iterator<Item = (&OsStr, &OsStr)> {
self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
}
pub fn run(
&self,
trigger: &Request,
run: &mut Run,
db: &Db,
run_notification: &NotificationSender,
max_run_time: Duration,
) -> Result<(), AdapterError> {
run.set_state(RunState::Triggered);
db.update_run(run).map_err(AdapterError::UpdateRun)?;
let x = self.run_helper(trigger, run, db, run_notification, max_run_time);
run.set_state(RunState::Finished);
db.update_run(run).map_err(AdapterError::UpdateRun)?;
x
}
fn run_helper(
&self,
trigger: &Request,
run: &mut Run,
db: &Db,
run_notification: &NotificationSender,
max_run_time: Duration,
) -> Result<(), AdapterError> {
assert!(matches!(trigger, Request::Trigger { .. }));
let mut cmd = Command::new(&self.bin);
cmd.envs(self.envs());
let mut child = TimeoutCommand::new(max_run_time);
child.feed_stdin(trigger.to_string().as_bytes());
let child = child.spawn(cmd).map_err(|err| match err {
TimeoutError::Spawn(_, err) => AdapterError::SpawnAdapter(self.bin.clone(), err),
_ => AdapterError::TimeoutCommand(err),
})?;
run_notification.notify()?;
let stdout = child.stdout();
if let Some(line) = stdout.line() {
let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
run_notification.notify()?;
match resp {
Response::Triggered { run_id, info_url } => {
run.set_state(RunState::Running);
run.set_adapter_run_id(run_id);
if let Some(url) = info_url {
run.set_adapter_info_url(&url);
}
db.update_run(run).map_err(AdapterError::UpdateRun)?;
}
_ => {
child.kill().ok();
return Err(AdapterError::NotTriggered(resp));
}
}
} else {
logger::adapter_no_first_response();
child.kill().ok();
return Err(AdapterError::NoFirstMessage);
}
if let Some(line) = stdout.line() {
let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
run_notification.notify()?;
match resp {
Response::Finished { result } => {
run.set_result(result);
db.update_run(run).map_err(AdapterError::UpdateRun)?;
}
_ => {
child.kill().ok();
return Err(AdapterError::NotFinished(resp));
}
}
} else {
logger::adapter_no_second_response();
child.kill().ok();
return Err(AdapterError::NoSecondMessage);
}
if let Some(line) = stdout.line() {
let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
logger::adapter_too_many_responses();
child.kill().ok();
return Err(AdapterError::TooMany(resp));
}
let stderr = child.stderr();
while let Some(line) = stderr.line() {
logger::adapter_stderr_line(&line);
}
let result = child.wait().expect("FIXME");
logger::debug2(format!(
"wait result? {result:?} status.code: {:?}",
result.status().code()
));
if result.timed_out() {
logger::adapter_did_not_exit_voluntarily();
return Err(AdapterError::Failed(NOT_EXITED));
} else if let Some(exit) = result.status().code() {
logger::adapter_result(exit);
if exit != 0 {
return Err(AdapterError::Failed(exit));
}
} else {
logger::adapter_did_not_exit();
return Err(AdapterError::Signal);
}
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
pub enum AdapterError {
#[error(transparent)]
TimeoutCommand(#[from] crate::timeoutcmd::TimeoutError),
#[error("failed to spawn a CI adapter sub-process: {0}")]
SpawnAdapter(PathBuf, #[source] std::io::Error),
#[error("failed to create a Response message from adapter output")]
ParseResponse(#[source] MessageError),
#[error("failed to write request to adapter stdin")]
RequestWrite(#[source] MessageError),
#[error("failed to get handle for adapter's stdin")]
StdinHandle,
#[error("failed to get handle for adapter's stdout")]
StdoutHandle,
#[error("failed to get handle for adapter's stderr")]
StderrHandle,
#[error("failed to read the adapter's stderr")]
ReadStderr(#[source] std::io::Error),
#[error("failed to read from adapter stdout")]
ReadLine(#[source] std::io::Error),
#[error("failed to wait for child process to exit")]
Wait(#[source] std::io::Error),
#[error("child process failed with wait status {0}")]
Failed(i32),
#[error("child process terminated by signal")]
Signal,
#[error("adapter's first message is not 'triggered', but {0:?}")]
NotTriggered(Response),
#[error("adapter did not sent its first message")]
NoFirstMessage,
#[error("adapter did not sent its second message")]
NoSecondMessage,
#[error("adapter's second message is not 'finished', but {0:?}")]
NotFinished(Response),
#[error("adapter sent too many messages: first extra is {0:#?}")]
TooMany(Response),
#[error("failed to update CI run information in database")]
UpdateRun(#[source] DbError),
#[error(transparent)]
Notif(#[from] crate::notif::NotificationError),
}
#[cfg(test)]
mod test {
use std::{fs::write, io::ErrorKind, time::Duration};
use tempfile::{tempdir, NamedTempFile};
use radicle::git::Oid;
use radicle::prelude::RepoId;
use super::{Adapter, Db, Run};
use crate::{
adapter::AdapterError,
msg::{MessageError, Response, RunResult},
notif::NotificationChannel,
run::Whence,
test::{mock_adapter, trigger_request, TestResult},
};
const MAX: Duration = Duration::from_secs(10);
fn db() -> anyhow::Result<Db> {
let tmp = NamedTempFile::new()?;
let db = Db::new(tmp.path())?;
Ok(db)
}
fn run() -> anyhow::Result<Run> {
Ok(Run::new(
RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?,
"test.repo",
Whence::branch(
"main",
Oid::try_from("ff3099ba5de28d954c41d0b5a84316f943794ea4")?,
Some("J. Random Hacker <random@example.com>"),
),
"2024-02-29T12:58:12+02:00".into(),
))
}
#[test]
fn adapter_reports_success() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX)?;
assert_eq!(run.result(), Some(&RunResult::Success));
Ok(())
}
#[test]
fn adapter_reports_failure() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"failure"}'
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
match x {
Ok(_) => (),
Err(AdapterError::RequestWrite(_)) => (),
_ => panic!("unexpected result: {x:#?}"),
}
Ok(())
}
#[test]
fn adapter_exits_nonzero() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"failure"}'
echo woe be me 1>&2
exit 1
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
eprintln!("{x:#?}");
assert!(x.is_err());
assert_eq!(run.result(), Some(&RunResult::Failure));
Ok(())
}
#[test]
fn adapter_is_killed_before_any_messages() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
kill -9 $BASHPID
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
eprintln!("{x:#?}");
assert!(matches!(x, Err(AdapterError::NoFirstMessage)));
Ok(())
}
#[test]
fn adapter_is_killed_after_first_message() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
kill -9 $BASHPID
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
eprintln!("{x:#?}");
assert!(matches!(x, Err(AdapterError::NoSecondMessage)));
Ok(())
}
#[test]
fn adapter_ends_ok_before_second_message() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
eprintln!("{x:#?}");
assert!(matches!(x, Err(AdapterError::NoSecondMessage)));
Ok(())
}
#[test]
fn adapter_is_killed_after_second_message() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
kill -9 $BASHPID
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
eprintln!("{x:#?}");
assert!(matches!(x, Err(AdapterError::Signal)));
Ok(())
}
#[test]
fn adapter_produces_as_bad_message() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success","bad":"field"}'
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
match x {
Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
_ => panic!("unexpected result: {x:#?}"),
}
Ok(())
}
#[test]
fn adapter_first_message_isnt_triggered() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"finished","result":"success"}'
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
eprintln!("{x:#?}");
assert!(matches!(
x,
Err(AdapterError::NotTriggered(Response::Finished {
result: RunResult::Success
}))
));
Ok(())
}
#[test]
fn adapter_outputs_too_many_messages() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
echo '{"response":"finished","result":"success"}'
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
eprintln!("{x:#?}");
assert!(matches!(
x,
Err(AdapterError::TooMany(Response::Finished {
result: RunResult::Success
}))
));
Ok(())
}
#[test]
fn adapter_does_not_exist() -> TestResult<()> {
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
eprintln!("{x:#?}");
match x {
Err(AdapterError::SpawnAdapter(filename, e)) => {
assert_eq!(bin, filename);
assert_eq!(e.kind(), ErrorKind::NotFound);
}
_ => panic!("expected a specific error"),
}
Ok(())
}
#[test]
fn adapter_is_not_executable() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
write(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
eprintln!("{x:#?}");
match x {
Err(AdapterError::SpawnAdapter(filename, e)) => {
assert_eq!(bin, filename);
assert_eq!(e.kind(), ErrorKind::PermissionDenied);
}
_ => panic!("expected a specific error"),
}
Ok(())
}
#[test]
fn adapter_has_bad_interpreter() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/does-not-exist
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
eprintln!("{x:#?}");
match x {
Err(AdapterError::SpawnAdapter(filename, e)) => {
assert_eq!(bin, filename);
assert_eq!(e.kind(), ErrorKind::NotFound);
}
_ => panic!("expected a specific error"),
}
Ok(())
}
}