//! IPC transport for tokio.
//!
//! Connections are backed by Unix domain sockets (`tokio_uds`) on non-Windows
//! platforms and by named pipes (`tokio_named_pipes`) on Windows.
#![warn(missing_docs)]
#![deny(rust_2018_idioms)]
use std::io::{self, Read, Write};
use std::path::Path;
use bytes::{Buf, BufMut};
use futures::{stream::Stream, Async, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
#[cfg(windows)]
use tokio_named_pipes::NamedPipe;
#[cfg(windows)]
mod win_permissions;
#[cfg(windows)]
pub use win_permissions::SecurityAttributes;
#[cfg(unix)]
mod unix_permissions;
#[cfg(unix)]
pub use unix_permissions::SecurityAttributes;
// How long, in milliseconds, a connecting client waits for the named pipe to
// become available before giving up (passed to `miow::pipe::NamedPipe::wait`).
#[cfg(windows)]
const PIPE_AVAILABILITY_TIMEOUT: u64 = 5000;
/// Builds a randomly named endpoint address, handy for tests and examples.
///
/// The result is a named-pipe path (`\\.\pipe\my-pipe-<n>`) on Windows and a
/// socket path under `/tmp` (`/tmp/my-uds-<n>`) everywhere else, where `<n>`
/// is a random `u64`.
pub fn dummy_endpoint() -> String {
    let mut rng = rand::thread_rng();
    let suffix: u64 = rand::Rng::gen(&mut rng);
    if !cfg!(windows) {
        return format!(r"/tmp/my-uds-{}", suffix);
    }
    format!(r"\\.\pipe\my-pipe-{}", suffix)
}
/// A listening IPC endpoint: an address plus the security attributes used
/// when the endpoint is opened with [`Endpoint::incoming`].
pub struct Endpoint {
    // Named-pipe name (Windows) or socket file path (other platforms).
    path: String,
    // Permissions handed to the platform when the pipe/socket is created.
    security_attributes: SecurityAttributes,
}
impl Endpoint {
    /// Starts listening and returns the stream of incoming connections.
    ///
    /// Binds a Unix domain socket at the endpoint path and then applies the
    /// configured security attributes to the freshly created socket file.
    ///
    /// # Errors
    ///
    /// Returns an error if the socket cannot be bound or if the permissions
    /// cannot be applied. In the latter case the socket file created by the
    /// bind is removed again (on Unix) instead of being left behind.
    #[cfg(not(windows))]
    pub fn incoming(self, handle: &Handle) -> io::Result<Incoming> {
        let inner = self.inner(handle)?.incoming();
        // Build `Incoming` *before* touching permissions: if applying them
        // fails, the early return drops `incoming`, and its `Drop` impl removes
        // the socket file that `bind` just created. Previously that file leaked.
        let incoming = Incoming {
            inner,
            path: self.path,
        };
        // SAFETY: NOTE(review) `apply_permissions` is declared in
        // `unix_permissions`, which is not visible here; presumably it only
        // needs the path to name the socket file created by the bind above.
        unsafe {
            self.security_attributes.apply_permissions(&incoming.path)?;
        }
        Ok(incoming)
    }

    /// Starts listening and returns the stream of incoming connections.
    ///
    /// Creates the first instance of the named pipe and bundles it with
    /// everything needed to create replacement instances once a client
    /// connects (path, reactor handle and security attributes).
    ///
    /// # Errors
    ///
    /// Returns an error if the named pipe cannot be created or registered
    /// with the reactor.
    #[cfg(windows)]
    pub fn incoming(mut self, handle: &Handle) -> io::Result<Incoming> {
        let pipe = self.inner(handle)?;
        Ok(Incoming {
            path: self.path.clone(),
            inner: NamedPipeSupport {
                path: self.path,
                handle: handle.clone(),
                pipe,
                security_attributes: self.security_attributes,
            },
        })
    }

    /// Creates the first server instance (`first(true)`) of the named pipe,
    /// readable and writable, with 64 KiB buffers in each direction, and
    /// registers it with `handle`.
    #[cfg(windows)]
    fn inner(&mut self, handle: &Handle) -> io::Result<NamedPipe> {
        use miow::pipe::NamedPipeBuilder;
        use std::os::windows::io::*;

        // SAFETY: NOTE(review) the attribute pointer comes from
        // `self.security_attributes`, which outlives this call; confirm that
        // this is all `with_security_attributes` requires.
        let raw_handle = unsafe {
            NamedPipeBuilder::new(&self.path)
                .first(true)
                .inbound(true)
                .outbound(true)
                .out_buffer_size(65536)
                .in_buffer_size(65536)
                .with_security_attributes(self.security_attributes.as_ptr())?
                .into_raw_handle()
        };
        // SAFETY: `raw_handle` was just released by `into_raw_handle`, so the
        // mio pipe becomes its sole owner.
        let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
        NamedPipe::from_pipe(mio_pipe, handle)
    }

    /// Binds a Unix domain socket listener at the endpoint path.
    #[cfg(not(windows))]
    fn inner(&self, _handle: &Handle) -> io::Result<tokio_uds::UnixListener> {
        tokio_uds::UnixListener::bind(&self.path)
    }

    /// Replaces the security attributes used when the endpoint is opened.
    pub fn set_security_attributes(&mut self, security_attributes: SecurityAttributes) {
        self.security_attributes = security_attributes;
    }

    /// Returns the path (pipe name or socket file) of this endpoint.
    pub fn path(&self) -> &str {
        &self.path
    }

    /// Creates an endpoint for `path` with empty security attributes.
    ///
    /// Nothing is opened until [`Endpoint::incoming`] is called.
    pub fn new(path: String) -> Self {
        Endpoint {
            path,
            security_attributes: SecurityAttributes::empty(),
        }
    }
}
/// Marker yielded next to every accepted connection by [`Incoming`].
///
/// It is a unit struct and carries no information about the peer.
pub struct RemoteId;
// Windows-only listener state: the pipe instance currently waiting for a
// client, plus what is needed to create the next instance after a connect.
#[cfg(windows)]
struct NamedPipeSupport {
    // Name of the pipe; reused for every replacement instance.
    path: String,
    // Reactor handle each new pipe instance is registered with.
    handle: Handle,
    // The instance currently waiting for a client to connect.
    pipe: NamedPipe,
    // Security attributes applied to every instance that gets created.
    security_attributes: SecurityAttributes,
}
#[cfg(windows)]
impl NamedPipeSupport {
    /// Creates another server instance of the same named pipe.
    ///
    /// Unlike the initial instance this one is built with `first(false)`;
    /// direction, buffer sizes (64 KiB each way) and security attributes are
    /// the same. The new instance is registered with the stored reactor handle.
    fn replacement_pipe(&mut self) -> io::Result<NamedPipe> {
        use miow::pipe::NamedPipeBuilder;
        use std::os::windows::io::*;

        // SAFETY: NOTE(review) the attribute pointer is obtained from
        // `self.security_attributes`, which outlives the builder call; confirm
        // against the contract of `with_security_attributes`.
        let raw_handle = unsafe {
            let mut builder = NamedPipeBuilder::new(&self.path);
            builder
                .first(false)
                .inbound(true)
                .outbound(true)
                .out_buffer_size(65536)
                .in_buffer_size(65536);
            let instance = builder.with_security_attributes(self.security_attributes.as_ptr())?;
            instance.into_raw_handle()
        };
        // SAFETY: ownership of `raw_handle` was released by `into_raw_handle`
        // just above, so the mio pipe is its only owner.
        let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
        NamedPipe::from_pipe(mio_pipe, &self.handle)
    }
}
/// Stream of incoming IPC connections, created by [`Endpoint::incoming`].
///
/// Each item is an [`IpcConnection`] paired with a [`RemoteId`].
pub struct Incoming {
    // Endpoint address; on Unix the `Drop` impl removes the socket file at
    // this path.
    path: String,
    // Unix domain socket accept stream.
    #[cfg(not(windows))]
    inner: tokio_uds::Incoming,
    // Named-pipe listener state (current instance plus replacement recipe).
    #[cfg(windows)]
    inner: NamedPipeSupport,
}
impl Stream for Incoming {
    type Item = (IpcConnection, RemoteId);
    type Error = io::Error;

    /// Polls the Unix listener and wraps every accepted stream in an
    /// [`IpcConnection`]; errors and end-of-stream are passed through.
    #[cfg(not(windows))]
    fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
        match self.inner.poll()? {
            Async::Ready(accepted) => {
                let item = accepted.map(|stream| (IpcConnection { inner: stream }, RemoteId));
                Ok(Async::Ready(item))
            }
            Async::NotReady => Ok(Async::NotReady),
        }
    }

    /// Tries to complete a client connection on the current pipe instance.
    ///
    /// On success the connected instance is handed out as an
    /// [`IpcConnection`] and a freshly created instance takes its place as the
    /// listener. If the connect would block, write readiness is polled on the
    /// pipe and `NotReady` is returned. Any other error is returned as is.
    #[cfg(windows)]
    fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
        match self.inner.pipe.connect() {
            Ok(()) => {
                log::trace!("Incoming connection polled successfully");
                // Create the next listener first; if that fails, the connected
                // pipe stays where it is and the error is reported.
                let next_listener = self.inner.replacement_pipe()?;
                let connected = ::std::mem::replace(&mut self.inner.pipe, next_listener);
                Ok(Async::Ready(Some((
                    IpcConnection { inner: connected },
                    RemoteId,
                ))))
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                log::trace!("Incoming connection was to block, waiting for connection to become writeable");
                self.inner.pipe.poll_write_ready()?;
                Ok(Async::NotReady)
            }
            Err(e) => Err(e),
        }
    }
}
/// Removes the socket file when the listener goes away.
///
/// Removal is best effort: a failure is ignored, success is logged at trace
/// level.
#[cfg(unix)]
impl Drop for Incoming {
    fn drop(&mut self) {
        let socket_file = Path::new(&self.path);
        if std::fs::remove_file(socket_file).is_ok() {
            log::trace!("Removed socket file at: {}", self.path);
        }
    }
}
/// A single IPC connection, either accepted from [`Incoming`] or opened with
/// [`IpcConnection::connect`].
pub struct IpcConnection {
    // Unix domain socket stream.
    #[cfg(not(windows))]
    inner: tokio_uds::UnixStream,
    // Connected named-pipe instance.
    #[cfg(windows)]
    inner: tokio_named_pipes::NamedPipe,
}
impl IpcConnection {
    /// Opens a client connection to the endpoint at `path`.
    ///
    /// `handle` is the reactor handle the connection is registered with on
    /// Windows; it is not used on Unix.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the connection cannot be made.
    pub fn connect<P: AsRef<Path>>(path: P, handle: &Handle) -> io::Result<IpcConnection> {
        Self::connect_inner(path.as_ref(), handle).map(|inner| IpcConnection { inner })
    }

