use libc::chmod;
use std::ffi::CString;
use std::io::{self, Error};
use futures::Stream;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::{UnixListener, UnixStream};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Permission settings for the socket file backing a Unix endpoint.
pub struct SecurityAttributes {
    /// Mode bits passed to `chmod` once the socket file exists; when `None`,
    /// no `chmod` call is made.
    mode: Option<u16>,
}
impl SecurityAttributes {
    /// Returns the default attributes, which request mode `0o600`
    /// (read/write for the owning user only).
    pub fn empty() -> Self {
        Self { mode: Some(0o600) }
    }

    /// Replaces the requested mode with `0o666` (read/write for everyone).
    ///
    /// Never fails here; the `io::Result` wrapper is part of the signature.
    pub fn allow_everyone_connect(self) -> io::Result<Self> {
        self.set_mode(0o666)
    }

    /// Replaces the requested mode with the caller-supplied `mode` bits.
    ///
    /// Never fails here; the `io::Result` wrapper is part of the signature.
    pub fn set_mode(self, mode: u16) -> io::Result<Self> {
        Ok(Self { mode: Some(mode) })
    }

    /// Builds attributes with no mode requested, so `apply_permissions`
    /// performs no `chmod` and leaves the socket file's permissions alone.
    pub fn allow_everyone_create() -> io::Result<Self> {
        let unrestricted = Self { mode: None };
        Ok(unrestricted)
    }

    /// Applies the requested mode (if any) to the file at `path` via `chmod`.
    ///
    /// # Errors
    /// Fails if `path` contains an interior NUL byte, or with the last OS
    /// error when `chmod` reports `-1`.
    fn apply_permissions(&self, path: &str) -> io::Result<()> {
        let mode = match self.mode {
            Some(bits) => bits,
            // Nothing requested: leave the file untouched.
            None => return Ok(()),
        };

        let c_path = CString::new(path)?;
        // SAFETY: `c_path` is a valid NUL-terminated C string that stays alive
        // for the whole duration of the call.
        let status = unsafe { chmod(c_path.as_ptr(), mode.into()) };
        if status == -1 {
            Err(Error::last_os_error())
        } else {
            Ok(())
        }
    }
}
/// A Unix-socket endpoint: a filesystem path plus the permissions to apply
/// to the socket file once it has been bound.
pub struct Endpoint {
    /// Filesystem path the listener is bound to.
    path: String,
    /// Permissions applied to the socket file after binding.
    security_attributes: SecurityAttributes,
}
impl Endpoint {
    /// Binds the socket at this endpoint's path, applies the configured
    /// security attributes to the socket file, and returns the stream of
    /// incoming connections.
    ///
    /// Permissions are applied *after* the bind has created the socket file.
    ///
    /// # Errors
    /// Returns the error from binding the listener, or the error from applying
    /// the permissions. In the latter case the socket file created by the bind
    /// is removed again before returning.
    pub fn incoming(self) -> io::Result<impl Stream<Item = std::io::Result<impl AsyncRead + AsyncWrite>> + 'static> {
        let listener = self.inner()?;
        // Hand the freshly bound socket to `Incoming` *before* touching the
        // permissions: its `Drop` impl unlinks the path, so a failing `chmod`
        // no longer leaves a stale socket file behind that would block later
        // binds to the same path.
        let incoming = Incoming {
            path: self.path,
            listener,
        };
        self.security_attributes.apply_permissions(&incoming.path)?;
        Ok(incoming)
    }

    /// Binds a `UnixListener` to this endpoint's path.
    fn inner(&self) -> io::Result<UnixListener> {
        UnixListener::bind(&self.path)
    }

    /// Replaces the security attributes that `incoming` will apply.
    pub fn set_security_attributes(&mut self, security_attributes: SecurityAttributes) {
        self.security_attributes = security_attributes;
    }

    /// Connects to the Unix socket at `path` and wraps the resulting stream
    /// in a [`Connection`].
    ///
    /// # Errors
    /// Returns the error reported by `UnixStream::connect`.
    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Connection> {
        let stream = UnixStream::connect(path.as_ref()).await?;
        Ok(Connection::wrap(stream))
    }

    /// Returns the path this endpoint was created with.
    pub fn path(&self) -> &str {
        &self.path
    }

    /// Creates an endpoint for `path` using `SecurityAttributes::empty()`.
    pub fn new(path: String) -> Self {
        Endpoint {
            path,
            security_attributes: SecurityAttributes::empty(),
        }
    }
}
/// Stream of accepted connections; owns the listener together with the path
/// of its socket file.
struct Incoming {
    /// Path of the socket file, removed again when this value is dropped.
    path: String,
    /// Listener that new connections are accepted from.
    listener: UnixListener,
}
impl Stream for Incoming {
    type Item = io::Result<UnixStream>;

    /// Polls the listener for the next connection.
    ///
    /// Every ready result is yielded as `Some(..)` with the peer address
    /// discarded; this implementation never yields `None`.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let incoming = self.get_mut();
        incoming
            .listener
            .poll_accept(cx)
            .map(|accepted| Some(accepted.map(|(stream, _peer)| stream)))
    }
}
impl Drop for Incoming {
    /// Best-effort removal of the socket file; a failed removal is ignored,
    /// a successful one is logged at trace level.
    fn drop(&mut self) {
        if std::fs::remove_file(&self.path).is_ok() {
            log::trace!("Removed socket file at: {}", self.path);
        }
    }
}
/// A connected Unix-socket stream.
pub struct Connection {
    /// Underlying stream that all reads and writes are forwarded to.
    inner: UnixStream,
}
impl Connection {
fn wrap(stream: UnixStream) -> Self {
Self { inner: stream }
}
}
impl AsyncRead for Connection {
    /// Forwards the read to the wrapped stream.
    fn poll_read(
        self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_read(ctx, buf)
    }
}
impl AsyncWrite for Connection {
    /// Forwards the write to the wrapped stream.
    fn poll_write(
        self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_write(ctx, buf)
    }

    /// Forwards the flush to the wrapped stream.
    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_flush(ctx)
    }

    /// Forwards the shutdown to the wrapped stream.
    fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_shutdown(ctx)
    }
}