compio_fs/
named_pipe.rs

1//! [Windows named pipes](https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipes).
2//!
3//! The infrastructure of the code comes from tokio.
4
5#[cfg(doc)]
6use std::ptr::null_mut;
7use std::{ffi::OsStr, io, os::windows::io::FromRawHandle, ptr::null};
8
9use compio_buf::{BufResult, IoBuf, IoBufMut};
10use compio_driver::{AsRawFd, RawFd, ToSharedFd, impl_raw_fd, op::ConnectNamedPipe, syscall};
11use compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt};
12use widestring::U16CString;
13use windows_sys::Win32::{
14    Storage::FileSystem::{
15        FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_INBOUND,
16        PIPE_ACCESS_OUTBOUND, WRITE_DAC, WRITE_OWNER,
17    },
18    System::{
19        Pipes::{
20            CreateNamedPipeW, DisconnectNamedPipe, GetNamedPipeInfo, PIPE_ACCEPT_REMOTE_CLIENTS,
21            PIPE_READMODE_BYTE, PIPE_READMODE_MESSAGE, PIPE_REJECT_REMOTE_CLIENTS, PIPE_SERVER_END,
22            PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES,
23        },
24        SystemServices::ACCESS_SYSTEM_SECURITY,
25    },
26};
27
28use crate::{AsyncFd, File, OpenOptions};
29
30/// A [Windows named pipe] server.
31///
32/// Accepting client connections involves creating a server with
33/// [`ServerOptions::create`] and waiting for clients to connect using
34/// [`NamedPipeServer::connect`].
35///
36/// To avoid having clients sporadically fail with
37/// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
38/// ensure that at least one server instance is available at all times. This
39/// means that the typical listen loop for a server is a bit involved, because
40/// we have to ensure that we never drop a server accidentally while a client
41/// might connect.
42///
43/// So a correctly implemented server looks like this:
44///
45/// ```no_run
46/// use std::io;
47///
48/// use compio_fs::named_pipe::ServerOptions;
49///
50/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
51///
52/// # fn main() -> std::io::Result<()> {
53/// // The first server needs to be constructed early so that clients can
54/// // be correctly connected. Otherwise calling .wait will cause the client to
55/// // error.
56/// //
57/// // Here we also make use of `first_pipe_instance`, which will ensure that
58/// // there are no other servers up and running already.
59/// let mut server = ServerOptions::new()
60///     .first_pipe_instance(true)
61///     .create(PIPE_NAME)?;
62///
63/// // Spawn the server loop.
64/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
65/// loop {
66///     // Wait for a client to connect.
67///     let connected = server.connect().await?;
68///
69///     // Construct the next server to be connected before sending the one
70///     // we already have off onto a task. This ensures that the server
71///     // isn't closed (after it's done in the task) before a new one is
72///     // available. Otherwise the client might error with
73///     // `io::ErrorKind::NotFound`.
74///     server = ServerOptions::new().create(PIPE_NAME)?;
75///
76///     let client = compio_runtime::spawn(async move {
77///         // use the connected client
78/// #       Ok::<_, std::io::Error>(())
79///     });
80/// # if true { break } // needed for type inference to work
81/// }
82/// # Ok::<_, io::Error>(())
83/// # })
84/// # }
85/// ```
86///
87/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
88#[derive(Debug, Clone)]
89pub struct NamedPipeServer {
90    handle: AsyncFd<std::fs::File>,
91}
92
93impl NamedPipeServer {
94    /// Retrieves information about the named pipe the server is associated
95    /// with.
96    ///
97    /// ```no_run
98    /// use compio_fs::named_pipe::{PipeEnd, PipeMode, ServerOptions};
99    ///
100    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-server-info";
101    ///
102    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
103    /// let server = ServerOptions::new()
104    ///     .pipe_mode(PipeMode::Message)
105    ///     .max_instances(5)
106    ///     .create(PIPE_NAME)?;
107    ///
108    /// let server_info = server.info()?;
109    ///
110    /// assert_eq!(server_info.end, PipeEnd::Server);
111    /// assert_eq!(server_info.mode, PipeMode::Message);
112    /// assert_eq!(server_info.max_instances, 5);
113    /// # std::io::Result::Ok(()) });
114    /// ```
115    pub fn info(&self) -> io::Result<PipeInfo> {
116        // Safety: we're ensuring the lifetime of the named pipe.
117        unsafe { named_pipe_info(self.as_raw_fd()) }
118    }
119
120    /// Enables a named pipe server process to wait for a client process to
121    /// connect to an instance of a named pipe. A client process connects by
122    /// creating a named pipe with the same name.
123    ///
124    /// This corresponds to the [`ConnectNamedPipe`] system call.
125    ///
126    /// # Example
127    ///
128    /// ```no_run
129    /// use compio_fs::named_pipe::ServerOptions;
130    ///
131    /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
132    ///
133    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
134    /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
135    ///
136    /// // Wait for a client to connect.
137    /// pipe.connect().await?;
138    ///
139    /// // Use the connected client...
140    /// # std::io::Result::Ok(()) });
141    /// ```
142    pub async fn connect(&self) -> io::Result<()> {
143        let op = ConnectNamedPipe::new(self.handle.to_shared_fd());
144        compio_runtime::submit(op).await.0?;
145        Ok(())
146    }
147
148    /// Disconnects the server end of a named pipe instance from a client
149    /// process.
150    ///
151    /// ```
152    /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
153    /// use compio_io::AsyncWrite;
154    /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
155    ///
156    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-disconnect";
157    ///
158    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
159    /// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
160    ///
161    /// let mut client = ClientOptions::new().open(PIPE_NAME).await.unwrap();
162    ///
163    /// // Wait for a client to become connected.
164    /// server.connect().await.unwrap();
165    ///
166    /// // Forcibly disconnect the client.
167    /// server.disconnect().unwrap();
168    ///
169    /// // Write fails with an OS-specific error after client has been
170    /// // disconnected.
171    /// let e = client.write("ping").await.0.unwrap();
172    /// assert_eq!(e, 0);
173    /// # })
174    /// ```
175    pub fn disconnect(&self) -> io::Result<()> {
176        syscall!(BOOL, DisconnectNamedPipe(self.as_raw_fd() as _))?;
177        Ok(())
178    }
179}
180
181impl AsyncRead for NamedPipeServer {
182    #[inline]
183    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
184        (&*self).read(buf).await
185    }
186}
187
188impl AsyncRead for &NamedPipeServer {
189    #[inline]
190    async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
191        (&self.handle).read(buffer).await
192    }
193}
194
195impl AsyncWrite for NamedPipeServer {
196    #[inline]
197    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
198        (&*self).write(buf).await
199    }
200
201    #[inline]
202    async fn flush(&mut self) -> io::Result<()> {
203        (&*self).flush().await
204    }
205
206    #[inline]
207    async fn shutdown(&mut self) -> io::Result<()> {
208        (&*self).shutdown().await
209    }
210}
211
212impl AsyncWrite for &NamedPipeServer {
213    #[inline]
214    async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
215        (&self.handle).write(buffer).await
216    }
217
218    #[inline]
219    async fn flush(&mut self) -> io::Result<()> {
220        Ok(())
221    }
222
223    #[inline]
224    async fn shutdown(&mut self) -> io::Result<()> {
225        Ok(())
226    }
227}
228
// NOTE(review): `impl_raw_fd!` is imported from `compio_driver`; it presumably
// generates the raw-handle trait impls for `NamedPipeServer` (such as the
// `as_raw_fd` used by `info` and `disconnect`) by delegating to the `handle`
// field — confirm against the macro definition.
229impl_raw_fd!(NamedPipeServer, std::fs::File, handle, file);
230
231/// A [Windows named pipe] client.
232///
233/// Constructed using [`ClientOptions::open`].
234///
235/// Connecting a client correctly involves a few steps. When connecting through
236/// [`ClientOptions::open`], it might error indicating one of two things:
237///
238/// * [`std::io::ErrorKind::NotFound`] - There is no server available.
239/// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
240///   for a while and try again.
241///
242/// So a correctly implemented client looks like this:
243///
244/// ```no_run
245/// use std::time::Duration;
246///
247/// use compio_fs::named_pipe::ClientOptions;
248/// use compio_runtime::time;
249/// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
250///
251/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
252///
253/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
254/// let client = loop {
255///     match ClientOptions::new().open(PIPE_NAME).await {
256///         Ok(client) => break client,
257///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
258///         Err(e) => return Err(e),
259///     }
260///
261///     time::sleep(Duration::from_millis(50)).await;
262/// };
263///
264/// // use the connected client
265/// # Ok(()) });
266/// ```
267///
268/// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
269/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
270#[derive(Debug, Clone)]
271pub struct NamedPipeClient {
272    handle: File,
273}
274
275impl NamedPipeClient {
276    /// Retrieves information about the named pipe the client is associated
277    /// with.
278    ///
279    /// ```no_run
280    /// use compio_fs::named_pipe::{ClientOptions, PipeEnd, PipeMode};
281    ///
282    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-client-info";
283    ///
284    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
285    /// let client = ClientOptions::new().open(PIPE_NAME).await?;
286    ///
287    /// let client_info = client.info()?;
288    ///
289    /// assert_eq!(client_info.end, PipeEnd::Client);
290    /// assert_eq!(client_info.mode, PipeMode::Message);
291    /// assert_eq!(client_info.max_instances, 5);
292    /// # std::io::Result::Ok(()) });
293    /// ```
294    pub fn info(&self) -> io::Result<PipeInfo> {
295        // Safety: we're ensuring the lifetime of the named pipe.
296        unsafe { named_pipe_info(self.as_raw_fd()) }
297    }
298}
299
300impl AsyncRead for NamedPipeClient {
301    #[inline]
302    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
303        (&*self).read(buf).await
304    }
305}
306
307impl AsyncRead for &NamedPipeClient {
308    #[inline]
309    async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
310        // The position is ignored.
311        self.handle.read_at(buffer, 0).await
312    }
313}
314
315impl AsyncWrite for NamedPipeClient {
316    #[inline]
317    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
318        (&*self).write(buf).await
319    }
320
321    #[inline]
322    async fn flush(&mut self) -> io::Result<()> {
323        (&*self).flush().await
324    }
325
326    #[inline]
327    async fn shutdown(&mut self) -> io::Result<()> {
328        (&*self).shutdown().await
329    }
330}
331
332impl AsyncWrite for &NamedPipeClient {
333    #[inline]
334    async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
335        // The position is ignored.
336        (&self.handle).write_at(buffer, 0).await
337    }
338
339    #[inline]
340    async fn flush(&mut self) -> io::Result<()> {
341        Ok(())
342    }
343
344    #[inline]
345    async fn shutdown(&mut self) -> io::Result<()> {
346        Ok(())
347    }
348}
349
// NOTE(review): `impl_raw_fd!` is imported from `compio_driver`; it presumably
// generates the raw-handle trait impls for `NamedPipeClient` (such as the
// `as_raw_fd` used by `info`) by delegating to the `handle` field — confirm
// against the macro definition.
350impl_raw_fd!(NamedPipeClient, std::fs::File, handle, file);
351
352/// A builder structure for constructing a named pipe with named pipe-specific
353/// options. Named pipe servers that want to modify pipe-related options are
354/// required to use it.
355///
356/// See [`ServerOptions::create`].
357#[derive(Debug, Clone)]
358pub struct ServerOptions {
359    // dwOpenMode
360    access_inbound: bool,
361    access_outbound: bool,
362    first_pipe_instance: bool,
363    write_dac: bool,
364    write_owner: bool,
365    access_system_security: bool,
366    // dwPipeMode
367    pipe_mode: PipeMode,
368    reject_remote_clients: bool,
369    // other options
370    max_instances: u32,
371    out_buffer_size: u32,
372    in_buffer_size: u32,
373    default_timeout: u32,
374}
375
376impl ServerOptions {
377    /// Creates a new named pipe builder with the default settings.
378    ///
379    /// ```
380    /// use compio_fs::named_pipe::ServerOptions;
381    ///
382    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-new";
383    ///
384    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
385    /// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
386    /// # })
387    /// ```
388    pub fn new() -> ServerOptions {
389        ServerOptions {
390            access_inbound: true,
391            access_outbound: true,
392            first_pipe_instance: false,
393            write_dac: false,
394            write_owner: false,
395            access_system_security: false,
396            pipe_mode: PipeMode::Byte,
397            reject_remote_clients: true,
398            max_instances: PIPE_UNLIMITED_INSTANCES,
399            out_buffer_size: 65536,
400            in_buffer_size: 65536,
401            default_timeout: 0,
402        }
403    }
404
405    /// The pipe mode.
406    ///
407    /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
408    /// documentation of what each mode means.
409    ///
410    /// This corresponds to specifying `PIPE_TYPE_` and `PIPE_READMODE_` in
411    /// [`dwPipeMode`].
412    ///
413    /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
414    pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
415        self.pipe_mode = pipe_mode;
416        self
417    }
418
419    /// The flow of data in the pipe goes from client to server only.
420    ///
421    /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
422    ///
423    /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
424    ///
425    /// # Errors
426    ///
427    /// Server side prevents connecting by denying inbound access, client errors
428    /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
429    /// the connection.
430    ///
431    /// ```
432    /// use std::io;
433    ///
434    /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
435    ///
436    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound-err1";
437    ///
438    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
439    /// let _server = ServerOptions::new()
440    ///     .access_inbound(false)
441    ///     .create(PIPE_NAME)
442    ///     .unwrap();
443    ///
444    /// let e = ClientOptions::new().open(PIPE_NAME).await.unwrap_err();
445    ///
446    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
447    /// # })
448    /// ```
449    ///
450    /// Disabling writing allows a client to connect, but errors with
451    /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
452    ///
453    /// ```
454    /// use std::io;
455    ///
456    /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
457    /// use compio_io::AsyncWrite;
458    ///
459    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound-err2";
460    ///
461    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
462    /// let server = ServerOptions::new()
463    ///     .access_inbound(false)
464    ///     .create(PIPE_NAME)
465    ///     .unwrap();
466    ///
467    /// let mut client = ClientOptions::new()
468    ///     .write(false)
469    ///     .open(PIPE_NAME)
470    ///     .await
471    ///     .unwrap();
472    ///
473    /// server.connect().await.unwrap();
474    ///
475    /// let e = client.write("ping").await.0.unwrap_err();
476    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
477    /// # })
478    /// ```
479    ///
480    /// # Examples
481    ///
482    /// A unidirectional named pipe that only supports server-to-client
483    /// communication.
484    ///
485    /// ```
486    /// use std::io;
487    ///
488    /// use compio_buf::BufResult;
489    /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
490    /// use compio_io::{AsyncReadExt, AsyncWriteExt};
491    ///
492    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound";
493    ///
494    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
495    /// let mut server = ServerOptions::new()
496    ///     .access_inbound(false)
497    ///     .create(PIPE_NAME)
498    ///     .unwrap();
499    ///
500    /// let mut client = ClientOptions::new()
501    ///     .write(false)
502    ///     .open(PIPE_NAME)
503    ///     .await
504    ///     .unwrap();
505    ///
506    /// server.connect().await.unwrap();
507    ///
508    /// let write = server.write_all("ping");
509    ///
510    /// let buf = Vec::with_capacity(4);
511    /// let read = client.read_exact(buf);
512    ///
513    /// let (BufResult(write, _), BufResult(read, buf)) = futures_util::join!(write, read);
514    /// write.unwrap();
515    /// read.unwrap();
516    ///
517    /// assert_eq!(&buf[..], b"ping");
518    /// # })
519    /// ```
520    pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
521        self.access_inbound = allowed;
522        self
523    }
524
525    /// The flow of data in the pipe goes from server to client only.
526    ///
527    /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
528    ///
529    /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
530    ///
531    /// # Errors
532    ///
533    /// Server side prevents connecting by denying outbound access, client
534    /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
535    /// create the connection.
536    ///
537    /// ```
538    /// use std::io;
539    ///
540    /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
541    ///
542    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound-err1";
543    ///
544    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
545    /// let server = ServerOptions::new()
546    ///     .access_outbound(false)
547    ///     .create(PIPE_NAME)
548    ///     .unwrap();
549    ///
550    /// let e = ClientOptions::new().open(PIPE_NAME).await.unwrap_err();
551    ///
552    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
553    /// # })
554    /// ```
555    ///
556    /// Disabling reading allows a client to connect, but attempting to read
557    /// will error with [`std::io::ErrorKind::PermissionDenied`].
558    ///
559    /// ```
560    /// use std::io;
561    ///
562    /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
563    /// use compio_io::AsyncRead;
564    ///
565    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound-err2";
566    ///
567    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
568    /// let server = ServerOptions::new()
569    ///     .access_outbound(false)
570    ///     .create(PIPE_NAME)
571    ///     .unwrap();
572    ///
573    /// let mut client = ClientOptions::new()
574    ///     .read(false)
575    ///     .open(PIPE_NAME)
576    ///     .await
577    ///     .unwrap();
578    ///
579    /// server.connect().await.unwrap();
580    ///
581    /// let buf = Vec::with_capacity(4);
582    /// let e = client.read(buf).await.0.unwrap_err();
583    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
584    /// # })
585    /// ```
586    ///
587    /// # Examples
588    ///
589    /// A unidirectional named pipe that only supports client-to-server
590    /// communication.
591    ///
592    /// ```
593    /// use compio_buf::BufResult;
594    /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
595    /// use compio_io::{AsyncReadExt, AsyncWriteExt};
596    ///
597    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound";
598    ///
599    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
600    /// let mut server = ServerOptions::new()
601    ///     .access_outbound(false)
602    ///     .create(PIPE_NAME)
603    ///     .unwrap();
604    ///
605    /// let mut client = ClientOptions::new()
606    ///     .read(false)
607    ///     .open(PIPE_NAME)
608    ///     .await
609    ///     .unwrap();
610    ///
611    /// server.connect().await.unwrap();
612    ///
613    /// let write = client.write_all("ping");
614    ///
615    /// let buf = Vec::with_capacity(4);
616    /// let read = server.read_exact(buf);
617    ///
618    /// let (BufResult(write, _), BufResult(read, buf)) = futures_util::join!(write, read);
619    /// write.unwrap();
620    /// read.unwrap();
621    ///
622    /// println!("done reading and writing");
623    ///
624    /// assert_eq!(&buf[..], b"ping");
625    /// # })
626    /// ```
627    pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
628        self.access_outbound = allowed;
629        self
630    }
631
632    /// If you attempt to create multiple instances of a pipe with this flag
633    /// set, creation of the first server instance succeeds, but creation of any
634    /// subsequent instances will fail with
635    /// [`std::io::ErrorKind::PermissionDenied`].
636    ///
637    /// This option is intended to be used with servers that want to ensure that
638    /// they are the only process listening for clients on a given named pipe.
639    /// This is accomplished by enabling it for the first server instance
640    /// created in a process.
641    ///
642    /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
643    ///
644    /// # Errors
645    ///
646    /// If this option is set and more than one instance of the server for a
647    /// given named pipe exists, calling [`create`] will fail with
648    /// [`std::io::ErrorKind::PermissionDenied`].
649    ///
650    /// ```
651    /// use std::io;
652    ///
653    /// use compio_fs::named_pipe::ServerOptions;
654    ///
655    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-first-instance-error";
656    ///
657    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
658    /// let server1 = ServerOptions::new()
659    ///     .first_pipe_instance(true)
660    ///     .create(PIPE_NAME)
661    ///     .unwrap();
662    ///
663    /// // Second server errs, since it's not the first instance.
664    /// let e = ServerOptions::new()
665    ///     .first_pipe_instance(true)
666    ///     .create(PIPE_NAME)
667    ///     .unwrap_err();
668    ///
669    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
670    /// # })
671    /// ```
672    ///
673    /// # Examples
674    ///
675    /// ```
676    /// use std::io;
677    ///
678    /// use compio_fs::named_pipe::ServerOptions;
679    ///
680    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-first-instance";
681    ///
682    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
683    /// let mut builder = ServerOptions::new();
684    /// builder.first_pipe_instance(true);
685    ///
686    /// let server = builder.create(PIPE_NAME).unwrap();
687    /// let e = builder.create(PIPE_NAME).unwrap_err();
688    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
689    /// drop(server);
690    ///
691    /// // OK, since we've closed the other instance.
692    /// let _server2 = builder.create(PIPE_NAME).unwrap();
693    /// # })
694    /// ```
695    ///
696    /// [`create`]: ServerOptions::create
697    /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
698    pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
699        self.first_pipe_instance = first;
700        self
701    }
702
703    /// Requests permission to modify the pipe's discretionary access control
704    /// list.
705    ///
706    /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
707    ///
708    /// # Examples
709    ///
710    /// ```
711    /// use std::{io, ptr};
712    ///
713    /// use compio_driver::AsRawFd;
714    /// use compio_fs::named_pipe::ServerOptions;
715    /// use windows_sys::Win32::{
716    ///     Foundation::ERROR_SUCCESS,
717    ///     Security::{
718    ///         Authorization::{SE_KERNEL_OBJECT, SetSecurityInfo},
719    ///         DACL_SECURITY_INFORMATION,
720    ///     },
721    /// };
722    ///
723    /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
724    ///
725    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
726    /// let mut pipe_template = ServerOptions::new();
727    /// pipe_template.write_dac(true);
728    /// let pipe = pipe_template.create(PIPE_NAME).unwrap();
729    ///
730    /// unsafe {
731    ///     assert_eq!(
732    ///         ERROR_SUCCESS,
733    ///         SetSecurityInfo(
734    ///             pipe.as_raw_fd() as _,
735    ///             SE_KERNEL_OBJECT,
736    ///             DACL_SECURITY_INFORMATION,
737    ///             ptr::null_mut(),
738    ///             ptr::null_mut(),
739    ///             ptr::null_mut(),
740    ///             ptr::null_mut(),
741    ///         )
742    ///     );
743    /// }
744    ///
745    /// # })
746    /// ```
747    /// ```
748    /// use std::{io, ptr};
749    ///
750    /// use compio_driver::AsRawFd;
751    /// use compio_fs::named_pipe::ServerOptions;
752    /// use windows_sys::Win32::{
753    ///     Foundation::ERROR_ACCESS_DENIED,
754    ///     Security::{
755    ///         Authorization::{SE_KERNEL_OBJECT, SetSecurityInfo},
756    ///         DACL_SECURITY_INFORMATION,
757    ///     },
758    /// };
759    ///
760    /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
761    ///
762    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
763    /// let mut pipe_template = ServerOptions::new();
764    /// pipe_template.write_dac(false);
765    /// let pipe = pipe_template.create(PIPE_NAME).unwrap();
766    ///
767    /// unsafe {
768    ///     assert_eq!(
769    ///         ERROR_ACCESS_DENIED,
770    ///         SetSecurityInfo(
771    ///             pipe.as_raw_fd() as _,
772    ///             SE_KERNEL_OBJECT,
773    ///             DACL_SECURITY_INFORMATION,
774    ///             ptr::null_mut(),
775    ///             ptr::null_mut(),
776    ///             ptr::null_mut(),
777    ///             ptr::null_mut(),
778    ///         )
779    ///     );
780    /// }
781    ///
782    /// # })
783    /// ```
784    ///
785    /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
786    pub fn write_dac(&mut self, requested: bool) -> &mut Self {
787        self.write_dac = requested;
788        self
789    }
790
791    /// Requests permission to modify the pipe's owner.
792    ///
793    /// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
794    ///
795    /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
796    pub fn write_owner(&mut self, requested: bool) -> &mut Self {
797        self.write_owner = requested;
798        self
799    }
800
801    /// Requests permission to modify the pipe's system access control list.
802    ///
803    /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
804    ///
805    /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
806    pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
807        self.access_system_security = requested;
808        self
809    }
810
811    /// Indicates whether this server can accept remote clients or not. Remote
812    /// clients are disabled by default.
813    ///
814    /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
815    ///
816    /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
817    pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
818        self.reject_remote_clients = reject;
819        self
820    }
821
822    /// The maximum number of instances that can be created for this pipe. The
823    /// first instance of the pipe can specify this value; the same number must
824    /// be specified for other instances of the pipe. Acceptable values are in
825    /// the range 1 through 254. The default value is unlimited.
826    ///
827    /// This corresponds to specifying [`nMaxInstances`].
828    ///
829    /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
830    ///
831    /// # Errors
832    ///
833    /// The same `max_instances` value has to be used by all servers. Building
834    /// an additional server that uses a mismatching value
835    /// might error.
836    ///
837    /// ```
838    /// use std::io;
839    ///
840    /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
841    /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
842    ///
843    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-max-instances";
844    ///
845    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
846    /// let mut server = ServerOptions::new();
847    /// server.max_instances(2);
848    ///
849    /// let s1 = server.create(PIPE_NAME).unwrap();
850    /// let c1 = ClientOptions::new().open(PIPE_NAME).await.unwrap();
851    ///
852    /// let s2 = server.create(PIPE_NAME).unwrap();
853    /// let c2 = ClientOptions::new().open(PIPE_NAME).await.unwrap();
854    ///
855    /// // Too many servers!
856    /// let e = server.create(PIPE_NAME).unwrap_err();
857    /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
858    ///
859    /// // Still too many servers even if we specify a higher value!
860    /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
861    /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
862    /// # })
863    /// ```
864    ///
865    /// # Panics
866    ///
867    /// This function will panic if more than 254 instances are specified. If
868    /// you do not wish to set an instance limit, leave it unspecified.
869    ///
870    /// ```should_panic
871    /// use compio_fs::named_pipe::ServerOptions;
872    ///
873    /// let builder = ServerOptions::new().max_instances(255);
874    /// ```
875    #[track_caller]
876    pub fn max_instances(&mut self, instances: usize) -> &mut Self {
877        assert!(instances < 255, "cannot specify more than 254 instances");
878        self.max_instances = instances as u32;
879        self
880    }
881
882    /// The number of bytes to reserve for the output buffer.
883    ///
884    /// This corresponds to specifying [`nOutBufferSize`].
885    ///
886    /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
887    pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
888        self.out_buffer_size = buffer;
889        self
890    }
891
892    /// The number of bytes to reserve for the input buffer.
893    ///
894    /// This corresponds to specifying [`nInBufferSize`].
895    ///
896    /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
897    pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
898        self.in_buffer_size = buffer;
899        self
900    }
901
902    /// Creates the named pipe identified by `addr` for use as a server.
903    ///
904    /// This uses the [`CreateNamedPipe`] function.
905    ///
906    /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
907    ///
908    /// # Examples
909    ///
910    /// ```
911    /// use compio_fs::named_pipe::ServerOptions;
912    ///
913    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-create";
914    ///
915    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
916    /// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
917    /// # })
918    /// ```
919    pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
920        let addr = U16CString::from_os_str(addr)
921            .map_err(|e| io::Error::new(std::io::ErrorKind::InvalidData, e))?;
922
923        let pipe_mode = {
924            let mut mode = if matches!(self.pipe_mode, PipeMode::Message) {
925                PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE
926            } else {
927                PIPE_TYPE_BYTE | PIPE_READMODE_BYTE
928            };
929            if self.reject_remote_clients {
930                mode |= PIPE_REJECT_REMOTE_CLIENTS;
931            } else {
932                mode |= PIPE_ACCEPT_REMOTE_CLIENTS;
933            }
934            mode
935        };
936        let open_mode = {
937            let mut mode = FILE_FLAG_OVERLAPPED;
938            if self.access_inbound {
939                mode |= PIPE_ACCESS_INBOUND;
940            }
941            if self.access_outbound {
942                mode |= PIPE_ACCESS_OUTBOUND;
943            }
944            if self.first_pipe_instance {
945                mode |= FILE_FLAG_FIRST_PIPE_INSTANCE;
946            }
947            if self.write_dac {
948                mode |= WRITE_DAC;
949            }
950            if self.write_owner {
951                mode |= WRITE_OWNER;
952            }
953            if self.access_system_security {
954                mode |= ACCESS_SYSTEM_SECURITY;
955            }
956            mode
957        };
958
959        let h = syscall!(
960            HANDLE,
961            CreateNamedPipeW(
962                addr.as_ptr(),
963                open_mode,
964                pipe_mode,
965                self.max_instances,
966                self.out_buffer_size,
967                self.in_buffer_size,
968                self.default_timeout,
969                null(),
970            )
971        )?;
972
973        Ok(NamedPipeServer {
974            handle: AsyncFd::new(unsafe { std::fs::File::from_raw_handle(h as _) })?,
975        })
976    }
977}
978
979impl Default for ServerOptions {
980    fn default() -> Self {
981        Self::new()
982    }
983}
984
/// A builder suitable for building and interacting with named pipes from the
/// client side.
///
/// The builder only records settings; nothing is opened until
/// [`ClientOptions::open`] is called.
///
/// See [`ClientOptions::open`].
#[derive(Debug, Clone)]
pub struct ClientOptions {
    // File-open settings (read/write access, security QoS flags) that `open`
    // uses to open the pipe.
    options: OpenOptions,
    // Requested pipe mode; `open` only adjusts the handle state when this is
    // `PipeMode::Message`.
    pipe_mode: PipeMode,
}
994
995impl ClientOptions {
996    /// Creates a new named pipe builder with the default settings.
997    ///
998    /// ```
999    /// use compio_fs::named_pipe::{ClientOptions, ServerOptions};
1000    ///
1001    /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-client-new";
1002    ///
1003    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
1004    /// // Server must be created in order for the client creation to succeed.
1005    /// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
1006    /// let client = ClientOptions::new().open(PIPE_NAME).await.unwrap();
1007    /// # })
1008    /// ```
1009    pub fn new() -> Self {
1010        use windows_sys::Win32::Storage::FileSystem::SECURITY_IDENTIFICATION;
1011
1012        let mut options = OpenOptions::new();
1013        options
1014            .read(true)
1015            .write(true)
1016            .security_qos_flags(SECURITY_IDENTIFICATION);
1017        Self {
1018            options,
1019            pipe_mode: PipeMode::Byte,
1020        }
1021    }
1022
1023    /// If the client supports reading data. This is enabled by default.
1024    pub fn read(&mut self, allowed: bool) -> &mut Self {
1025        self.options.read(allowed);
1026        self
1027    }
1028
1029    /// If the created pipe supports writing data. This is enabled by default.
1030    pub fn write(&mut self, allowed: bool) -> &mut Self {
1031        self.options.write(allowed);
1032        self
1033    }
1034
1035    /// Sets qos flags which are combined with other flags and attributes in the
1036    /// call to [`CreateFile`].
1037    ///
1038    /// When `security_qos_flags` is not set, a malicious program can gain the
1039    /// elevated privileges of a privileged Rust process when it allows opening
1040    /// user-specified paths, by tricking it into opening a named pipe. So
1041    /// arguably `security_qos_flags` should also be set when opening arbitrary
1042    /// paths. However the bits can then conflict with other flags, specifically
1043    /// `FILE_FLAG_OPEN_NO_RECALL`.
1044    ///
1045    /// For information about possible values, see [Impersonation Levels] on the
1046    /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
1047    /// automatically when using this method.
1048    ///
1049    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
1050    /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
1051    /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
1052    pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
1053        self.options.security_qos_flags(flags);
1054        self
1055    }
1056
1057    /// The pipe mode.
1058    ///
1059    /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1060    /// documentation of what each mode means.
1061    pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1062        self.pipe_mode = pipe_mode;
1063        self
1064    }
1065
1066    /// Opens the named pipe identified by `addr`.
1067    ///
1068    /// This opens the client using [`CreateFile`] with the
1069    /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
1070    ///
1071    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
1072    ///
1073    /// # Errors
1074    ///
1075    /// There are a few errors you need to take into account when creating a
1076    /// named pipe on the client side:
1077    ///
1078    /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
1079    ///   does not exist. Presumably the server is not up.
1080    /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
1081    ///   but the server is not currently waiting for a connection. Please see
1082    ///   the examples for how to check for this error.
1083    ///
1084    /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
1085    ///
1086    /// A connect loop that waits until a pipe becomes available looks like
1087    /// this:
1088    ///
1089    /// ```no_run
1090    /// use std::time::Duration;
1091    ///
1092    /// use compio_fs::named_pipe::ClientOptions;
1093    /// use compio_runtime::time;
1094    /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
1095    ///
1096    /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
1097    ///
1098    /// # compio_runtime::Runtime::new().unwrap().block_on(async move {
1099    /// let client = loop {
1100    ///     match ClientOptions::new().open(PIPE_NAME).await {
1101    ///         Ok(client) => break client,
1102    ///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
1103    ///         Err(e) => return Err(e),
1104    ///     }
1105    ///
1106    ///     time::sleep(Duration::from_millis(50)).await;
1107    /// };
1108    ///
1109    /// // use the connected client.
1110    /// # Ok(()) });
1111    /// ```
1112    pub async fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
1113        use windows_sys::Win32::System::Pipes::SetNamedPipeHandleState;
1114
1115        let file = self.options.open(addr.as_ref()).await?;
1116
1117        if matches!(self.pipe_mode, PipeMode::Message) {
1118            let mode = PIPE_READMODE_MESSAGE;
1119            syscall!(
1120                BOOL,
1121                SetNamedPipeHandleState(file.as_raw_fd() as _, &mode, null(), null())
1122            )?;
1123        }
1124
1125        Ok(NamedPipeClient { handle: file })
1126    }
1127}
1128
1129impl Default for ClientOptions {
1130    fn default() -> Self {
1131        Self::new()
1132    }
1133}
1134
/// The pipe mode of a named pipe.
///
/// Set through [`ServerOptions::pipe_mode`] on the server side and through
/// [`ClientOptions::pipe_mode`] on the client side. Also reported back in
/// [`PipeInfo::mode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipeMode {
    /// Data is written to the pipe as a stream of bytes. The pipe does not
    /// distinguish bytes written during different write operations.
    ///
    /// Corresponds to [`PIPE_TYPE_BYTE`].
    ///
    /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
    Byte,
    /// Data is written to the pipe as a stream of messages. The pipe treats the
    /// bytes written during each write operation as a message unit. Any read
    /// on a named pipe returns [`ERROR_MORE_DATA`] when a message is not read
    /// completely.
    ///
    /// Corresponds to [`PIPE_TYPE_MESSAGE`].
    ///
    /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
    /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
    Message,
}
1158
/// Indicates the end of a named pipe.
///
/// Reported in [`PipeInfo::end`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipeEnd {
    /// The named pipe refers to the client end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_CLIENT_END`].
    ///
    /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
    Client,
    /// The named pipe refers to the server end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_SERVER_END`].
    ///
    /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
    Server,
}
1175
/// Information about a named pipe.
///
/// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
/// The values are those reported by the operating system for the pipe handle
/// at the time of the query.
#[derive(Debug)]
pub struct PipeInfo {
    /// Indicates the mode of a named pipe.
    pub mode: PipeMode,
    /// Indicates the end of a named pipe.
    pub end: PipeEnd,
    /// The maximum number of instances that can be created for this pipe.
    pub max_instances: u32,
    /// The number of bytes to reserve for the output buffer.
    pub out_buffer_size: u32,
    /// The number of bytes to reserve for the input buffer.
    pub in_buffer_size: u32,
}
1192
1193/// Internal function to get the info out of a raw named pipe.
1194unsafe fn named_pipe_info(handle: RawFd) -> io::Result<PipeInfo> {
1195    let mut flags = 0;
1196    let mut out_buffer_size = 0;
1197    let mut in_buffer_size = 0;
1198    let mut max_instances = 0;
1199
1200    syscall!(
1201        BOOL,
1202        GetNamedPipeInfo(
1203            handle as _,
1204            &mut flags,
1205            &mut out_buffer_size,
1206            &mut in_buffer_size,
1207            &mut max_instances,
1208        )
1209    )?;
1210
1211    let mut end = PipeEnd::Client;
1212    let mut mode = PipeMode::Byte;
1213
1214    if flags & PIPE_SERVER_END != 0 {
1215        end = PipeEnd::Server;
1216    }
1217
1218    if flags & PIPE_TYPE_MESSAGE != 0 {
1219        mode = PipeMode::Message;
1220    }
1221
1222    Ok(PipeInfo {
1223        end,
1224        mode,
1225        out_buffer_size,
1226        in_buffer_size,
1227        max_instances,
1228    })
1229}