    /// Connects to the Unix domain socket at `path`, waiting on the connect
    /// future (`wait()`) until it resolves.
    #[cfg(unix)]
    fn connect_inner(path: &Path, _handle: &Handle) -> io::Result<tokio_uds::UnixStream> {
        use futures::Future;

        let connecting = tokio_uds::UnixStream::connect(&path);
        connecting.wait()
    }

    /// Waits (up to `PIPE_AVAILABILITY_TIMEOUT` milliseconds) for the named
    /// pipe to become available, opens it for overlapped read/write access and
    /// registers it with `handle`.
    #[cfg(windows)]
    fn connect_inner(path: &Path, handle: &Handle) -> io::Result<NamedPipe> {
        use std::fs::OpenOptions;
        use std::os::windows::fs::OpenOptionsExt;
        use std::os::windows::io::{FromRawHandle, IntoRawHandle};
        use winapi::um::winbase::FILE_FLAG_OVERLAPPED;

        let timeout = std::time::Duration::from_millis(PIPE_AVAILABILITY_TIMEOUT);
        miow::pipe::NamedPipe::wait(path, Some(timeout))?;

        let mut options = OpenOptions::new();
        options
            .read(true)
            .write(true)
            .custom_flags(FILE_FLAG_OVERLAPPED);
        let raw_handle = options.open(path)?.into_raw_handle();

        // SAFETY: `raw_handle` was just released by `into_raw_handle`, so the
        // mio pipe becomes its sole owner.
        let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
        NamedPipe::from_pipe(mio_pipe, handle)
    }
}
/// Reads are forwarded unchanged to the underlying platform stream.
impl Read for IpcConnection {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        Read::read(&mut self.inner, buf)
    }
}
/// Writes and flushes are forwarded unchanged to the underlying platform stream.
impl Write for IpcConnection {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        Write::write(&mut self.inner, buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        Write::flush(&mut self.inner)
    }
}
impl AsyncRead for IpcConnection {
unsafe fn prepare_uninitialized_buffer(&self, b: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(b)
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.read_buf(buf)
}
}
/// Asynchronous writes and shutdown are delegated to the underlying platform stream.
impl AsyncWrite for IpcConnection {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        // Fully qualified on purpose: a plain method call could resolve to an
        // inherent `shutdown` on the inner stream instead of the trait method.
        AsyncWrite::shutdown(&mut self.inner)
    }

    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
        AsyncWrite::write_buf(&mut self.inner, buf)
    }
}
#[cfg(test)]
mod tests {
    use futures::{sync::oneshot, Future, Stream};
    use std::thread;
    use std::time::Duration;
    use tokio::{
        self,
        io::{self, AsyncRead},
        reactor::Handle,
        runtime::TaskExecutor,
    };

