compio_fs/named_pipe.rs
1//! [Windows named pipes](https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipes).
2//!
3//! The infrastructure of the code comes from tokio.
4
5#[cfg(doc)]
6use std::ptr::null_mut;
7use std::{ffi::OsStr, io, os::windows::io::FromRawHandle, ptr::null};
8
9use compio_buf::{BufResult, IoBuf, IoBufMut};
10use compio_driver::{AsRawFd, RawFd, ToSharedFd, impl_raw_fd, op::ConnectNamedPipe, syscall};
11use compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt};
12use widestring::U16CString;
13use windows_sys::Win32::{
14 Storage::FileSystem::{
15 FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_INBOUND,
16 PIPE_ACCESS_OUTBOUND, WRITE_DAC, WRITE_OWNER,
17 },
18 System::{
19 Pipes::{
20 CreateNamedPipeW, DisconnectNamedPipe, GetNamedPipeInfo, PIPE_ACCEPT_REMOTE_CLIENTS,
21 PIPE_READMODE_BYTE, PIPE_READMODE_MESSAGE, PIPE_REJECT_REMOTE_CLIENTS, PIPE_SERVER_END,
22 PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES,
23 },
24 SystemServices::ACCESS_SYSTEM_SECURITY,
25 },
26};
27
28use crate::{AsyncFd, File, OpenOptions};
29
30/// A [Windows named pipe] server.
31///
32/// Accepting client connections involves creating a server with
33/// [`ServerOptions::create`] and waiting for clients to connect using
34/// [`NamedPipeServer::connect`].
35///
36/// To avoid having clients sporadically fail with
37/// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
38/// ensure that at least one server instance is available at all times. This
39/// means that the typical listen loop for a server is a bit involved, because
40/// we have to ensure that we never drop a server accidentally while a client
41/// might connect.
42///
43/// So a correctly implemented server looks like this:
44///
45/// ```no_run
46/// use std::io;
47///
48/// use compio_fs::named_pipe::ServerOptions;
49///
50/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
51///
52/// # fn main() -> std::io::Result<()> {
53/// // The first server needs to be constructed early so that clients can
54/// // be correctly connected. Otherwise calling .wait will cause the client to
55/// // error.
56/// //
57/// // Here we also make use of `first_pipe_instance`, which will ensure that
58/// // there are no other servers up and running already.
59/// let mut server = ServerOptions::new()
60/// .first_pipe_instance(true)
61/// .create(PIPE_NAME)?;
62///
63/// // Spawn the server loop.
64/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
65/// loop {
66/// // Wait for a client to connect.
67/// let connected = server.connect().await?;
68///
69/// // Construct the next server to be connected before sending the one
70/// // we already have of onto a task. This ensures that the server
71/// // isn't closed (after it's done in the task) before a new one is
72/// // available. Otherwise the client might error with
73/// // `io::ErrorKind::NotFound`.
74/// server = ServerOptions::new().create(PIPE_NAME)?;
75///
76/// let client = compio_runtime::spawn(async move {
77/// // use the connected client
78/// # Ok::<_, std::io::Error>(())
79/// });
80/// # if true { break } // needed for type inference to work
81/// }
82/// # Ok::<_, io::Error>(())
83/// # })
84/// # }
85/// ```
86///
87/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
88#[derive(Debug, Clone)]
89pub struct NamedPipeServer {
90 handle: AsyncFd<std::fs::File>,
91}
92
93impl NamedPipeServer {
94 /// Retrieves information about the named pipe the server is associated
95 /// with.
96 ///
97 /// ```no_run
98 /// use compio_fs::named_pipe::{PipeEnd, PipeMode, ServerOptions};
99 ///
100 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-server-info";
101 ///
102 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
103 /// let server = ServerOptions::new()
104 /// .pipe_mode(PipeMode::Message)
105 /// .max_instances(5)
106 /// .create(PIPE_NAME)?;
107 ///
108 /// let server_info = server.info()?;
109 ///
110 /// assert_eq!(server_info.end, PipeEnd::Server);
111 /// assert_eq!(server_info.mode, PipeMode::Message);
112 /// assert_eq!(server_info.max_instances, 5);
113 /// # std::io::Result::Ok(()) });
114 /// ```
115 pub fn info(&self) -> io::Result<PipeInfo> {
116 // Safety: we're ensuring the lifetime of the named pipe.
117 unsafe { named_pipe_info(self.as_raw_fd()) }
118 }
119
120 /// Enables a named pipe server process to wait for a client process to
121 /// connect to an instance of a named pipe. A client process connects by
122 /// creating a named pipe with the same name.
123 ///
124 /// This corresponds to the [`ConnectNamedPipe`] system call.
125 ///
126 /// # Example
127 ///
128 /// ```no_run
129 /// use compio_fs::named_pipe::ServerOptions;
130 ///
131 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
132 ///
133 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
134 /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
135 ///
136 /// // Wait for a client to connect.
137 /// pipe.connect().await?;
138 ///
139 /// // Use the connected client...
140 /// # std::io::Result::Ok(()) });
141 /// ```
142 pub async fn connect(&self) -> io::Result<()> {
143 let op = ConnectNamedPipe::new(self.handle.to_shared_fd());
144 compio_runtime::submit(op).await.0?;
145 Ok(())
146 }
147
148 /// Disconnects the server end of a named pipe instance from a client
149 /// process.
150 ///
151 /// ```
152 /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
153 /// use compio_io::AsyncWrite;
154 /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
155 ///
156 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-disconnect";
157 ///
158 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
159 /// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
160 ///
161 /// let mut client = ClientOptions::new().open(PIPE_NAME).await.unwrap();
162 ///
163 /// // Wait for a client to become connected.
164 /// server.connect().await.unwrap();
165 ///
166 /// // Forcibly disconnect the client.
167 /// server.disconnect().unwrap();
168 ///
169 /// // Write fails with an OS-specific error after client has been
170 /// // disconnected.
171 /// let e = client.write("ping").await.0.unwrap();
172 /// assert_eq!(e, 0);
173 /// # })
174 /// ```
175 pub fn disconnect(&self) -> io::Result<()> {
176 syscall!(BOOL, DisconnectNamedPipe(self.as_raw_fd() as _))?;
177 Ok(())
178 }
179}
180
181impl AsyncRead for NamedPipeServer {
182 #[inline]
183 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
184 (&*self).read(buf).await
185 }
186}
187
188impl AsyncRead for &NamedPipeServer {
189 #[inline]
190 async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
191 (&self.handle).read(buffer).await
192 }
193}
194
195impl AsyncWrite for NamedPipeServer {
196 #[inline]
197 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
198 (&*self).write(buf).await
199 }
200
201 #[inline]
202 async fn flush(&mut self) -> io::Result<()> {
203 (&*self).flush().await
204 }
205
206 #[inline]
207 async fn shutdown(&mut self) -> io::Result<()> {
208 (&*self).shutdown().await
209 }
210}
211
212impl AsyncWrite for &NamedPipeServer {
213 #[inline]
214 async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
215 (&self.handle).write(buffer).await
216 }
217
218 #[inline]
219 async fn flush(&mut self) -> io::Result<()> {
220 Ok(())
221 }
222
223 #[inline]
224 async fn shutdown(&mut self) -> io::Result<()> {
225 Ok(())
226 }
227}
228
229impl_raw_fd!(NamedPipeServer, std::fs::File, handle, file);
230
231/// A [Windows named pipe] client.
232///
233/// Constructed using [`ClientOptions::open`].
234///
235/// Connecting a client correctly involves a few steps. When connecting through
236/// [`ClientOptions::open`], it might error indicating one of two things:
237///
238/// * [`std::io::ErrorKind::NotFound`] - There is no server available.
239/// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
240/// for a while and try again.
241///
242/// So a correctly implemented client looks like this:
243///
244/// ```no_run
245/// use std::time::Duration;
246///
247/// use compio_fs::named_pipe::ClientOptions;
248/// use compio_runtime::time;
249/// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
250///
251/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
252///
253/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
254/// let client = loop {
255/// match ClientOptions::new().open(PIPE_NAME).await {
256/// Ok(client) => break client,
257/// Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
258/// Err(e) => return Err(e),
259/// }
260///
261/// time::sleep(Duration::from_millis(50)).await;
262/// };
263///
264/// // use the connected client
265/// # Ok(()) });
266/// ```
267///
268/// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
269/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
270#[derive(Debug, Clone)]
271pub struct NamedPipeClient {
272 handle: File,
273}
274
275impl NamedPipeClient {
276 /// Retrieves information about the named pipe the client is associated
277 /// with.
278 ///
279 /// ```no_run
280 /// use compio_fs::named_pipe::{ClientOptions, PipeEnd, PipeMode};
281 ///
282 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-client-info";
283 ///
284 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
285 /// let client = ClientOptions::new().open(PIPE_NAME).await?;
286 ///
287 /// let client_info = client.info()?;
288 ///
289 /// assert_eq!(client_info.end, PipeEnd::Client);
290 /// assert_eq!(client_info.mode, PipeMode::Message);
291 /// assert_eq!(client_info.max_instances, 5);
292 /// # std::io::Result::Ok(()) });
293 /// ```
294 pub fn info(&self) -> io::Result<PipeInfo> {
295 // Safety: we're ensuring the lifetime of the named pipe.
296 unsafe { named_pipe_info(self.as_raw_fd()) }
297 }
298}
299
300impl AsyncRead for NamedPipeClient {
301 #[inline]
302 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
303 (&*self).read(buf).await
304 }
305}
306
307impl AsyncRead for &NamedPipeClient {
308 #[inline]
309 async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
310 // The position is ignored.
311 self.handle.read_at(buffer, 0).await
312 }
313}
314
315impl AsyncWrite for NamedPipeClient {
316 #[inline]
317 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
318 (&*self).write(buf).await
319 }
320
321 #[inline]
322 async fn flush(&mut self) -> io::Result<()> {
323 (&*self).flush().await
324 }
325
326 #[inline]
327 async fn shutdown(&mut self) -> io::Result<()> {
328 (&*self).shutdown().await
329 }
330}
331
332impl AsyncWrite for &NamedPipeClient {
333 #[inline]
334 async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
335 // The position is ignored.
336 (&self.handle).write_at(buffer, 0).await
337 }
338
339 #[inline]
340 async fn flush(&mut self) -> io::Result<()> {
341 Ok(())
342 }
343
344 #[inline]
345 async fn shutdown(&mut self) -> io::Result<()> {
346 Ok(())
347 }
348}
349
350impl_raw_fd!(NamedPipeClient, std::fs::File, handle, file);
351
352/// A builder structure for construct a named pipe with named pipe-specific
353/// options. This is required to use for named pipe servers who wants to modify
354/// pipe-related options.
355///
356/// See [`ServerOptions::create`].
357#[derive(Debug, Clone)]
358pub struct ServerOptions {
359 // dwOpenMode
360 access_inbound: bool,
361 access_outbound: bool,
362 first_pipe_instance: bool,
363 write_dac: bool,
364 write_owner: bool,
365 access_system_security: bool,
366 // dwPipeMode
367 pipe_mode: PipeMode,
368 reject_remote_clients: bool,
369 // other options
370 max_instances: u32,
371 out_buffer_size: u32,
372 in_buffer_size: u32,
373 default_timeout: u32,
374}
375
376impl ServerOptions {
377 /// Creates a new named pipe builder with the default settings.
378 ///
379 /// ```
380 /// use compio_fs::named_pipe::ServerOptions;
381 ///
382 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-new";
383 ///
384 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
385 /// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
386 /// # })
387 /// ```
388 pub fn new() -> ServerOptions {
389 ServerOptions {
390 access_inbound: true,
391 access_outbound: true,
392 first_pipe_instance: false,
393 write_dac: false,
394 write_owner: false,
395 access_system_security: false,
396 pipe_mode: PipeMode::Byte,
397 reject_remote_clients: true,
398 max_instances: PIPE_UNLIMITED_INSTANCES,
399 out_buffer_size: 65536,
400 in_buffer_size: 65536,
401 default_timeout: 0,
402 }
403 }
404
405 /// The pipe mode.
406 ///
407 /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
408 /// documentation of what each mode means.
409 ///
410 /// This corresponds to specifying `PIPE_TYPE_` and `PIPE_READMODE_` in
411 /// [`dwPipeMode`].
412 ///
413 /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
414 pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
415 self.pipe_mode = pipe_mode;
416 self
417 }
418
419 /// The flow of data in the pipe goes from client to server only.
420 ///
421 /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
422 ///
423 /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
424 ///
425 /// # Errors
426 ///
427 /// Server side prevents connecting by denying inbound access, client errors
428 /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
429 /// the connection.
430 ///
431 /// ```
432 /// use std::io;
433 ///
434 /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
435 ///
436 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound-err1";
437 ///
438 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
439 /// let _server = ServerOptions::new()
440 /// .access_inbound(false)
441 /// .create(PIPE_NAME)
442 /// .unwrap();
443 ///
444 /// let e = ClientOptions::new().open(PIPE_NAME).await.unwrap_err();
445 ///
446 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
447 /// # })
448 /// ```
449 ///
450 /// Disabling writing allows a client to connect, but errors with
451 /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
452 ///
453 /// ```
454 /// use std::io;
455 ///
456 /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
457 /// use compio_io::AsyncWrite;
458 ///
459 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound-err2";
460 ///
461 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
462 /// let server = ServerOptions::new()
463 /// .access_inbound(false)
464 /// .create(PIPE_NAME)
465 /// .unwrap();
466 ///
467 /// let mut client = ClientOptions::new()
468 /// .write(false)
469 /// .open(PIPE_NAME)
470 /// .await
471 /// .unwrap();
472 ///
473 /// server.connect().await.unwrap();
474 ///
475 /// let e = client.write("ping").await.0.unwrap_err();
476 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
477 /// # })
478 /// ```
479 ///
480 /// # Examples
481 ///
482 /// A unidirectional named pipe that only supports server-to-client
483 /// communication.
484 ///
485 /// ```
486 /// use std::io;
487 ///
488 /// use compio_buf::BufResult;
489 /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
490 /// use compio_io::{AsyncReadExt, AsyncWriteExt};
491 ///
492 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound";
493 ///
494 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
495 /// let mut server = ServerOptions::new()
496 /// .access_inbound(false)
497 /// .create(PIPE_NAME)
498 /// .unwrap();
499 ///
500 /// let mut client = ClientOptions::new()
501 /// .write(false)
502 /// .open(PIPE_NAME)
503 /// .await
504 /// .unwrap();
505 ///
506 /// server.connect().await.unwrap();
507 ///
508 /// let write = server.write_all("ping");
509 ///
510 /// let buf = Vec::with_capacity(4);
511 /// let read = client.read_exact(buf);
512 ///
513 /// let (BufResult(write, _), BufResult(read, buf)) = futures_util::join!(write, read);
514 /// write.unwrap();
515 /// read.unwrap();
516 ///
517 /// assert_eq!(&buf[..], b"ping");
518 /// # })
519 /// ```
520 pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
521 self.access_inbound = allowed;
522 self
523 }
524
525 /// The flow of data in the pipe goes from server to client only.
526 ///
527 /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
528 ///
529 /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
530 ///
531 /// # Errors
532 ///
533 /// Server side prevents connecting by denying outbound access, client
534 /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
535 /// create the connection.
536 ///
537 /// ```
538 /// use std::io;
539 ///
540 /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
541 ///
542 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound-err1";
543 ///
544 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
545 /// let server = ServerOptions::new()
546 /// .access_outbound(false)
547 /// .create(PIPE_NAME)
548 /// .unwrap();
549 ///
550 /// let e = ClientOptions::new().open(PIPE_NAME).await.unwrap_err();
551 ///
552 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
553 /// # })
554 /// ```
555 ///
556 /// Disabling reading allows a client to connect, but attempting to read
557 /// will error with [`std::io::ErrorKind::PermissionDenied`].
558 ///
559 /// ```
560 /// use std::io;
561 ///
562 /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
563 /// use compio_io::AsyncRead;
564 ///
565 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound-err2";
566 ///
567 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
568 /// let server = ServerOptions::new()
569 /// .access_outbound(false)
570 /// .create(PIPE_NAME)
571 /// .unwrap();
572 ///
573 /// let mut client = ClientOptions::new()
574 /// .read(false)
575 /// .open(PIPE_NAME)
576 /// .await
577 /// .unwrap();
578 ///
579 /// server.connect().await.unwrap();
580 ///
581 /// let buf = Vec::with_capacity(4);
582 /// let e = client.read(buf).await.0.unwrap_err();
583 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
584 /// # })
585 /// ```
586 ///
587 /// # Examples
588 ///
589 /// A unidirectional named pipe that only supports client-to-server
590 /// communication.
591 ///
592 /// ```
593 /// use compio_buf::BufResult;
594 /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
595 /// use compio_io::{AsyncReadExt, AsyncWriteExt};
596 ///
597 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound";
598 ///
599 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
600 /// let mut server = ServerOptions::new()
601 /// .access_outbound(false)
602 /// .create(PIPE_NAME)
603 /// .unwrap();
604 ///
605 /// let mut client = ClientOptions::new()
606 /// .read(false)
607 /// .open(PIPE_NAME)
608 /// .await
609 /// .unwrap();
610 ///
611 /// server.connect().await.unwrap();
612 ///
613 /// let write = client.write_all("ping");
614 ///
615 /// let buf = Vec::with_capacity(4);
616 /// let read = server.read_exact(buf);
617 ///
618 /// let (BufResult(write, _), BufResult(read, buf)) = futures_util::join!(write, read);
619 /// write.unwrap();
620 /// read.unwrap();
621 ///
622 /// println!("done reading and writing");
623 ///
624 /// assert_eq!(&buf[..], b"ping");
625 /// # })
626 /// ```
627 pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
628 self.access_outbound = allowed;
629 self
630 }
631
632 /// If you attempt to create multiple instances of a pipe with this flag
633 /// set, creation of the first server instance succeeds, but creation of any
634 /// subsequent instances will fail with
635 /// [`std::io::ErrorKind::PermissionDenied`].
636 ///
637 /// This option is intended to be used with servers that want to ensure that
638 /// they are the only process listening for clients on a given named pipe.
639 /// This is accomplished by enabling it for the first server instance
640 /// created in a process.
641 ///
642 /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
643 ///
644 /// # Errors
645 ///
646 /// If this option is set and more than one instance of the server for a
647 /// given named pipe exists, calling [`create`] will fail with
648 /// [`std::io::ErrorKind::PermissionDenied`].
649 ///
650 /// ```
651 /// use std::io;
652 ///
653 /// use compio_fs::named_pipe::ServerOptions;
654 ///
655 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-first-instance-error";
656 ///
657 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
658 /// let server1 = ServerOptions::new()
659 /// .first_pipe_instance(true)
660 /// .create(PIPE_NAME)
661 /// .unwrap();
662 ///
663 /// // Second server errs, since it's not the first instance.
664 /// let e = ServerOptions::new()
665 /// .first_pipe_instance(true)
666 /// .create(PIPE_NAME)
667 /// .unwrap_err();
668 ///
669 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
670 /// # })
671 /// ```
672 ///
673 /// # Examples
674 ///
675 /// ```
676 /// use std::io;
677 ///
678 /// use compio_fs::named_pipe::ServerOptions;
679 ///
680 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-first-instance";
681 ///
682 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
683 /// let mut builder = ServerOptions::new();
684 /// builder.first_pipe_instance(true);
685 ///
686 /// let server = builder.create(PIPE_NAME).unwrap();
687 /// let e = builder.create(PIPE_NAME).unwrap_err();
688 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
689 /// drop(server);
690 ///
691 /// // OK: since, we've closed the other instance.
692 /// let _server2 = builder.create(PIPE_NAME).unwrap();
693 /// # })
694 /// ```
695 ///
696 /// [`create`]: ServerOptions::create
697 /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
698 pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
699 self.first_pipe_instance = first;
700 self
701 }
702
703 /// Requests permission to modify the pipe's discretionary access control
704 /// list.
705 ///
706 /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
707 ///
708 /// # Examples
709 ///
710 /// ```
711 /// use std::{io, ptr};
712 ///
713 /// use compio_driver::AsRawFd;
714 /// use compio_fs::named_pipe::ServerOptions;
715 /// use windows_sys::Win32::{
716 /// Foundation::ERROR_SUCCESS,
717 /// Security::{
718 /// Authorization::{SE_KERNEL_OBJECT, SetSecurityInfo},
719 /// DACL_SECURITY_INFORMATION,
720 /// },
721 /// };
722 ///
723 /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
724 ///
725 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
726 /// let mut pipe_template = ServerOptions::new();
727 /// pipe_template.write_dac(true);
728 /// let pipe = pipe_template.create(PIPE_NAME).unwrap();
729 ///
730 /// unsafe {
731 /// assert_eq!(
732 /// ERROR_SUCCESS,
733 /// SetSecurityInfo(
734 /// pipe.as_raw_fd() as _,
735 /// SE_KERNEL_OBJECT,
736 /// DACL_SECURITY_INFORMATION,
737 /// ptr::null_mut(),
738 /// ptr::null_mut(),
739 /// ptr::null_mut(),
740 /// ptr::null_mut(),
741 /// )
742 /// );
743 /// }
744 ///
745 /// # })
746 /// ```
747 /// ```
748 /// use std::{io, ptr};
749 ///
750 /// use compio_driver::AsRawFd;
751 /// use compio_fs::named_pipe::ServerOptions;
752 /// use windows_sys::Win32::{
753 /// Foundation::ERROR_ACCESS_DENIED,
754 /// Security::{
755 /// Authorization::{SE_KERNEL_OBJECT, SetSecurityInfo},
756 /// DACL_SECURITY_INFORMATION,
757 /// },
758 /// };
759 ///
760 /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
761 ///
762 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
763 /// let mut pipe_template = ServerOptions::new();
764 /// pipe_template.write_dac(false);
765 /// let pipe = pipe_template.create(PIPE_NAME).unwrap();
766 ///
767 /// unsafe {
768 /// assert_eq!(
769 /// ERROR_ACCESS_DENIED,
770 /// SetSecurityInfo(
771 /// pipe.as_raw_fd() as _,
772 /// SE_KERNEL_OBJECT,
773 /// DACL_SECURITY_INFORMATION,
774 /// ptr::null_mut(),
775 /// ptr::null_mut(),
776 /// ptr::null_mut(),
777 /// ptr::null_mut(),
778 /// )
779 /// );
780 /// }
781 ///
782 /// # })
783 /// ```
784 ///
785 /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
786 pub fn write_dac(&mut self, requested: bool) -> &mut Self {
787 self.write_dac = requested;
788 self
789 }
790
791 /// Requests permission to modify the pipe's owner.
792 ///
793 /// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
794 ///
795 /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
796 pub fn write_owner(&mut self, requested: bool) -> &mut Self {
797 self.write_owner = requested;
798 self
799 }
800
801 /// Requests permission to modify the pipe's system access control list.
802 ///
803 /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
804 ///
805 /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
806 pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
807 self.access_system_security = requested;
808 self
809 }
810
811 /// Indicates whether this server can accept remote clients or not. Remote
812 /// clients are disabled by default.
813 ///
814 /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
815 ///
816 /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
817 pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
818 self.reject_remote_clients = reject;
819 self
820 }
821
822 /// The maximum number of instances that can be created for this pipe. The
823 /// first instance of the pipe can specify this value; the same number must
824 /// be specified for other instances of the pipe. Acceptable values are in
825 /// the range 1 through 254. The default value is unlimited.
826 ///
827 /// This corresponds to specifying [`nMaxInstances`].
828 ///
829 /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
830 ///
831 /// # Errors
832 ///
    /// The same `max_instances` value has to be used by all servers of a
    /// given pipe. Creating an additional server with a mismatching value
    /// might fail.
