1use libc::chmod;
2use std::ffi::CString;
3use std::io::{self, Error};
4use futures::Stream;
5use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
6use tokio::net::{UnixListener, UnixStream};
7use std::path::Path;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
/// Permission settings for the Unix socket file backing an [`Endpoint`].
pub struct SecurityAttributes {
    /// Permission bits handed to `chmod` for the socket file.
    /// `None` means no `chmod` is performed at all.
    mode: Option<u16>,
}
16
17impl SecurityAttributes {
18 pub fn empty() -> Self {
21 SecurityAttributes {
22 mode: Some(0o600)
23 }
24 }
25
26 pub fn allow_everyone_connect(mut self) -> io::Result<Self> {
28 self.mode = Some(0o666);
29 Ok(self)
30 }
31
32 pub fn set_mode(mut self, mode: u16) -> io::Result<Self> {
34 self.mode = Some(mode);
35 Ok(self)
36 }
37
38 pub fn allow_everyone_create() -> io::Result<Self> {
43 Ok(SecurityAttributes {
44 mode: None
45 })
46 }
47
48 fn apply_permissions(&self, path: &str) -> io::Result<()> {
51 if let Some(mode) = self.mode {
52 let path = CString::new(path)?;
53 if unsafe { chmod(path.as_ptr(), mode.into()) } == -1 {
54 return Err(Error::last_os_error());
55 }
56 }
57
58 Ok(())
59 }
60}
61
62pub struct Endpoint {
64 path: String,
65 security_attributes: SecurityAttributes,
66}
67
68impl Endpoint {
69 pub fn incoming(self) -> io::Result<impl Stream<Item = std::io::Result<impl AsyncRead + AsyncWrite>> + 'static> {
71 let listener = self.inner()?;
72 self.security_attributes.apply_permissions(&self.path)?;
75 Ok(Incoming {
76 path: self.path,
77 listener,
78 })
79 }
80
81 fn inner(&self) -> io::Result<UnixListener> {
83 UnixListener::bind(&self.path)
84 }
85
86 pub fn set_security_attributes(&mut self, security_attributes: SecurityAttributes) {
88 self.security_attributes = security_attributes;
89 }
90
91 pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Connection> {
93 Ok(Connection::wrap(UnixStream::connect(path.as_ref()).await?))
94 }
95
96 pub fn path(&self) -> &str {
98 &self.path
99 }
100
101 pub fn new(path: String) -> Self {
103 Endpoint {
104 path,
105 security_attributes: SecurityAttributes::empty(),
106 }
107 }
108}
109
110struct Incoming {
114 path: String,
115 listener: UnixListener,
116}
117
118impl Stream for Incoming {
119 type Item = io::Result<UnixStream>;
120
121 fn poll_next(
122 self: Pin<&mut Self>,
123 cx: &mut Context<'_>,
124 ) -> Poll<Option<Self::Item>> {
125 let this = Pin::into_inner(self);
126 match Pin::new(&mut this.listener).poll_accept(cx) {
127 Poll::Pending => Poll::Pending,
128 Poll::Ready(result) => Poll::Ready(Some(result.map(|(stream, _addr)| stream))),
129 }
130 }
131}
132
133impl Drop for Incoming {
134 fn drop(&mut self) {
135 use std::fs;
136 if let Ok(()) = fs::remove_file(&self.path) {
137 log::trace!("Removed socket file at: {}", self.path)
138 }
139 }
140}
141
142pub struct Connection {
144 inner: UnixStream,
145}
146
147impl Connection {
148 fn wrap(stream: UnixStream) -> Self {
149 Self { inner: stream }
150 }
151}
152
153impl AsyncRead for Connection {
154 fn poll_read(
155 self: Pin<&mut Self>,
156 ctx: &mut Context<'_>,
157 buf: &mut ReadBuf<'_>,
158 ) -> Poll<io::Result<()>> {
159 let this = Pin::into_inner(self);
160 Pin::new(&mut this.inner).poll_read(ctx, buf)
161 }
162}
163
164impl AsyncWrite for Connection {
165 fn poll_write(
166 self: Pin<&mut Self>,
167 ctx: &mut Context<'_>,
168 buf: &[u8],
169 ) -> Poll<Result<usize, io::Error>> {
170 let this = Pin::into_inner(self);
171 Pin::new(&mut this.inner).poll_write(ctx, buf)
172 }
173
174 fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
175 let this = Pin::into_inner(self);
176 Pin::new(&mut this.inner).poll_flush(ctx)
177 }
178
179 fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
180 let this = Pin::into_inner(self);
181 Pin::new(&mut this.inner).poll_shutdown(ctx)
182 }
183}