    use super::{dummy_endpoint, Endpoint, IpcConnection, SecurityAttributes};
    use futures::sync::oneshot::Sender;
    use std::path::Path;

    // Starts an echo server on `path` from a helper thread and returns once it
    // has been spawned on `exec`. Every connection gets its first five bytes
    // echoed back. Sending on the returned channel stops the server.
    fn run_server(path: &str, exec: TaskExecutor, handle: Handle) -> Sender<()> {
        let endpoint_path = path.to_owned();
        let (ready_tx, ready_rx) = oneshot::channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        thread::spawn(move || {
            let mut endpoint = Endpoint::new(endpoint_path);
            let attributes = SecurityAttributes::empty().set_mode(0o777).unwrap();
            endpoint.set_security_attributes(attributes);

            let connections = endpoint
                .incoming(&handle)
                .expect("failed to open up a new pipe/socket");

            // Read exactly five bytes from each client and write them back.
            let echo = connections.for_each(|(stream, _)| {
                let (reader, writer) = stream.split();
                io::read_exact(reader, [0u8; 5])
                    .and_then(move |(_reader, received)| io::write_all(writer, received.to_vec()))
                    .map_err(|e| {
                        log::trace!("io error: {:?}", e);
                        e
                    })
                    .map(|_| ())
            });

            // Run until either the listener ends or shutdown is requested;
            // whichever future is still pending is dropped.
            let stop = shutdown_rx.map_err(|_| ());
            let srv = echo
                .map_err(|_| ())
                .select(stop)
                .map(|(_, pending)| drop(pending))
                .map_err(|_| ());

            exec.spawn(srv);
            ready_tx.send(()).expect("failed to send ok");
            println!("Server running.");
        });

        ready_rx.wait().expect("failed to receive handle");
        shutdown_tx
    }