836 ///
837 /// ```
838 /// use std::io;
839 ///
840 /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
841 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
842 ///
843 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-max-instances";
844 ///
845 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
846 /// let mut server = ServerOptions::new();
847 /// server.max_instances(2);
848 ///
849 /// let s1 = server.create(PIPE_NAME).unwrap();
850 /// let c1 = ClientOptions::new().open(PIPE_NAME).await.unwrap();
851 ///
852 /// let s2 = server.create(PIPE_NAME).unwrap();
853 /// let c2 = ClientOptions::new().open(PIPE_NAME).await.unwrap();
854 ///
855 /// // Too many servers!
856 /// let e = server.create(PIPE_NAME).unwrap_err();
857 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
858 ///
859 /// // Still too many servers even if we specify a higher value!
860 /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
861 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
862 /// # })
863 /// ```
864 ///
865 /// # Panics
866 ///
867 /// This function will panic if more than 254 instances are specified. If
868 /// you do not wish to set an instance limit, leave it unspecified.
869 ///
870 /// ```should_panic
871 /// use compio_fs::named_pipe::ServerOptions;
872 ///
873 /// let builder = ServerOptions::new().max_instances(255);
874 /// ```
875 #[track_caller]
876 pub fn max_instances(&mut self, instances: usize) -> &mut Self {
877 assert!(instances < 255, "cannot specify more than 254 instances");
878 self.max_instances = instances as u32;
879 self
880 }
881
882 /// The number of bytes to reserve for the output buffer.
883 ///
884 /// This corresponds to specifying [`nOutBufferSize`].
885 ///
886 /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
887 pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
888 self.out_buffer_size = buffer;
889 self
890 }
891
892 /// The number of bytes to reserve for the input buffer.
893 ///
894 /// This corresponds to specifying [`nInBufferSize`].
895 ///
896 /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
897 pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
898 self.in_buffer_size = buffer;
899 self
900 }
901
902 /// Creates the named pipe identified by `addr` for use as a server.
903 ///
904 /// This uses the [`CreateNamedPipe`] function.
905 ///
906 /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
907 ///
908 /// # Examples
909 ///
910 /// ```
911 /// use compio_fs::named_pipe::ServerOptions;
912 ///
913 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-create";
914 ///
915 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
916 /// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
917 /// # })
918 /// ```
919 pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
920 let addr = U16CString::from_os_str(addr)
921 .map_err(|e| io::Error::new(std::io::ErrorKind::InvalidData, e))?;
922
923 let pipe_mode = {
924 let mut mode = if matches!(self.pipe_mode, PipeMode::Message) {
925 PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE
926 } else {
927 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE
928 };
929 if self.reject_remote_clients {
930 mode |= PIPE_REJECT_REMOTE_CLIENTS;
931 } else {
932 mode |= PIPE_ACCEPT_REMOTE_CLIENTS;
933 }
934 mode
935 };
936 let open_mode = {
937 let mut mode = FILE_FLAG_OVERLAPPED;
938 if self.access_inbound {
939 mode |= PIPE_ACCESS_INBOUND;
940 }
941 if self.access_outbound {
942 mode |= PIPE_ACCESS_OUTBOUND;
943 }
944 if self.first_pipe_instance {
945 mode |= FILE_FLAG_FIRST_PIPE_INSTANCE;
946 }
947 if self.write_dac {
948 mode |= WRITE_DAC;
949 }
950 if self.write_owner {
951 mode |= WRITE_OWNER;
952 }
953 if self.access_system_security {
954 mode |= ACCESS_SYSTEM_SECURITY;
955 }
956 mode
957 };
958
959 let h = syscall!(
960 HANDLE,
961 CreateNamedPipeW(
962 addr.as_ptr(),
963 open_mode,
964 pipe_mode,
965 self.max_instances,
966 self.out_buffer_size,
967 self.in_buffer_size,
968 self.default_timeout,
969 null(),
970 )
971 )?;
972
973 Ok(NamedPipeServer {
974 handle: AsyncFd::new(unsafe { std::fs::File::from_raw_handle(h as _) })?,
975 })
976 }
977}
978
979impl Default for ServerOptions {
980 fn default() -> Self {
981 Self::new()
982 }
983}
984
985/// A builder suitable for building and interacting with named pipes from the
986/// client side.
987///
988/// See [`ClientOptions::open`].
989#[derive(Debug, Clone)]
990pub struct ClientOptions {
991 options: OpenOptions,
992 pipe_mode: PipeMode,
993}
994
995impl ClientOptions {
996 /// Creates a new named pipe builder with the default settings.
997 ///
998 /// ```
999 /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
1000 ///
1001 /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-client-new";
1002 ///
1003 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
1004 /// // Server must be created in order for the client creation to succeed.
1005 /// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
1006 /// let client = ClientOptions::new().open(PIPE_NAME).await.unwrap();
1007 /// # })
1008 /// ```
1009 pub fn new() -> Self {
1010 use windows_sys::Win32::Storage::FileSystem::SECURITY_IDENTIFICATION;
1011
1012 let mut options = OpenOptions::new();
1013 options
1014 .read(true)
1015 .write(true)
1016 .security_qos_flags(SECURITY_IDENTIFICATION);
1017 Self {
1018 options,
1019 pipe_mode: PipeMode::Byte,
1020 }
1021 }
1022
1023 /// If the client supports reading data. This is enabled by default.
1024 pub fn read(&mut self, allowed: bool) -> &mut Self {
1025 self.options.read(allowed);
1026 self
1027 }
1028
1029 /// If the created pipe supports writing data. This is enabled by default.
1030 pub fn write(&mut self, allowed: bool) -> &mut Self {
1031 self.options.write(allowed);
1032 self
1033 }
1034
1035 /// Sets qos flags which are combined with other flags and attributes in the
1036 /// call to [`CreateFile`].
1037 ///
1038 /// When `security_qos_flags` is not set, a malicious program can gain the
1039 /// elevated privileges of a privileged Rust process when it allows opening
1040 /// user-specified paths, by tricking it into opening a named pipe. So
1041 /// arguably `security_qos_flags` should also be set when opening arbitrary
1042 /// paths. However the bits can then conflict with other flags, specifically
1043 /// `FILE_FLAG_OPEN_NO_RECALL`.
1044 ///
1045 /// For information about possible values, see [Impersonation Levels] on the
1046 /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
1047 /// automatically when using this method.
1048 ///
1049 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
1050 /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
1051 /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
1052 pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
1053 self.options.security_qos_flags(flags);
1054 self
1055 }
1056
1057 /// The pipe mode.
1058 ///
1059 /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1060 /// documentation of what each mode means.
1061 pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1062 self.pipe_mode = pipe_mode;
1063 self
1064 }
1065
1066 /// Opens the named pipe identified by `addr`.
1067 ///
1068 /// This opens the client using [`CreateFile`] with the
1069 /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
1070 ///
1071 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
1072 ///
1073 /// # Errors
1074 ///
1075 /// There are a few errors you need to take into account when creating a
1076 /// named pipe on the client side:
1077 ///
1078 /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
1079 /// does not exist. Presumably the server is not up.
1080 /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
1081 /// but the server is not currently waiting for a connection. Please see
1082 /// the examples for how to check for this error.
1083 ///
1084 /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
1085 ///
1086 /// A connect loop that waits until a pipe becomes available looks like
1087 /// this:
1088 ///
1089 /// ```no_run
1090 /// use std::time::Duration;
1091 ///
1092 /// use compio_fs::named_pipe::ClientOptions;
1093 /// use compio_runtime::time;
1094 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
1095 ///
1096 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
1097 ///
1098 /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
1099 /// let client = loop {
1100 /// match ClientOptions::new().open(PIPE_NAME).await {
1101 /// Ok(client) => break client,
1102 /// Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
1103 /// Err(e) => return Err(e),
1104 /// }
1105 ///
1106 /// time::sleep(Duration::from_millis(50)).await;
1107 /// };
1108 ///
1109 /// // use the connected client.
1110 /// # Ok(()) });
1111 /// ```
1112 pub async fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
1113 use windows_sys::Win32::System::Pipes::SetNamedPipeHandleState;
1114
1115 let file = self.options.open(addr.as_ref()).await?;
1116
1117 if matches!(self.pipe_mode, PipeMode::Message) {
1118 let mode = PIPE_READMODE_MESSAGE;
1119 syscall!(
1120 BOOL,
1121 SetNamedPipeHandleState(file.as_raw_fd() as _, &mode, null(), null())
1122 )?;
1123 }
1124
1125 Ok(NamedPipeClient { handle: file })
1126 }
1127}
1128
1129impl Default for ClientOptions {
1130 fn default() -> Self {
1131 Self::new()
1132 }
1133}
1134
/// The mode in which a named pipe transfers data.
///
/// Set through [`ServerOptions::pipe_mode`] on the server side and
/// [`ClientOptions::pipe_mode`] on the client side.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipeMode {
    /// The pipe carries a stream of bytes and does not distinguish bytes
    /// written during different write operations.
    ///
    /// Corresponds to [`PIPE_TYPE_BYTE`].
    ///
    /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
    Byte,
    /// The pipe carries a stream of messages: the bytes written during each
    /// write operation are treated as one message unit. Any read on the
    /// named pipe returns [`ERROR_MORE_DATA`] when a message is not read
    /// completely.
    ///
    /// Corresponds to [`PIPE_TYPE_MESSAGE`].
    ///
    /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
    /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
    Message,
}
1158
/// Identifies which end of a named pipe instance a handle refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipeEnd {
    /// The handle refers to the client end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_CLIENT_END`].
    ///
    /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
    Client,
    /// The handle refers to the server end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_SERVER_END`].
    ///
    /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
    Server,
}
1175
1176/// Information about a named pipe.
1177///
1178/// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
1179#[derive(Debug)]
1180pub struct PipeInfo {
1181 /// Indicates the mode of a named pipe.
1182 pub mode: PipeMode,
1183 /// Indicates the end of a named pipe.
1184 pub end: PipeEnd,
1185 /// The maximum number of instances that can be created for this pipe.
1186 pub max_instances: u32,
1187 /// The number of bytes to reserve for the output buffer.
1188 pub out_buffer_size: u32,
1189 /// The number of bytes to reserve for the input buffer.
1190 pub in_buffer_size: u32,
1191}
1192
1193/// Internal function to get the info out of a raw named pipe.
1194unsafe fn named_pipe_info(handle: RawFd) -> io::Result<PipeInfo> {
1195 let mut flags = 0;
1196 let mut out_buffer_size = 0;
1197 let mut in_buffer_size = 0;
1198 let mut max_instances = 0;
1199
1200 syscall!(
1201 BOOL,
1202 GetNamedPipeInfo(
1203 handle as _,
1204 &mut flags,
1205 &mut out_buffer_size,
1206 &mut in_buffer_size,
1207 &mut max_instances,
1208 )
1209 )?;
1210
1211 let mut end = PipeEnd::Client;
1212 let mut mode = PipeMode::Byte;
1213
1214 if flags & PIPE_SERVER_END != 0 {
1215 end = PipeEnd::Server;
1216 }
1217
1218 if flags & PIPE_TYPE_MESSAGE != 0 {
1219 mode = PipeMode::Message;
1220 }
1221
1222 Ok(PipeInfo {
1223 end,
1224 mode,
1225 out_buffer_size,
1226 in_buffer_size,
1227 max_instances,
1228 })
1229}