parity_tokio_ipc/
win.rs

1use winapi::shared::winerror::{ERROR_PIPE_BUSY, ERROR_SUCCESS};
2use winapi::um::accctrl::*;
3use winapi::um::aclapi::*;
4use winapi::um::minwinbase::{LPTR, PSECURITY_ATTRIBUTES, SECURITY_ATTRIBUTES};
5use winapi::um::securitybaseapi::*;
6use winapi::um::winbase::{LocalAlloc, LocalFree};
7use winapi::um::winnt::*;
8
9use futures::Stream;
10use std::io;
11use std::marker;
12use std::mem;
13use std::path::Path;
14use std::pin::Pin;
15use std::ptr;
16use std::task::{Context, Poll};
17use std::time::{Duration, Instant};
18use tokio::io::{AsyncRead, AsyncWrite};
19
20use tokio::net::windows::named_pipe;
21
22enum NamedPipe {
23    Server(named_pipe::NamedPipeServer),
24    Client(named_pipe::NamedPipeClient),
25}
26
/// Upper bound on how long `Endpoint::connect` keeps retrying while the pipe
/// reports `ERROR_PIPE_BUSY`.
const PIPE_AVAILABILITY_TIMEOUT: Duration = Duration::from_secs(5);
28
29/// Endpoint implementation for windows
30pub struct Endpoint {
31    path: String,
32    security_attributes: SecurityAttributes,
33    created_listener: bool,
34}
35
36impl Endpoint {
37    /// Stream of incoming connections
38    pub fn incoming(
39        mut self,
40    ) -> io::Result<impl Stream<Item = io::Result<impl AsyncRead + AsyncWrite>> + 'static> {
41        let pipe = self.create_listener()?;
42
43        let stream =
44            futures::stream::try_unfold((pipe, self), |(listener, mut endpoint)| async move {
45                let () = listener.connect().await?;
46
47                let new_listener = endpoint.create_listener()?;
48
49                let conn = Connection::wrap(NamedPipe::Server(listener));
50
51                Ok(Some((conn, (new_listener, endpoint))))
52            });
53
54        Ok(stream)
55    }
56
57    fn create_listener(&mut self) -> io::Result<named_pipe::NamedPipeServer> {
58        let server = unsafe {
59            named_pipe::ServerOptions::new()
60                .first_pipe_instance(!self.created_listener)
61                .reject_remote_clients(true)
62                .access_inbound(true)
63                .access_outbound(true)
64                .in_buffer_size(65536)
65                .out_buffer_size(65536)
66                .create_with_security_attributes_raw(
67                    &self.path,
68                    self.security_attributes.as_ptr() as *mut libc::c_void,
69                )
70        }?;
71        self.created_listener = true;
72
73        Ok(server)
74    }
75
76    /// Set security attributes for the connection
77    pub fn set_security_attributes(&mut self, security_attributes: SecurityAttributes) {
78        self.security_attributes = security_attributes;
79    }
80
81    /// Returns the path of the endpoint.
82    pub fn path(&self) -> &str {
83        &self.path
84    }
85
86    /// Make new connection using the provided path and running event pool.
87    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Connection> {
88        let path = path.as_ref();
89
90        // There is not async equivalent of waiting for a named pipe in Windows,
91        // so we keep trying or sleeping for a bit, until we hit a timeout
92        let attempt_start = Instant::now();
93        let client = loop {
94            match named_pipe::ClientOptions::new()
95                .read(true)
96                .write(true)
97                .open(path)
98            {
99                Ok(client) => break client,
100                Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {
101                    if attempt_start.elapsed() < PIPE_AVAILABILITY_TIMEOUT {
102                        tokio::time::sleep(Duration::from_millis(50)).await;
103                        continue;
104                    } else {
105                        return Err(e);
106                    }
107                }
108                Err(e) => return Err(e),
109            }
110        };
111
112        Ok(Connection::wrap(NamedPipe::Client(client)))
113    }
114
115    /// New IPC endpoint at the given path
116    pub fn new(path: String) -> Self {
117        Endpoint {
118            path,
119            security_attributes: SecurityAttributes::empty(),
120            created_listener: false,
121        }
122    }
123}
124
125/// IPC connection.
126pub struct Connection {
127    inner: NamedPipe,
128}
129
130impl Connection {
131    /// Wraps an existing named pipe
132    fn wrap(pipe: NamedPipe) -> Self {
133        Self { inner: pipe }
134    }
135}
136
137impl AsyncRead for Connection {
138    fn poll_read(
139        self: Pin<&mut Self>,
140        ctx: &mut Context<'_>,
141        buf: &mut tokio::io::ReadBuf<'_>,
142    ) -> Poll<io::Result<()>> {
143        let this = Pin::into_inner(self);
144        match this.inner {
145            NamedPipe::Client(ref mut c) => Pin::new(c).poll_read(ctx, buf),
146            NamedPipe::Server(ref mut s) => Pin::new(s).poll_read(ctx, buf),
147        }
148    }
149}
150
151impl AsyncWrite for Connection {
152    fn poll_write(
153        self: Pin<&mut Self>,
154        ctx: &mut Context<'_>,
155        buf: &[u8],
156    ) -> Poll<Result<usize, io::Error>> {
157        let this = Pin::into_inner(self);
158        match this.inner {
159            NamedPipe::Client(ref mut c) => Pin::new(c).poll_write(ctx, buf),
160            NamedPipe::Server(ref mut s) => Pin::new(s).poll_write(ctx, buf),
161        }
162    }
163
164    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
165        let this = Pin::into_inner(self);
166        match this.inner {
167            NamedPipe::Client(ref mut c) => Pin::new(c).poll_flush(ctx),
168            NamedPipe::Server(ref mut s) => Pin::new(s).poll_flush(ctx),
169        }
170    }
171
172    fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
173        let this = Pin::into_inner(self);
174        match this.inner {
175            NamedPipe::Client(ref mut c) => Pin::new(c).poll_shutdown(ctx),
176            NamedPipe::Server(ref mut s) => Pin::new(s).poll_shutdown(ctx),
177        }
178    }
179}
180
181/// Security attributes.
182pub struct SecurityAttributes {
183    attributes: Option<InnerAttributes>,
184}
185
186pub const DEFAULT_SECURITY_ATTRIBUTES: SecurityAttributes = SecurityAttributes {
187    attributes: Some(InnerAttributes {
188        descriptor: SecurityDescriptor {
189            descriptor_ptr: ptr::null_mut(),
190        },
191        acl: Acl {
192            acl_ptr: ptr::null_mut(),
193        },
194        attrs: SECURITY_ATTRIBUTES {
195            nLength: mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
196            lpSecurityDescriptor: ptr::null_mut(),
197            bInheritHandle: 0,
198        },
199    }),
200};
201
202impl SecurityAttributes {
203    /// New default security attributes.
204    pub fn empty() -> SecurityAttributes {
205        DEFAULT_SECURITY_ATTRIBUTES
206    }
207
208    /// New default security attributes that allow everyone to connect.
209    pub fn allow_everyone_connect(&self) -> io::Result<SecurityAttributes> {
210        let attributes = Some(InnerAttributes::allow_everyone(
211            GENERIC_READ | FILE_WRITE_DATA,
212        )?);
213        Ok(SecurityAttributes { attributes })
214    }
215
216    /// Set a custom permission on the socket
217    pub fn set_mode(self, _mode: u32) -> io::Result<Self> {
218        // for now, does nothing.
219        Ok(self)
220    }
221
222    /// New default security attributes that allow everyone to create.
223    pub fn allow_everyone_create() -> io::Result<SecurityAttributes> {
224        let attributes = Some(InnerAttributes::allow_everyone(
225            GENERIC_READ | GENERIC_WRITE,
226        )?);
227        Ok(SecurityAttributes { attributes })
228    }
229
230    /// Return raw handle of security attributes.
231    pub(crate) unsafe fn as_ptr(&mut self) -> PSECURITY_ATTRIBUTES {
232        match self.attributes.as_mut() {
233            Some(attributes) => attributes.as_ptr(),
234            None => ptr::null_mut(),
235        }
236    }
237}
238
239unsafe impl Send for SecurityAttributes {}
240
241struct Sid {
242    sid_ptr: PSID,
243}
244
245impl Sid {
246    fn everyone_sid() -> io::Result<Sid> {
247        let mut sid_ptr = ptr::null_mut();
248        let result = unsafe {
249            #[allow(const_item_mutation)]
250            AllocateAndInitializeSid(
251                SECURITY_WORLD_SID_AUTHORITY.as_mut_ptr() as *mut _,
252                1,
253                SECURITY_WORLD_RID,
254                0,
255                0,
256                0,
257                0,
258                0,
259                0,
260                0,
261                &mut sid_ptr,
262            )
263        };
264        if result == 0 {
265            Err(io::Error::last_os_error())
266        } else {
267            Ok(Sid { sid_ptr })
268        }
269    }
270
271    // Unsafe - the returned pointer is only valid for the lifetime of self.
272    unsafe fn as_ptr(&self) -> PSID {
273        self.sid_ptr
274    }
275}
276
277impl Drop for Sid {
278    fn drop(&mut self) {
279        if !self.sid_ptr.is_null() {
280            unsafe {
281                FreeSid(self.sid_ptr);
282            }
283        }
284    }
285}
286
287struct AceWithSid<'a> {
288    explicit_access: EXPLICIT_ACCESS_W,
289    _marker: marker::PhantomData<&'a Sid>,
290}
291
292impl<'a> AceWithSid<'a> {
293    fn new(sid: &'a Sid, trustee_type: u32) -> AceWithSid<'a> {
294        let mut explicit_access = unsafe { mem::zeroed::<EXPLICIT_ACCESS_W>() };
295        explicit_access.Trustee.TrusteeForm = TRUSTEE_IS_SID;
296        explicit_access.Trustee.TrusteeType = trustee_type;
297        explicit_access.Trustee.ptstrName = unsafe { sid.as_ptr() as *mut _ };
298
299        AceWithSid {
300            explicit_access,
301            _marker: marker::PhantomData,
302        }
303    }
304
305    fn set_access_mode(&mut self, access_mode: u32) -> &mut Self {
306        self.explicit_access.grfAccessMode = access_mode;
307        self
308    }
309
310    fn set_access_permissions(&mut self, access_permissions: u32) -> &mut Self {
311        self.explicit_access.grfAccessPermissions = access_permissions;
312        self
313    }
314
315    fn allow_inheritance(&mut self, inheritance_flags: u32) -> &mut Self {
316        self.explicit_access.grfInheritance = inheritance_flags;
317        self
318    }
319}
320
321struct Acl {
322    acl_ptr: PACL,
323}
324
325impl Acl {
326    fn empty() -> io::Result<Acl> {
327        Self::new(&mut [])
328    }
329
330    fn new(entries: &mut [AceWithSid<'_>]) -> io::Result<Acl> {
331        let mut acl_ptr = ptr::null_mut();
332        let result = unsafe {
333            SetEntriesInAclW(
334                entries.len() as u32,
335                entries.as_mut_ptr() as *mut _,
336                ptr::null_mut(),
337                &mut acl_ptr,
338            )
339        };
340
341        if result != ERROR_SUCCESS {
342            return Err(io::Error::from_raw_os_error(result as i32));
343        }
344
345        Ok(Acl { acl_ptr })
346    }
347
348    unsafe fn as_ptr(&self) -> PACL {
349        self.acl_ptr
350    }
351}
352
353impl Drop for Acl {
354    fn drop(&mut self) {
355        if !self.acl_ptr.is_null() {
356            unsafe { LocalFree(self.acl_ptr as *mut _) };
357        }
358    }
359}
360
361struct SecurityDescriptor {
362    descriptor_ptr: PSECURITY_DESCRIPTOR,
363}
364
365impl SecurityDescriptor {
366    fn new() -> io::Result<Self> {
367        let descriptor_ptr = unsafe { LocalAlloc(LPTR, SECURITY_DESCRIPTOR_MIN_LENGTH) };
368        if descriptor_ptr.is_null() {
369            return Err(io::Error::new(
370                io::ErrorKind::Other,
371                "Failed to allocate security descriptor",
372            ));
373        }
374
375        if unsafe {
376            InitializeSecurityDescriptor(descriptor_ptr, SECURITY_DESCRIPTOR_REVISION) == 0
377        } {
378            return Err(io::Error::last_os_error());
379        };
380
381        Ok(SecurityDescriptor { descriptor_ptr })
382    }
383
384    fn set_dacl(&mut self, acl: &Acl) -> io::Result<()> {
385        if unsafe {
386            SetSecurityDescriptorDacl(self.descriptor_ptr, true as i32, acl.as_ptr(), false as i32)
387                == 0
388        } {
389            return Err(io::Error::last_os_error());
390        }
391        Ok(())
392    }
393
394    unsafe fn as_ptr(&self) -> PSECURITY_DESCRIPTOR {
395        self.descriptor_ptr
396    }
397}
398
399impl Drop for SecurityDescriptor {
400    fn drop(&mut self) {
401        if !self.descriptor_ptr.is_null() {
402            unsafe { LocalFree(self.descriptor_ptr) };
403            self.descriptor_ptr = ptr::null_mut();
404        }
405    }
406}
407
408struct InnerAttributes {
409    descriptor: SecurityDescriptor,
410    acl: Acl,
411    attrs: SECURITY_ATTRIBUTES,
412}
413
414impl InnerAttributes {
415    fn empty() -> io::Result<InnerAttributes> {
416        let descriptor = SecurityDescriptor::new()?;
417        let mut attrs = unsafe { mem::zeroed::<SECURITY_ATTRIBUTES>() };
418        attrs.nLength = mem::size_of::<SECURITY_ATTRIBUTES>() as u32;
419        attrs.lpSecurityDescriptor = unsafe { descriptor.as_ptr() };
420        attrs.bInheritHandle = false as i32;
421
422        let acl = Acl::empty().expect("this should never fail");
423
424        Ok(InnerAttributes {
425            acl,
426            descriptor,
427            attrs,
428        })
429    }
430
431    fn allow_everyone(permissions: u32) -> io::Result<InnerAttributes> {
432        let mut attributes = Self::empty()?;
433        let sid = Sid::everyone_sid()?;
434
435        let mut everyone_ace = AceWithSid::new(&sid, TRUSTEE_IS_WELL_KNOWN_GROUP);
436        everyone_ace
437            .set_access_mode(SET_ACCESS)
438            .set_access_permissions(permissions)
439            .allow_inheritance(false as u32);
440
441        let mut entries = vec![everyone_ace];
442        attributes.acl = Acl::new(&mut entries)?;
443        attributes.descriptor.set_dacl(&attributes.acl)?;
444
445        Ok(attributes)
446    }
447
448    unsafe fn as_ptr(&mut self) -> PSECURITY_ATTRIBUTES {
449        &mut self.attrs as *mut _
450    }
451}
452
#[cfg(test)]
mod test {
    use super::SecurityAttributes;

    /// Building the "everyone may create" attributes must succeed.
    #[test]
    fn test_allow_everyone_everything() {
        let created = SecurityAttributes::allow_everyone_create();
        created
            .expect("failed to create security attributes that allow everyone to create a pipe");
    }

    /// Building the "everyone may connect" attributes must succeed.
    #[test]
    fn test_allow_eveyone_read_write() {
        let base = SecurityAttributes::empty();
        let connectable = base.allow_everyone_connect();
        connectable
            .expect("failed to create security attributes that allow everyone to read and write to/from a pipe");
    }
}