1use winapi::shared::winerror::{ERROR_PIPE_BUSY, ERROR_SUCCESS};
2use winapi::um::accctrl::*;
3use winapi::um::aclapi::*;
4use winapi::um::minwinbase::{LPTR, PSECURITY_ATTRIBUTES, SECURITY_ATTRIBUTES};
5use winapi::um::securitybaseapi::*;
6use winapi::um::winbase::{LocalAlloc, LocalFree};
7use winapi::um::winnt::*;
8
9use futures::Stream;
10use std::io;
11use std::marker;
12use std::mem;
13use std::path::Path;
14use std::pin::Pin;
15use std::ptr;
16use std::task::{Context, Poll};
17use std::time::{Duration, Instant};
18use tokio::io::{AsyncRead, AsyncWrite};
19
20use tokio::net::windows::named_pipe;
21
22enum NamedPipe {
23 Server(named_pipe::NamedPipeServer),
24 Client(named_pipe::NamedPipeClient),
25}
26
27const PIPE_AVAILABILITY_TIMEOUT: Duration = Duration::from_secs(5);
28
29pub struct Endpoint {
31 path: String,
32 security_attributes: SecurityAttributes,
33 created_listener: bool,
34}
35
36impl Endpoint {
37 pub fn incoming(
39 mut self,
40 ) -> io::Result<impl Stream<Item = io::Result<impl AsyncRead + AsyncWrite>> + 'static> {
41 let pipe = self.create_listener()?;
42
43 let stream =
44 futures::stream::try_unfold((pipe, self), |(listener, mut endpoint)| async move {
45 let () = listener.connect().await?;
46
47 let new_listener = endpoint.create_listener()?;
48
49 let conn = Connection::wrap(NamedPipe::Server(listener));
50
51 Ok(Some((conn, (new_listener, endpoint))))
52 });
53
54 Ok(stream)
55 }
56
57 fn create_listener(&mut self) -> io::Result<named_pipe::NamedPipeServer> {
58 let server = unsafe {
59 named_pipe::ServerOptions::new()
60 .first_pipe_instance(!self.created_listener)
61 .reject_remote_clients(true)
62 .access_inbound(true)
63 .access_outbound(true)
64 .in_buffer_size(65536)
65 .out_buffer_size(65536)
66 .create_with_security_attributes_raw(
67 &self.path,
68 self.security_attributes.as_ptr() as *mut libc::c_void,
69 )
70 }?;
71 self.created_listener = true;
72
73 Ok(server)
74 }
75
76 pub fn set_security_attributes(&mut self, security_attributes: SecurityAttributes) {
78 self.security_attributes = security_attributes;
79 }
80
81 pub fn path(&self) -> &str {
83 &self.path
84 }
85
86 pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Connection> {
88 let path = path.as_ref();
89
90 let attempt_start = Instant::now();
93 let client = loop {
94 match named_pipe::ClientOptions::new()
95 .read(true)
96 .write(true)
97 .open(path)
98 {
99 Ok(client) => break client,
100 Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {
101 if attempt_start.elapsed() < PIPE_AVAILABILITY_TIMEOUT {
102 tokio::time::sleep(Duration::from_millis(50)).await;
103 continue;
104 } else {
105 return Err(e);
106 }
107 }
108 Err(e) => return Err(e),
109 }
110 };
111
112 Ok(Connection::wrap(NamedPipe::Client(client)))
113 }
114
115 pub fn new(path: String) -> Self {
117 Endpoint {
118 path,
119 security_attributes: SecurityAttributes::empty(),
120 created_listener: false,
121 }
122 }
123}
124
125pub struct Connection {
127 inner: NamedPipe,
128}
129
130impl Connection {
131 fn wrap(pipe: NamedPipe) -> Self {
133 Self { inner: pipe }
134 }
135}
136
137impl AsyncRead for Connection {
138 fn poll_read(
139 self: Pin<&mut Self>,
140 ctx: &mut Context<'_>,
141 buf: &mut tokio::io::ReadBuf<'_>,
142 ) -> Poll<io::Result<()>> {
143 let this = Pin::into_inner(self);
144 match this.inner {
145 NamedPipe::Client(ref mut c) => Pin::new(c).poll_read(ctx, buf),
146 NamedPipe::Server(ref mut s) => Pin::new(s).poll_read(ctx, buf),
147 }
148 }
149}
150
151impl AsyncWrite for Connection {
152 fn poll_write(
153 self: Pin<&mut Self>,
154 ctx: &mut Context<'_>,
155 buf: &[u8],
156 ) -> Poll<Result<usize, io::Error>> {
157 let this = Pin::into_inner(self);
158 match this.inner {
159 NamedPipe::Client(ref mut c) => Pin::new(c).poll_write(ctx, buf),
160 NamedPipe::Server(ref mut s) => Pin::new(s).poll_write(ctx, buf),
161 }
162 }
163
164 fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
165 let this = Pin::into_inner(self);
166 match this.inner {
167 NamedPipe::Client(ref mut c) => Pin::new(c).poll_flush(ctx),
168 NamedPipe::Server(ref mut s) => Pin::new(s).poll_flush(ctx),
169 }
170 }
171
172 fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
173 let this = Pin::into_inner(self);
174 match this.inner {
175 NamedPipe::Client(ref mut c) => Pin::new(c).poll_shutdown(ctx),
176 NamedPipe::Server(ref mut s) => Pin::new(s).poll_shutdown(ctx),
177 }
178 }
179}
180
181pub struct SecurityAttributes {
183 attributes: Option<InnerAttributes>,
184}
185
186pub const DEFAULT_SECURITY_ATTRIBUTES: SecurityAttributes = SecurityAttributes {
187 attributes: Some(InnerAttributes {
188 descriptor: SecurityDescriptor {
189 descriptor_ptr: ptr::null_mut(),
190 },
191 acl: Acl {
192 acl_ptr: ptr::null_mut(),
193 },
194 attrs: SECURITY_ATTRIBUTES {
195 nLength: mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
196 lpSecurityDescriptor: ptr::null_mut(),
197 bInheritHandle: 0,
198 },
199 }),
200};
201
202impl SecurityAttributes {
203 pub fn empty() -> SecurityAttributes {
205 DEFAULT_SECURITY_ATTRIBUTES
206 }
207
208 pub fn allow_everyone_connect(&self) -> io::Result<SecurityAttributes> {
210 let attributes = Some(InnerAttributes::allow_everyone(
211 GENERIC_READ | FILE_WRITE_DATA,
212 )?);
213 Ok(SecurityAttributes { attributes })
214 }
215
216 pub fn set_mode(self, _mode: u32) -> io::Result<Self> {
218 Ok(self)
220 }
221
222 pub fn allow_everyone_create() -> io::Result<SecurityAttributes> {
224 let attributes = Some(InnerAttributes::allow_everyone(
225 GENERIC_READ | GENERIC_WRITE,
226 )?);
227 Ok(SecurityAttributes { attributes })
228 }
229
230 pub(crate) unsafe fn as_ptr(&mut self) -> PSECURITY_ATTRIBUTES {
232 match self.attributes.as_mut() {
233 Some(attributes) => attributes.as_ptr(),
234 None => ptr::null_mut(),
235 }
236 }
237}
238
239unsafe impl Send for SecurityAttributes {}
240
241struct Sid {
242 sid_ptr: PSID,
243}
244
245impl Sid {
246 fn everyone_sid() -> io::Result<Sid> {
247 let mut sid_ptr = ptr::null_mut();
248 let result = unsafe {
249 #[allow(const_item_mutation)]
250 AllocateAndInitializeSid(
251 SECURITY_WORLD_SID_AUTHORITY.as_mut_ptr() as *mut _,
252 1,
253 SECURITY_WORLD_RID,
254 0,
255 0,
256 0,
257 0,
258 0,
259 0,
260 0,
261 &mut sid_ptr,
262 )
263 };
264 if result == 0 {
265 Err(io::Error::last_os_error())
266 } else {
267 Ok(Sid { sid_ptr })
268 }
269 }
270
271 unsafe fn as_ptr(&self) -> PSID {
273 self.sid_ptr
274 }
275}
276
277impl Drop for Sid {
278 fn drop(&mut self) {
279 if !self.sid_ptr.is_null() {
280 unsafe {
281 FreeSid(self.sid_ptr);
282 }
283 }
284 }
285}
286
287struct AceWithSid<'a> {
288 explicit_access: EXPLICIT_ACCESS_W,
289 _marker: marker::PhantomData<&'a Sid>,
290}
291
292impl<'a> AceWithSid<'a> {
293 fn new(sid: &'a Sid, trustee_type: u32) -> AceWithSid<'a> {
294 let mut explicit_access = unsafe { mem::zeroed::<EXPLICIT_ACCESS_W>() };
295 explicit_access.Trustee.TrusteeForm = TRUSTEE_IS_SID;
296 explicit_access.Trustee.TrusteeType = trustee_type;
297 explicit_access.Trustee.ptstrName = unsafe { sid.as_ptr() as *mut _ };
298
299 AceWithSid {
300 explicit_access,
301 _marker: marker::PhantomData,
302 }
303 }
304
305 fn set_access_mode(&mut self, access_mode: u32) -> &mut Self {
306 self.explicit_access.grfAccessMode = access_mode;
307 self
308 }
309
310 fn set_access_permissions(&mut self, access_permissions: u32) -> &mut Self {
311 self.explicit_access.grfAccessPermissions = access_permissions;
312 self
313 }
314
315 fn allow_inheritance(&mut self, inheritance_flags: u32) -> &mut Self {
316 self.explicit_access.grfInheritance = inheritance_flags;
317 self
318 }
319}
320
321struct Acl {
322 acl_ptr: PACL,
323}
324
325impl Acl {
326 fn empty() -> io::Result<Acl> {
327 Self::new(&mut [])
328 }
329
330 fn new(entries: &mut [AceWithSid<'_>]) -> io::Result<Acl> {
331 let mut acl_ptr = ptr::null_mut();
332 let result = unsafe {
333 SetEntriesInAclW(
334 entries.len() as u32,
335 entries.as_mut_ptr() as *mut _,
336 ptr::null_mut(),
337 &mut acl_ptr,
338 )
339 };
340
341 if result != ERROR_SUCCESS {
342 return Err(io::Error::from_raw_os_error(result as i32));
343 }
344
345 Ok(Acl { acl_ptr })
346 }
347
348 unsafe fn as_ptr(&self) -> PACL {
349 self.acl_ptr
350 }
351}
352
353impl Drop for Acl {
354 fn drop(&mut self) {
355 if !self.acl_ptr.is_null() {
356 unsafe { LocalFree(self.acl_ptr as *mut _) };
357 }
358 }
359}
360
361struct SecurityDescriptor {
362 descriptor_ptr: PSECURITY_DESCRIPTOR,
363}
364
365impl SecurityDescriptor {
366 fn new() -> io::Result<Self> {
367 let descriptor_ptr = unsafe { LocalAlloc(LPTR, SECURITY_DESCRIPTOR_MIN_LENGTH) };
368 if descriptor_ptr.is_null() {
369 return Err(io::Error::new(
370 io::ErrorKind::Other,
371 "Failed to allocate security descriptor",
372 ));
373 }
374
375 if unsafe {
376 InitializeSecurityDescriptor(descriptor_ptr, SECURITY_DESCRIPTOR_REVISION) == 0
377 } {
378 return Err(io::Error::last_os_error());
379 };
380
381 Ok(SecurityDescriptor { descriptor_ptr })
382 }
383
384 fn set_dacl(&mut self, acl: &Acl) -> io::Result<()> {
385 if unsafe {
386 SetSecurityDescriptorDacl(self.descriptor_ptr, true as i32, acl.as_ptr(), false as i32)
387 == 0
388 } {
389 return Err(io::Error::last_os_error());
390 }
391 Ok(())
392 }
393
394 unsafe fn as_ptr(&self) -> PSECURITY_DESCRIPTOR {
395 self.descriptor_ptr
396 }
397}
398
399impl Drop for SecurityDescriptor {
400 fn drop(&mut self) {
401 if !self.descriptor_ptr.is_null() {
402 unsafe { LocalFree(self.descriptor_ptr) };
403 self.descriptor_ptr = ptr::null_mut();
404 }
405 }
406}
407
408struct InnerAttributes {
409 descriptor: SecurityDescriptor,
410 acl: Acl,
411 attrs: SECURITY_ATTRIBUTES,
412}
413
414impl InnerAttributes {
415 fn empty() -> io::Result<InnerAttributes> {
416 let descriptor = SecurityDescriptor::new()?;
417 let mut attrs = unsafe { mem::zeroed::<SECURITY_ATTRIBUTES>() };
418 attrs.nLength = mem::size_of::<SECURITY_ATTRIBUTES>() as u32;
419 attrs.lpSecurityDescriptor = unsafe { descriptor.as_ptr() };
420 attrs.bInheritHandle = false as i32;
421
422 let acl = Acl::empty().expect("this should never fail");
423
424 Ok(InnerAttributes {
425 acl,
426 descriptor,
427 attrs,
428 })
429 }
430
431 fn allow_everyone(permissions: u32) -> io::Result<InnerAttributes> {
432 let mut attributes = Self::empty()?;
433 let sid = Sid::everyone_sid()?;
434
435 let mut everyone_ace = AceWithSid::new(&sid, TRUSTEE_IS_WELL_KNOWN_GROUP);
436 everyone_ace
437 .set_access_mode(SET_ACCESS)
438 .set_access_permissions(permissions)
439 .allow_inheritance(false as u32);
440
441 let mut entries = vec![everyone_ace];
442 attributes.acl = Acl::new(&mut entries)?;
443 attributes.descriptor.set_dacl(&attributes.acl)?;
444
445 Ok(attributes)
446 }
447
448 unsafe fn as_ptr(&mut self) -> PSECURITY_ATTRIBUTES {
449 &mut self.attrs as *mut _
450 }
451}
452
453#[cfg(test)]
454mod test {
455 use super::SecurityAttributes;
456
457 #[test]
458 fn test_allow_everyone_everything() {
459 SecurityAttributes::allow_everyone_create()
460 .expect("failed to create security attributes that allow everyone to create a pipe");
461 }
462
463 #[test]
464 fn test_allow_eveyone_read_write() {
465 SecurityAttributes::empty()
466 .allow_everyone_connect()
467 .expect("failed to create security attributes that allow everyone to read and write to/from a pipe");
468 }
469}