    // Two clients connect to one server, each sends "hello" and expects to get
    // the same bytes back. After shutdown the endpoint path must be gone.
    #[test]
    fn smoke_test() {
        let mut runtime = tokio::runtime::Runtime::new().expect("Error creating tokio runtime");
        let exec = runtime.executor();
        #[allow(deprecated)]
        let handle = runtime.reactor().clone();
        let path = dummy_endpoint();
        let shutdown = run_server(&path, exec, handle.clone());

        println!("Connecting to client 0...");
        let client_0 = IpcConnection::connect(&path, &handle).expect("failed to open client_0");
        println!("Connecting to client 1...");
        let client_1 = IpcConnection::connect(&path, &handle).expect("failed to open client_1");

        let msg = b"hello";

        let reply_buf_0 = vec![0u8; msg.len()];
        let client_0_fut = io::write_all(client_0, msg)
            .map_err(|err| panic!("Client 0 write error: {:?}", err))
            .and_then(move |(conn, _)| {
                io::read_exact(conn, reply_buf_0)
                    .map(|(_, reply)| reply)
                    .map_err(|err| panic!("Client 0 read error: {:?}", err))
            });

        let reply_buf_1 = vec![0u8; msg.len()];
        let client_1_fut = io::write_all(client_1, msg)
            .map_err(|err| panic!("Client 1 write error: {:?}", err))
            .and_then(move |(conn, _)| {
                io::read_exact(conn, reply_buf_1)
                    .map(|(_, reply)| reply)
                    .map_err(|err| panic!("Client 1 read error: {:?}", err))
            });

        let fut = client_0_fut
            .join(client_1_fut)
            .and_then(move |(reply_0, reply_1)| {
                assert_eq!(reply_0, msg);
                assert_eq!(reply_1, msg);
                Ok(())
            })
            .map_err(|err| panic!("Smoke test error: {:?}", err));

        runtime.block_on(fut).expect("Runtime error");

        if shutdown.send(()).is_ok() {
            // Give the server a moment to wind down before checking cleanup.
            thread::sleep(Duration::from_secs(1));
            assert!(!Path::new(&path).exists());
        }
    }

    // Opens a listener on a fresh dummy endpoint using `attr` and reports
    // whether that succeeded.
    #[cfg(windows)]
    fn create_pipe_with_permissions(attr: SecurityAttributes) -> ::std::io::Result<()> {
        let runtime = tokio::runtime::Runtime::new().expect("Error creating tokio runtime");
        #[allow(deprecated)]
        let handle = runtime.reactor();

        let mut endpoint = Endpoint::new(dummy_endpoint());
        endpoint.set_security_attributes(attr);
        endpoint.incoming(handle).map(drop)
    }

    // A pipe can be created with each supported flavour of security attributes.
    #[cfg(windows)]
    #[test]
    fn test_pipe_permissions() {
        let no_attributes = SecurityAttributes::empty();
        create_pipe_with_permissions(no_attributes).expect("failed with no attributes");

        let everyone_create = SecurityAttributes::allow_everyone_create().unwrap();
        create_pipe_with_permissions(everyone_create)
            .expect("failed with attributes for creating");

        let everyone_connect = SecurityAttributes::empty()
            .allow_everyone_connect()
            .unwrap();
        create_pipe_with_permissions(everyone_connect)
            .expect("failed with attributes for connecting");
    }
}