use client::{
protobuf::MessageDyn,
shim::events,
ttrpc::{self, context::Context},
types::empty,
Client, Events, EventsClient,
};
use containerd_shim_protos as client;
#[cfg(unix)]
use crate::util::connect;
#[cfg(not(target_os = "macos"))] use crate::Error;
use crate::{
error::Result,
util::{convert_to_any, timestamp},
};
#[cfg(windows)]
const RETRY_COUNT: i32 = 3;
pub struct RemotePublisher {
client: EventsClient,
}
impl RemotePublisher {
pub fn new(address: impl AsRef<str>) -> Result<RemotePublisher> {
let client = Self::connect(address)?;
Ok(RemotePublisher {
client: EventsClient::new(client),
})
}
fn connect(address: impl AsRef<str>) -> Result<Client> {
#[cfg(unix)]
{
let fd = connect(address)?;
Client::new(fd).map_err(|err| err.into())
}
#[cfg(windows)]
{
for i in 0..RETRY_COUNT {
match Client::connect(address.as_ref()) {
Ok(client) => return Ok(client),
Err(e) => match e {
ttrpc::Error::Windows(231) => {
log::debug!("pipe busy during connect. try number {}", i);
std::thread::sleep(std::time::Duration::from_millis(5));
}
_ => return Err(e.into()),
},
}
}
Err(other!("failed to connect to {}", address.as_ref()))
}
}
pub fn publish(
&self,
ctx: Context,
topic: &str,
namespace: &str,
event: Box<dyn MessageDyn>,
) -> Result<()> {
let mut envelope = events::Envelope::new();
envelope.set_topic(topic.to_owned());
envelope.set_namespace(namespace.to_owned());
envelope.set_timestamp(timestamp()?);
envelope.set_event(convert_to_any(event)?);
let mut req = events::ForwardRequest::new();
req.set_envelope(envelope);
self.client.forward(ctx, &req)?;
Ok(())
}
}
impl Events for RemotePublisher {
fn forward(
&self,
_ctx: &ttrpc::TtrpcContext,
req: events::ForwardRequest,
) -> ttrpc::Result<empty::Empty> {
self.client.forward(Context::default(), &req)
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Barrier};
use client::{
api::{Empty, ForwardRequest},
events::task::TaskOOM,
};
use ttrpc::Server;
use super::*;
#[cfg(windows)]
use crate::synchronous::wait_socket_working;
struct FakeServer {}
impl Events for FakeServer {
fn forward(&self, _ctx: &ttrpc::TtrpcContext, req: ForwardRequest) -> ttrpc::Result<Empty> {
let env = req.envelope();
assert_eq!(env.topic(), "/tasks/oom");
Ok(Empty::default())
}
}
#[test]
fn test_connect() {
#[cfg(unix)]
let tmpdir = tempfile::tempdir().unwrap();
#[cfg(unix)]
let path = format!("{}/socket", tmpdir.as_ref().to_str().unwrap());
#[cfg(windows)]
let path = "\\\\.\\pipe\\test-pipe".to_string();
let path1 = path.clone();
assert!(RemotePublisher::connect("a".repeat(16384)).is_err());
assert!(RemotePublisher::connect(&path).is_err());
let barrier = Arc::new(Barrier::new(2));
let barrier2 = barrier.clone();
let thread = std::thread::spawn(move || {
let mut server = create_server(&path1);
server.start().unwrap();
#[cfg(windows)]
wait_socket_working(&path1, 5, 5).unwrap();
barrier2.wait();
barrier2.wait();
server.shutdown();
});
barrier.wait();
let client = RemotePublisher::new(&path).unwrap();
let mut msg = TaskOOM::new();
msg.set_container_id("test".to_string());
client
.publish(Context::default(), "/tasks/oom", "ns1", Box::new(msg))
.unwrap();
barrier.wait();
thread.join().unwrap();
}
fn create_server(server_address: &str) -> Server {
#[cfg(unix)]
{
use std::os::unix::{io::AsRawFd, net::UnixListener};
let listener = UnixListener::bind(server_address).unwrap();
listener.set_nonblocking(true).unwrap();
let t = Arc::new(Box::new(FakeServer {}) as Box<dyn Events + Send + Sync>);
let service = client::create_events(t);
let server = Server::new()
.add_listener(listener.as_raw_fd())
.unwrap()
.register_service(service);
std::mem::forget(listener);
server
}
#[cfg(windows)]
{
let t = Arc::new(Box::new(FakeServer {}) as Box<dyn Events + Send + Sync>);
let service = client::create_events(t);
Server::new()
.bind(server_address)
.unwrap()
.register_service(service)
}
}
}