parity_tokio_ipc/
unix.rs

1use libc::chmod;
2use std::ffi::CString;
3use std::io::{self, Error};
4use futures::Stream;
5use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
6use tokio::net::{UnixListener, UnixStream};
7use std::path::Path;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11/// Socket permissions and ownership on UNIX
12pub struct SecurityAttributes {
13    // read/write permissions for owner, group and others in unix octal.
14    mode: Option<u16>
15}
16
17impl SecurityAttributes {
18    /// New default security attributes. These only allow access by the
19    /// process’s own user and the system administrator.
20    pub fn empty() -> Self {
21        SecurityAttributes {
22            mode: Some(0o600)
23        }
24    }
25
26    /// New security attributes that allow everyone to connect.
27    pub fn allow_everyone_connect(mut self) -> io::Result<Self> {
28        self.mode = Some(0o666);
29        Ok(self)
30    }
31
32    /// Set a custom permission on the socket
33    pub fn set_mode(mut self, mode: u16) -> io::Result<Self> {
34        self.mode = Some(mode);
35        Ok(self)
36    }
37
38    /// New security attributes that allow everyone to create.
39    ///
40    /// This does not work on unix, where it is equivalent to
41    /// [`SecurityAttributes::allow_everyone_connect`].
42    pub fn allow_everyone_create() -> io::Result<Self> {
43        Ok(SecurityAttributes {
44            mode: None
45        })
46    }
47
48    /// called in unix, after server socket has been created
49    /// will apply security attributes to the socket.
50    fn apply_permissions(&self, path: &str) -> io::Result<()> {
51        if let Some(mode) = self.mode {
52            let path = CString::new(path)?;
53            if unsafe { chmod(path.as_ptr(), mode.into()) } == -1 {
54                return Err(Error::last_os_error());
55            }
56        }
57
58        Ok(())
59    }
60}
61
62/// Endpoint implementation for unix systems
63pub struct Endpoint {
64    path: String,
65    security_attributes: SecurityAttributes,
66}
67
68impl Endpoint {
69    /// Stream of incoming connections
70    pub fn incoming(self) -> io::Result<impl Stream<Item = std::io::Result<impl AsyncRead + AsyncWrite>> + 'static> {
71        let listener = self.inner()?;
72        // the call to bind in `inner()` creates the file
73        // `apply_permission()` will set the file permissions.
74        self.security_attributes.apply_permissions(&self.path)?;
75        Ok(Incoming {
76            path: self.path,
77            listener,
78        })
79    }
80
81    /// Inner platform-dependant state of the endpoint
82    fn inner(&self) -> io::Result<UnixListener> {
83        UnixListener::bind(&self.path)
84    }
85
86    /// Set security attributes for the connection
87    pub fn set_security_attributes(&mut self, security_attributes: SecurityAttributes) {
88        self.security_attributes = security_attributes;
89    }
90
91    /// Make new connection using the provided path and running event pool
92    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Connection> {
93        Ok(Connection::wrap(UnixStream::connect(path.as_ref()).await?))
94    }
95
96    /// Returns the path of the endpoint.
97    pub fn path(&self) -> &str {
98        &self.path
99    }
100
101    /// New IPC endpoint at the given path
102    pub fn new(path: String) -> Self {
103        Endpoint {
104            path,
105            security_attributes: SecurityAttributes::empty(),
106        }
107    }
108}
109
110/// Stream of incoming connections.
111///
112/// Removes the bound socket file when dropped.
113struct Incoming {
114    path: String,
115    listener: UnixListener,
116}
117
118impl Stream for Incoming {
119    type Item = io::Result<UnixStream>;
120
121    fn poll_next(
122        self: Pin<&mut Self>,
123        cx: &mut Context<'_>,
124    ) -> Poll<Option<Self::Item>> {
125        let this = Pin::into_inner(self);
126        match Pin::new(&mut this.listener).poll_accept(cx) {
127            Poll::Pending => Poll::Pending,
128            Poll::Ready(result) => Poll::Ready(Some(result.map(|(stream, _addr)| stream))),
129        }
130    }
131}
132
133impl Drop for Incoming {
134    fn drop(&mut self) {
135        use std::fs;
136        if let Ok(()) = fs::remove_file(&self.path) {
137            log::trace!("Removed socket file at: {}", self.path)
138        }
139    }
140}
141
142/// IPC connection.
143pub struct Connection {
144    inner: UnixStream,
145}
146
147impl Connection {
148    fn wrap(stream: UnixStream) -> Self {
149        Self { inner: stream }
150    }
151}
152
153impl AsyncRead for Connection {
154    fn poll_read(
155        self: Pin<&mut Self>,
156        ctx: &mut Context<'_>,
157        buf: &mut ReadBuf<'_>,
158    ) -> Poll<io::Result<()>> {
159        let this = Pin::into_inner(self);
160        Pin::new(&mut this.inner).poll_read(ctx, buf)
161    }
162}
163
164impl AsyncWrite for Connection {
165    fn poll_write(
166        self: Pin<&mut Self>,
167        ctx: &mut Context<'_>,
168        buf: &[u8],
169    ) -> Poll<Result<usize, io::Error>> {
170        let this = Pin::into_inner(self);
171        Pin::new(&mut this.inner).poll_write(ctx, buf)
172    }
173
174    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
175        let this = Pin::into_inner(self);
176        Pin::new(&mut this.inner).poll_flush(ctx)
177    }
178
179    fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
180        let this = Pin::into_inner(self);
181        Pin::new(&mut this.inner).poll_shutdown(ctx)
182    }
183}