madsim_real_tokio/net/windows/
named_pipe.rs

1//! Tokio support for [Windows named pipes].
2//!
3//! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4
5use std::ffi::c_void;
6use std::ffi::OsStr;
7use std::io::{self, Read, Write};
8use std::pin::Pin;
9use std::ptr;
10use std::task::{Context, Poll};
11
12use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
13use crate::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, RawHandle};
14
15cfg_io_util! {
16    use bytes::BufMut;
17}
18
19// Hide imports which are not used when generating documentation.
20#[cfg(not(docsrs))]
21mod doc {
22    pub(super) use crate::os::windows::ffi::OsStrExt;
23    pub(super) mod windows_sys {
24        pub(crate) use windows_sys::{
25            Win32::Foundation::*, Win32::Storage::FileSystem::*, Win32::System::Pipes::*,
26            Win32::System::SystemServices::*,
27        };
28    }
29    pub(super) use mio::windows as mio_windows;
30}
31
32// NB: none of these shows up in public API, so don't document them.
33#[cfg(docsrs)]
34mod doc {
35    pub(super) mod mio_windows {
36        pub type NamedPipe = crate::doc::NotDefinedHere;
37    }
38}
39
40use self::doc::*;
41
42/// A [Windows named pipe] server.
43///
44/// Accepting client connections involves creating a server with
45/// [`ServerOptions::create`] and waiting for clients to connect using
46/// [`NamedPipeServer::connect`].
47///
48/// To avoid having clients sporadically fail with
49/// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
50/// ensure that at least one server instance is available at all times. This
51/// means that the typical listen loop for a server is a bit involved, because
52/// we have to ensure that we never drop a server accidentally while a client
53/// might connect.
54///
55/// So a correctly implemented server looks like this:
56///
57/// ```no_run
58/// use std::io;
59/// use tokio::net::windows::named_pipe::ServerOptions;
60///
61/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
62///
63/// # #[tokio::main] async fn main() -> std::io::Result<()> {
64/// // The first server needs to be constructed early so that clients can
65/// // be correctly connected. Otherwise calling .wait will cause the client to
66/// // error.
67/// //
68/// // Here we also make use of `first_pipe_instance`, which will ensure that
69/// // there are no other servers up and running already.
70/// let mut server = ServerOptions::new()
71///     .first_pipe_instance(true)
72///     .create(PIPE_NAME)?;
73///
74/// // Spawn the server loop.
75/// let server = tokio::spawn(async move {
76///     loop {
77///         // Wait for a client to connect.
78///         let connected = server.connect().await?;
79///
80///         // Construct the next server to be connected before sending the one
81///         // we already have of onto a task. This ensures that the server
82///         // isn't closed (after it's done in the task) before a new one is
83///         // available. Otherwise the client might error with
84///         // `io::ErrorKind::NotFound`.
85///         server = ServerOptions::new().create(PIPE_NAME)?;
86///
87///         let client = tokio::spawn(async move {
88///             /* use the connected client */
89/// #           Ok::<_, std::io::Error>(())
90///         });
91/// #       if true { break } // needed for type inference to work
92///     }
93///
94///     Ok::<_, io::Error>(())
95/// });
96///
97/// /* do something else not server related here */
98/// # Ok(()) }
99/// ```
100///
101/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
102#[derive(Debug)]
103pub struct NamedPipeServer {
104    io: PollEvented<mio_windows::NamedPipe>,
105}
106
107impl NamedPipeServer {
108    /// Constructs a new named pipe server from the specified raw handle.
109    ///
110    /// This function will consume ownership of the handle given, passing
111    /// responsibility for closing the handle to the returned object.
112    ///
113    /// This function is also unsafe as the primitives currently returned have
114    /// the contract that they are the sole owner of the file descriptor they
115    /// are wrapping. Usage of this function could accidentally allow violating
116    /// this contract which can cause memory unsafety in code that relies on it
117    /// being true.
118    ///
119    /// # Errors
120    ///
121    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
122    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
123    ///
124    /// [Tokio Runtime]: crate::runtime::Runtime
125    /// [enabled I/O]: crate::runtime::Builder::enable_io
126    pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
127        let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
128
129        Ok(Self {
130            io: PollEvented::new(named_pipe)?,
131        })
132    }
133
134    /// Retrieves information about the named pipe the server is associated
135    /// with.
136    ///
137    /// ```no_run
138    /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
139    ///
140    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
141    ///
142    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
143    /// let server = ServerOptions::new()
144    ///     .pipe_mode(PipeMode::Message)
145    ///     .max_instances(5)
146    ///     .create(PIPE_NAME)?;
147    ///
148    /// let server_info = server.info()?;
149    ///
150    /// assert_eq!(server_info.end, PipeEnd::Server);
151    /// assert_eq!(server_info.mode, PipeMode::Message);
152    /// assert_eq!(server_info.max_instances, 5);
153    /// # Ok(()) }
154    /// ```
155    pub fn info(&self) -> io::Result<PipeInfo> {
156        // Safety: we're ensuring the lifetime of the named pipe.
157        unsafe { named_pipe_info(self.io.as_raw_handle()) }
158    }
159
160    /// Enables a named pipe server process to wait for a client process to
161    /// connect to an instance of a named pipe. A client process connects by
162    /// creating a named pipe with the same name.
163    ///
164    /// This corresponds to the [`ConnectNamedPipe`] system call.
165    ///
166    /// # Cancel safety
167    ///
168    /// This method is cancellation safe in the sense that if it is used as the
169    /// event in a [`select!`](crate::select) statement and some other branch
170    /// completes first, then no connection events have been lost.
171    ///
172    /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
173    ///
174    /// # Example
175    ///
176    /// ```no_run
177    /// use tokio::net::windows::named_pipe::ServerOptions;
178    ///
179    /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
180    ///
181    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
182    /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
183    ///
184    /// // Wait for a client to connect.
185    /// pipe.connect().await?;
186    ///
187    /// // Use the connected client...
188    /// # Ok(()) }
189    /// ```
190    pub async fn connect(&self) -> io::Result<()> {
191        match self.io.connect() {
192            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
193                self.io
194                    .registration()
195                    .async_io(Interest::WRITABLE, || self.io.connect())
196                    .await
197            }
198            x => x,
199        }
200    }
201
202    /// Disconnects the server end of a named pipe instance from a client
203    /// process.
204    ///
205    /// ```
206    /// use tokio::io::AsyncWriteExt;
207    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
208    /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
209    ///
210    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
211    ///
212    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
213    /// let server = ServerOptions::new()
214    ///     .create(PIPE_NAME)?;
215    ///
216    /// let mut client = ClientOptions::new()
217    ///     .open(PIPE_NAME)?;
218    ///
219    /// // Wait for a client to become connected.
220    /// server.connect().await?;
221    ///
222    /// // Forcibly disconnect the client.
223    /// server.disconnect()?;
224    ///
225    /// // Write fails with an OS-specific error after client has been
226    /// // disconnected.
227    /// let e = client.write(b"ping").await.unwrap_err();
228    /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_NOT_CONNECTED as i32));
229    /// # Ok(()) }
230    /// ```
231    pub fn disconnect(&self) -> io::Result<()> {
232        self.io.disconnect()
233    }
234
235    /// Waits for any of the requested ready states.
236    ///
237    /// This function is usually paired with `try_read()` or `try_write()`. It
238    /// can be used to concurrently read / write to the same pipe on a single
239    /// task without splitting the pipe.
240    ///
241    /// The function may complete without the pipe being ready. This is a
242    /// false-positive and attempting an operation will return with
243    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
244    /// [`Ready`] set, so you should always check the returned value and possibly
245    /// wait again if the requested states are not set.
246    ///
247    /// # Examples
248    ///
249    /// Concurrently read and write to the pipe on the same task without
250    /// splitting.
251    ///
252    /// ```no_run
253    /// use tokio::io::Interest;
254    /// use tokio::net::windows::named_pipe;
255    /// use std::error::Error;
256    /// use std::io;
257    ///
258    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
259    ///
260    /// #[tokio::main]
261    /// async fn main() -> Result<(), Box<dyn Error>> {
262    ///     let server = named_pipe::ServerOptions::new()
263    ///         .create(PIPE_NAME)?;
264    ///
265    ///     loop {
266    ///         let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
267    ///
268    ///         if ready.is_readable() {
269    ///             let mut data = vec![0; 1024];
270    ///             // Try to read data, this may still fail with `WouldBlock`
271    ///             // if the readiness event is a false positive.
272    ///             match server.try_read(&mut data) {
273    ///                 Ok(n) => {
274    ///                     println!("read {} bytes", n);
275    ///                 }
276    ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
277    ///                     continue;
278    ///                 }
279    ///                 Err(e) => {
280    ///                     return Err(e.into());
281    ///                 }
282    ///             }
283    ///         }
284    ///
285    ///         if ready.is_writable() {
286    ///             // Try to write data, this may still fail with `WouldBlock`
287    ///             // if the readiness event is a false positive.
288    ///             match server.try_write(b"hello world") {
289    ///                 Ok(n) => {
290    ///                     println!("write {} bytes", n);
291    ///                 }
292    ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
293    ///                     continue;
294    ///                 }
295    ///                 Err(e) => {
296    ///                     return Err(e.into());
297    ///                 }
298    ///             }
299    ///         }
300    ///     }
301    /// }
302    /// ```
303    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
304        let event = self.io.registration().readiness(interest).await?;
305        Ok(event.ready)
306    }
307
308    /// Waits for the pipe to become readable.
309    ///
310    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
311    /// paired with `try_read()`.
312    ///
313    /// # Examples
314    ///
315    /// ```no_run
316    /// use tokio::net::windows::named_pipe;
317    /// use std::error::Error;
318    /// use std::io;
319    ///
320    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
321    ///
322    /// #[tokio::main]
323    /// async fn main() -> Result<(), Box<dyn Error>> {
324    ///     let server = named_pipe::ServerOptions::new()
325    ///         .create(PIPE_NAME)?;
326    ///
327    ///     let mut msg = vec![0; 1024];
328    ///
329    ///     loop {
330    ///         // Wait for the pipe to be readable
331    ///         server.readable().await?;
332    ///
333    ///         // Try to read data, this may still fail with `WouldBlock`
334    ///         // if the readiness event is a false positive.
335    ///         match server.try_read(&mut msg) {
336    ///             Ok(n) => {
337    ///                 msg.truncate(n);
338    ///                 break;
339    ///             }
340    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
341    ///                 continue;
342    ///             }
343    ///             Err(e) => {
344    ///                 return Err(e.into());
345    ///             }
346    ///         }
347    ///     }
348    ///
349    ///     println!("GOT = {:?}", msg);
350    ///     Ok(())
351    /// }
352    /// ```
353    pub async fn readable(&self) -> io::Result<()> {
354        self.ready(Interest::READABLE).await?;
355        Ok(())
356    }
357
358    /// Polls for read readiness.
359    ///
360    /// If the pipe is not currently ready for reading, this method will
361    /// store a clone of the `Waker` from the provided `Context`. When the pipe
362    /// becomes ready for reading, `Waker::wake` will be called on the waker.
363    ///
364    /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
365    /// the `Waker` from the `Context` passed to the most recent call is
366    /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
367    /// second, independent waker.)
368    ///
369    /// This function is intended for cases where creating and pinning a future
370    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
371    /// preferred, as this supports polling from multiple tasks at once.
372    ///
373    /// # Return value
374    ///
375    /// The function returns:
376    ///
377    /// * `Poll::Pending` if the pipe is not ready for reading.
378    /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
379    /// * `Poll::Ready(Err(e))` if an error is encountered.
380    ///
381    /// # Errors
382    ///
383    /// This function may encounter any standard I/O error except `WouldBlock`.
384    ///
385    /// [`readable`]: method@Self::readable
386    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
387        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
388    }
389
390    /// Tries to read data from the pipe into the provided buffer, returning how
391    /// many bytes were read.
392    ///
393    /// Receives any pending data from the pipe but does not wait for new data
394    /// to arrive. On success, returns the number of bytes read. Because
395    /// `try_read()` is non-blocking, the buffer does not have to be stored by
396    /// the async task and can exist entirely on the stack.
397    ///
398    /// Usually, [`readable()`] or [`ready()`] is used with this function.
399    ///
400    /// [`readable()`]: NamedPipeServer::readable()
401    /// [`ready()`]: NamedPipeServer::ready()
402    ///
403    /// # Return
404    ///
405    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
406    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
407    ///
408    /// 1. The pipe's read half is closed and will no longer yield data.
409    /// 2. The specified buffer was 0 bytes in length.
410    ///
411    /// If the pipe is not ready to read data,
412    /// `Err(io::ErrorKind::WouldBlock)` is returned.
413    ///
414    /// # Examples
415    ///
416    /// ```no_run
417    /// use tokio::net::windows::named_pipe;
418    /// use std::error::Error;
419    /// use std::io;
420    ///
421    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
422    ///
423    /// #[tokio::main]
424    /// async fn main() -> Result<(), Box<dyn Error>> {
425    ///     let server = named_pipe::ServerOptions::new()
426    ///         .create(PIPE_NAME)?;
427    ///
428    ///     loop {
429    ///         // Wait for the pipe to be readable
430    ///         server.readable().await?;
431    ///
432    ///         // Creating the buffer **after** the `await` prevents it from
433    ///         // being stored in the async task.
434    ///         let mut buf = [0; 4096];
435    ///
436    ///         // Try to read data, this may still fail with `WouldBlock`
437    ///         // if the readiness event is a false positive.
438    ///         match server.try_read(&mut buf) {
439    ///             Ok(0) => break,
440    ///             Ok(n) => {
441    ///                 println!("read {} bytes", n);
442    ///             }
443    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
444    ///                 continue;
445    ///             }
446    ///             Err(e) => {
447    ///                 return Err(e.into());
448    ///             }
449    ///         }
450    ///     }
451    ///
452    ///     Ok(())
453    /// }
454    /// ```
455    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
456        self.io
457            .registration()
458            .try_io(Interest::READABLE, || (&*self.io).read(buf))
459    }
460
461    /// Tries to read data from the pipe into the provided buffers, returning
462    /// how many bytes were read.
463    ///
464    /// Data is copied to fill each buffer in order, with the final buffer
465    /// written to possibly being only partially filled. This method behaves
466    /// equivalently to a single call to [`try_read()`] with concatenated
467    /// buffers.
468    ///
469    /// Receives any pending data from the pipe but does not wait for new data
470    /// to arrive. On success, returns the number of bytes read. Because
471    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
472    /// stored by the async task and can exist entirely on the stack.
473    ///
474    /// Usually, [`readable()`] or [`ready()`] is used with this function.
475    ///
476    /// [`try_read()`]: NamedPipeServer::try_read()
477    /// [`readable()`]: NamedPipeServer::readable()
478    /// [`ready()`]: NamedPipeServer::ready()
479    ///
480    /// # Return
481    ///
482    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
483    /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
484    /// and will no longer yield data. If the pipe is not ready to read data
485    /// `Err(io::ErrorKind::WouldBlock)` is returned.
486    ///
487    /// # Examples
488    ///
489    /// ```no_run
490    /// use tokio::net::windows::named_pipe;
491    /// use std::error::Error;
492    /// use std::io::{self, IoSliceMut};
493    ///
494    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
495    ///
496    /// #[tokio::main]
497    /// async fn main() -> Result<(), Box<dyn Error>> {
498    ///     let server = named_pipe::ServerOptions::new()
499    ///         .create(PIPE_NAME)?;
500    ///
501    ///     loop {
502    ///         // Wait for the pipe to be readable
503    ///         server.readable().await?;
504    ///
505    ///         // Creating the buffer **after** the `await` prevents it from
506    ///         // being stored in the async task.
507    ///         let mut buf_a = [0; 512];
508    ///         let mut buf_b = [0; 1024];
509    ///         let mut bufs = [
510    ///             IoSliceMut::new(&mut buf_a),
511    ///             IoSliceMut::new(&mut buf_b),
512    ///         ];
513    ///
514    ///         // Try to read data, this may still fail with `WouldBlock`
515    ///         // if the readiness event is a false positive.
516    ///         match server.try_read_vectored(&mut bufs) {
517    ///             Ok(0) => break,
518    ///             Ok(n) => {
519    ///                 println!("read {} bytes", n);
520    ///             }
521    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
522    ///                 continue;
523    ///             }
524    ///             Err(e) => {
525    ///                 return Err(e.into());
526    ///             }
527    ///         }
528    ///     }
529    ///
530    ///     Ok(())
531    /// }
532    /// ```
533    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
534        self.io
535            .registration()
536            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
537    }
538
539    cfg_io_util! {
540        /// Tries to read data from the stream into the provided buffer, advancing the
541        /// buffer's internal cursor, returning how many bytes were read.
542        ///
543        /// Receives any pending data from the pipe but does not wait for new data
544        /// to arrive. On success, returns the number of bytes read. Because
545        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
546        /// the async task and can exist entirely on the stack.
547        ///
548        /// Usually, [`readable()`] or [`ready()`] is used with this function.
549        ///
550        /// [`readable()`]: NamedPipeServer::readable()
551        /// [`ready()`]: NamedPipeServer::ready()
552        ///
553        /// # Return
554        ///
555        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
556        /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
557        /// and will no longer yield data. If the stream is not ready to read data
558        /// `Err(io::ErrorKind::WouldBlock)` is returned.
559        ///
560        /// # Examples
561        ///
562        /// ```no_run
563        /// use tokio::net::windows::named_pipe;
564        /// use std::error::Error;
565        /// use std::io;
566        ///
567        /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
568        ///
569        /// #[tokio::main]
570        /// async fn main() -> Result<(), Box<dyn Error>> {
571        ///     let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
572        ///
573        ///     loop {
574        ///         // Wait for the pipe to be readable
575        ///         server.readable().await?;
576        ///
577        ///         let mut buf = Vec::with_capacity(4096);
578        ///
579        ///         // Try to read data, this may still fail with `WouldBlock`
580        ///         // if the readiness event is a false positive.
581        ///         match server.try_read_buf(&mut buf) {
582        ///             Ok(0) => break,
583        ///             Ok(n) => {
584        ///                 println!("read {} bytes", n);
585        ///             }
586        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
587        ///                 continue;
588        ///             }
589        ///             Err(e) => {
590        ///                 return Err(e.into());
591        ///             }
592        ///         }
593        ///     }
594        ///
595        ///     Ok(())
596        /// }
597        /// ```
598        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
599            self.io.registration().try_io(Interest::READABLE, || {
600                use std::io::Read;
601
602                let dst = buf.chunk_mut();
603                let dst =
604                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
605
606                // Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
607                // buffer.
608                let n = (&*self.io).read(dst)?;
609
610                unsafe {
611                    buf.advance_mut(n);
612                }
613
614                Ok(n)
615            })
616        }
617    }
618
619    /// Waits for the pipe to become writable.
620    ///
621    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
622    /// paired with `try_write()`.
623    ///
624    /// # Examples
625    ///
626    /// ```no_run
627    /// use tokio::net::windows::named_pipe;
628    /// use std::error::Error;
629    /// use std::io;
630    ///
631    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
632    ///
633    /// #[tokio::main]
634    /// async fn main() -> Result<(), Box<dyn Error>> {
635    ///     let server = named_pipe::ServerOptions::new()
636    ///         .create(PIPE_NAME)?;
637    ///
638    ///     loop {
639    ///         // Wait for the pipe to be writable
640    ///         server.writable().await?;
641    ///
642    ///         // Try to write data, this may still fail with `WouldBlock`
643    ///         // if the readiness event is a false positive.
644    ///         match server.try_write(b"hello world") {
645    ///             Ok(n) => {
646    ///                 break;
647    ///             }
648    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
649    ///                 continue;
650    ///             }
651    ///             Err(e) => {
652    ///                 return Err(e.into());
653    ///             }
654    ///         }
655    ///     }
656    ///
657    ///     Ok(())
658    /// }
659    /// ```
660    pub async fn writable(&self) -> io::Result<()> {
661        self.ready(Interest::WRITABLE).await?;
662        Ok(())
663    }
664
665    /// Polls for write readiness.
666    ///
667    /// If the pipe is not currently ready for writing, this method will
668    /// store a clone of the `Waker` from the provided `Context`. When the pipe
669    /// becomes ready for writing, `Waker::wake` will be called on the waker.
670    ///
671    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
672    /// the `Waker` from the `Context` passed to the most recent call is
673    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
674    /// second, independent waker.)
675    ///
676    /// This function is intended for cases where creating and pinning a future
677    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
678    /// preferred, as this supports polling from multiple tasks at once.
679    ///
680    /// # Return value
681    ///
682    /// The function returns:
683    ///
684    /// * `Poll::Pending` if the pipe is not ready for writing.
685    /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
686    /// * `Poll::Ready(Err(e))` if an error is encountered.
687    ///
688    /// # Errors
689    ///
690    /// This function may encounter any standard I/O error except `WouldBlock`.
691    ///
692    /// [`writable`]: method@Self::writable
693    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
694        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
695    }
696
697    /// Tries to write a buffer to the pipe, returning how many bytes were
698    /// written.
699    ///
700    /// The function will attempt to write the entire contents of `buf`, but
701    /// only part of the buffer may be written.
702    ///
703    /// This function is usually paired with `writable()`.
704    ///
705    /// # Return
706    ///
707    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
708    /// number of bytes written. If the pipe is not ready to write data,
709    /// `Err(io::ErrorKind::WouldBlock)` is returned.
710    ///
711    /// # Examples
712    ///
713    /// ```no_run
714    /// use tokio::net::windows::named_pipe;
715    /// use std::error::Error;
716    /// use std::io;
717    ///
718    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
719    ///
720    /// #[tokio::main]
721    /// async fn main() -> Result<(), Box<dyn Error>> {
722    ///     let server = named_pipe::ServerOptions::new()
723    ///         .create(PIPE_NAME)?;
724    ///
725    ///     loop {
726    ///         // Wait for the pipe to be writable
727    ///         server.writable().await?;
728    ///
729    ///         // Try to write data, this may still fail with `WouldBlock`
730    ///         // if the readiness event is a false positive.
731    ///         match server.try_write(b"hello world") {
732    ///             Ok(n) => {
733    ///                 break;
734    ///             }
735    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
736    ///                 continue;
737    ///             }
738    ///             Err(e) => {
739    ///                 return Err(e.into());
740    ///             }
741    ///         }
742    ///     }
743    ///
744    ///     Ok(())
745    /// }
746    /// ```
747    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
748        self.io
749            .registration()
750            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
751    }
752
753    /// Tries to write several buffers to the pipe, returning how many bytes
754    /// were written.
755    ///
756    /// Data is written from each buffer in order, with the final buffer read
757    /// from possible being only partially consumed. This method behaves
758    /// equivalently to a single call to [`try_write()`] with concatenated
759    /// buffers.
760    ///
761    /// This function is usually paired with `writable()`.
762    ///
763    /// [`try_write()`]: NamedPipeServer::try_write()
764    ///
765    /// # Return
766    ///
767    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
768    /// number of bytes written. If the pipe is not ready to write data,
769    /// `Err(io::ErrorKind::WouldBlock)` is returned.
770    ///
771    /// # Examples
772    ///
773    /// ```no_run
774    /// use tokio::net::windows::named_pipe;
775    /// use std::error::Error;
776    /// use std::io;
777    ///
778    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
779    ///
780    /// #[tokio::main]
781    /// async fn main() -> Result<(), Box<dyn Error>> {
782    ///     let server = named_pipe::ServerOptions::new()
783    ///         .create(PIPE_NAME)?;
784    ///
785    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
786    ///
787    ///     loop {
788    ///         // Wait for the pipe to be writable
789    ///         server.writable().await?;
790    ///
791    ///         // Try to write data, this may still fail with `WouldBlock`
792    ///         // if the readiness event is a false positive.
793    ///         match server.try_write_vectored(&bufs) {
794    ///             Ok(n) => {
795    ///                 break;
796    ///             }
797    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
798    ///                 continue;
799    ///             }
800    ///             Err(e) => {
801    ///                 return Err(e.into());
802    ///             }
803    ///         }
804    ///     }
805    ///
806    ///     Ok(())
807    /// }
808    /// ```
809    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
810        self.io
811            .registration()
812            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
813    }
814
815    /// Tries to read or write from the pipe using a user-provided IO operation.
816    ///
817    /// If the pipe is ready, the provided closure is called. The closure
818    /// should attempt to perform IO operation from the pipe by manually
819    /// calling the appropriate syscall. If the operation fails because the
820    /// pipe is not actually ready, then the closure should return a
821    /// `WouldBlock` error and the readiness flag is cleared. The return value
822    /// of the closure is then returned by `try_io`.
823    ///
824    /// If the pipe is not ready, then the closure is not called
825    /// and a `WouldBlock` error is returned.
826    ///
827    /// The closure should only return a `WouldBlock` error if it has performed
828    /// an IO operation on the pipe that failed due to the pipe not being
829    /// ready. Returning a `WouldBlock` error in any other situation will
830    /// incorrectly clear the readiness flag, which can cause the pipe to
831    /// behave incorrectly.
832    ///
833    /// The closure should not perform the IO operation using any of the
834    /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
835    /// the readiness flag and can cause the pipe to behave incorrectly.
836    ///
837    /// This method is not intended to be used with combined interests.
838    /// The closure should perform only one type of IO operation, so it should not
839    /// require more than one ready state. This method may panic or sleep forever
840    /// if it is called with a combined interest.
841    ///
842    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
843    ///
844    /// [`readable()`]: NamedPipeServer::readable()
845    /// [`writable()`]: NamedPipeServer::writable()
846    /// [`ready()`]: NamedPipeServer::ready()
847    pub fn try_io<R>(
848        &self,
849        interest: Interest,
850        f: impl FnOnce() -> io::Result<R>,
851    ) -> io::Result<R> {
852        self.io.registration().try_io(interest, f)
853    }
854
855    /// Reads or writes from the pipe using a user-provided IO operation.
856    ///
857    /// The readiness of the pipe is awaited and when the pipe is ready,
858    /// the provided closure is called. The closure should attempt to perform
859    /// IO operation on the pipe by manually calling the appropriate syscall.
860    /// If the operation fails because the pipe is not actually ready,
861    /// then the closure should return a `WouldBlock` error. In such case the
862    /// readiness flag is cleared and the pipe readiness is awaited again.
863    /// This loop is repeated until the closure returns an `Ok` or an error
864    /// other than `WouldBlock`.
865    ///
866    /// The closure should only return a `WouldBlock` error if it has performed
867    /// an IO operation on the pipe that failed due to the pipe not being
868    /// ready. Returning a `WouldBlock` error in any other situation will
869    /// incorrectly clear the readiness flag, which can cause the pipe to
870    /// behave incorrectly.
871    ///
872    /// The closure should not perform the IO operation using any of the methods
873    /// defined on the Tokio `NamedPipeServer` type, as this will mess with the
874    /// readiness flag and can cause the pipe to behave incorrectly.
875    ///
876    /// This method is not intended to be used with combined interests.
877    /// The closure should perform only one type of IO operation, so it should not
878    /// require more than one ready state. This method may panic or sleep forever
879    /// if it is called with a combined interest.
880    pub async fn async_io<R>(
881        &self,
882        interest: Interest,
883        f: impl FnMut() -> io::Result<R>,
884    ) -> io::Result<R> {
885        self.io.registration().async_io(interest, f).await
886    }
887}
888
889impl AsyncRead for NamedPipeServer {
890    fn poll_read(
891        self: Pin<&mut Self>,
892        cx: &mut Context<'_>,
893        buf: &mut ReadBuf<'_>,
894    ) -> Poll<io::Result<()>> {
895        unsafe { self.io.poll_read(cx, buf) }
896    }
897}
898
899impl AsyncWrite for NamedPipeServer {
900    fn poll_write(
901        self: Pin<&mut Self>,
902        cx: &mut Context<'_>,
903        buf: &[u8],
904    ) -> Poll<io::Result<usize>> {
905        self.io.poll_write(cx, buf)
906    }
907
908    fn poll_write_vectored(
909        self: Pin<&mut Self>,
910        cx: &mut Context<'_>,
911        bufs: &[io::IoSlice<'_>],
912    ) -> Poll<io::Result<usize>> {
913        self.io.poll_write_vectored(cx, bufs)
914    }
915
916    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
917        Poll::Ready(Ok(()))
918    }
919
920    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
921        self.poll_flush(cx)
922    }
923}
924
925impl AsRawHandle for NamedPipeServer {
926    fn as_raw_handle(&self) -> RawHandle {
927        self.io.as_raw_handle()
928    }
929}
930
931impl AsHandle for NamedPipeServer {
932    fn as_handle(&self) -> BorrowedHandle<'_> {
933        unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
934    }
935}
936
937/// A [Windows named pipe] client.
938///
939/// Constructed using [`ClientOptions::open`].
940///
941/// Connecting a client correctly involves a few steps. When connecting through
942/// [`ClientOptions::open`], it might error indicating one of two things:
943///
944/// * [`std::io::ErrorKind::NotFound`] - There is no server available.
945/// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
946///   for a while and try again.
947///
948/// So a correctly implemented client looks like this:
949///
950/// ```no_run
951/// use std::time::Duration;
952/// use tokio::net::windows::named_pipe::ClientOptions;
953/// use tokio::time;
954/// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
955///
956/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
957///
958/// # #[tokio::main] async fn main() -> std::io::Result<()> {
959/// let client = loop {
960///     match ClientOptions::new().open(PIPE_NAME) {
961///         Ok(client) => break client,
962///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
963///         Err(e) => return Err(e),
964///     }
965///
966///     time::sleep(Duration::from_millis(50)).await;
967/// };
968///
969/// /* use the connected client */
970/// # Ok(()) }
971/// ```
972///
973/// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
974/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
975#[derive(Debug)]
976pub struct NamedPipeClient {
977    io: PollEvented<mio_windows::NamedPipe>,
978}
979
980impl NamedPipeClient {
981    /// Constructs a new named pipe client from the specified raw handle.
982    ///
983    /// This function will consume ownership of the handle given, passing
984    /// responsibility for closing the handle to the returned object.
985    ///
986    /// This function is also unsafe as the primitives currently returned have
987    /// the contract that they are the sole owner of the file descriptor they
988    /// are wrapping. Usage of this function could accidentally allow violating
989    /// this contract which can cause memory unsafety in code that relies on it
990    /// being true.
991    ///
992    /// # Errors
993    ///
994    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
995    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
996    ///
997    /// [Tokio Runtime]: crate::runtime::Runtime
998    /// [enabled I/O]: crate::runtime::Builder::enable_io
999    pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
1000        let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
1001
1002        Ok(Self {
1003            io: PollEvented::new(named_pipe)?,
1004        })
1005    }
1006
1007    /// Retrieves information about the named pipe the client is associated
1008    /// with.
1009    ///
1010    /// ```no_run
1011    /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
1012    ///
1013    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
1014    ///
1015    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1016    /// let client = ClientOptions::new()
1017    ///     .open(PIPE_NAME)?;
1018    ///
1019    /// let client_info = client.info()?;
1020    ///
1021    /// assert_eq!(client_info.end, PipeEnd::Client);
1022    /// assert_eq!(client_info.mode, PipeMode::Message);
1023    /// assert_eq!(client_info.max_instances, 5);
1024    /// # Ok(()) }
1025    /// ```
1026    pub fn info(&self) -> io::Result<PipeInfo> {
1027        // Safety: we're ensuring the lifetime of the named pipe.
1028        unsafe { named_pipe_info(self.io.as_raw_handle()) }
1029    }
1030
1031    /// Waits for any of the requested ready states.
1032    ///
1033    /// This function is usually paired with `try_read()` or `try_write()`. It
1034    /// can be used to concurrently read / write to the same pipe on a single
1035    /// task without splitting the pipe.
1036    ///
1037    /// The function may complete without the pipe being ready. This is a
1038    /// false-positive and attempting an operation will return with
1039    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1040    /// [`Ready`] set, so you should always check the returned value and possibly
1041    /// wait again if the requested states are not set.
1042    ///
1043    /// # Examples
1044    ///
1045    /// Concurrently read and write to the pipe on the same task without
1046    /// splitting.
1047    ///
1048    /// ```no_run
1049    /// use tokio::io::Interest;
1050    /// use tokio::net::windows::named_pipe;
1051    /// use std::error::Error;
1052    /// use std::io;
1053    ///
1054    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
1055    ///
1056    /// #[tokio::main]
1057    /// async fn main() -> Result<(), Box<dyn Error>> {
1058    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1059    ///
1060    ///     loop {
1061    ///         let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
1062    ///
1063    ///         if ready.is_readable() {
1064    ///             let mut data = vec![0; 1024];
1065    ///             // Try to read data, this may still fail with `WouldBlock`
1066    ///             // if the readiness event is a false positive.
1067    ///             match client.try_read(&mut data) {
1068    ///                 Ok(n) => {
1069    ///                     println!("read {} bytes", n);
1070    ///                 }
1071    ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1072    ///                     continue;
1073    ///                 }
1074    ///                 Err(e) => {
1075    ///                     return Err(e.into());
1076    ///                 }
1077    ///             }
1078    ///         }
1079    ///
1080    ///         if ready.is_writable() {
1081    ///             // Try to write data, this may still fail with `WouldBlock`
1082    ///             // if the readiness event is a false positive.
1083    ///             match client.try_write(b"hello world") {
1084    ///                 Ok(n) => {
1085    ///                     println!("write {} bytes", n);
1086    ///                 }
1087    ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1088    ///                     continue;
1089    ///                 }
1090    ///                 Err(e) => {
1091    ///                     return Err(e.into());
1092    ///                 }
1093    ///             }
1094    ///         }
1095    ///     }
1096    /// }
1097    /// ```
1098    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1099        let event = self.io.registration().readiness(interest).await?;
1100        Ok(event.ready)
1101    }
1102
1103    /// Waits for the pipe to become readable.
1104    ///
1105    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1106    /// paired with `try_read()`.
1107    ///
1108    /// # Examples
1109    ///
1110    /// ```no_run
1111    /// use tokio::net::windows::named_pipe;
1112    /// use std::error::Error;
1113    /// use std::io;
1114    ///
1115    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1116    ///
1117    /// #[tokio::main]
1118    /// async fn main() -> Result<(), Box<dyn Error>> {
1119    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1120    ///
1121    ///     let mut msg = vec![0; 1024];
1122    ///
1123    ///     loop {
1124    ///         // Wait for the pipe to be readable
1125    ///         client.readable().await?;
1126    ///
1127    ///         // Try to read data, this may still fail with `WouldBlock`
1128    ///         // if the readiness event is a false positive.
1129    ///         match client.try_read(&mut msg) {
1130    ///             Ok(n) => {
1131    ///                 msg.truncate(n);
1132    ///                 break;
1133    ///             }
1134    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1135    ///                 continue;
1136    ///             }
1137    ///             Err(e) => {
1138    ///                 return Err(e.into());
1139    ///             }
1140    ///         }
1141    ///     }
1142    ///
1143    ///     println!("GOT = {:?}", msg);
1144    ///     Ok(())
1145    /// }
1146    /// ```
1147    pub async fn readable(&self) -> io::Result<()> {
1148        self.ready(Interest::READABLE).await?;
1149        Ok(())
1150    }
1151
1152    /// Polls for read readiness.
1153    ///
1154    /// If the pipe is not currently ready for reading, this method will
1155    /// store a clone of the `Waker` from the provided `Context`. When the pipe
1156    /// becomes ready for reading, `Waker::wake` will be called on the waker.
1157    ///
1158    /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1159    /// the `Waker` from the `Context` passed to the most recent call is
1160    /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1161    /// second, independent waker.)
1162    ///
1163    /// This function is intended for cases where creating and pinning a future
1164    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1165    /// preferred, as this supports polling from multiple tasks at once.
1166    ///
1167    /// # Return value
1168    ///
1169    /// The function returns:
1170    ///
1171    /// * `Poll::Pending` if the pipe is not ready for reading.
1172    /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1173    /// * `Poll::Ready(Err(e))` if an error is encountered.
1174    ///
1175    /// # Errors
1176    ///
1177    /// This function may encounter any standard I/O error except `WouldBlock`.
1178    ///
1179    /// [`readable`]: method@Self::readable
1180    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1181        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1182    }
1183
1184    /// Tries to read data from the pipe into the provided buffer, returning how
1185    /// many bytes were read.
1186    ///
1187    /// Receives any pending data from the pipe but does not wait for new data
1188    /// to arrive. On success, returns the number of bytes read. Because
1189    /// `try_read()` is non-blocking, the buffer does not have to be stored by
1190    /// the async task and can exist entirely on the stack.
1191    ///
1192    /// Usually, [`readable()`] or [`ready()`] is used with this function.
1193    ///
1194    /// [`readable()`]: NamedPipeClient::readable()
1195    /// [`ready()`]: NamedPipeClient::ready()
1196    ///
1197    /// # Return
1198    ///
1199    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1200    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1201    ///
1202    /// 1. The pipe's read half is closed and will no longer yield data.
1203    /// 2. The specified buffer was 0 bytes in length.
1204    ///
1205    /// If the pipe is not ready to read data,
1206    /// `Err(io::ErrorKind::WouldBlock)` is returned.
1207    ///
1208    /// # Examples
1209    ///
1210    /// ```no_run
1211    /// use tokio::net::windows::named_pipe;
1212    /// use std::error::Error;
1213    /// use std::io;
1214    ///
1215    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1216    ///
1217    /// #[tokio::main]
1218    /// async fn main() -> Result<(), Box<dyn Error>> {
1219    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1220    ///
1221    ///     loop {
1222    ///         // Wait for the pipe to be readable
1223    ///         client.readable().await?;
1224    ///
1225    ///         // Creating the buffer **after** the `await` prevents it from
1226    ///         // being stored in the async task.
1227    ///         let mut buf = [0; 4096];
1228    ///
1229    ///         // Try to read data, this may still fail with `WouldBlock`
1230    ///         // if the readiness event is a false positive.
1231    ///         match client.try_read(&mut buf) {
1232    ///             Ok(0) => break,
1233    ///             Ok(n) => {
1234    ///                 println!("read {} bytes", n);
1235    ///             }
1236    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1237    ///                 continue;
1238    ///             }
1239    ///             Err(e) => {
1240    ///                 return Err(e.into());
1241    ///             }
1242    ///         }
1243    ///     }
1244    ///
1245    ///     Ok(())
1246    /// }
1247    /// ```
1248    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1249        self.io
1250            .registration()
1251            .try_io(Interest::READABLE, || (&*self.io).read(buf))
1252    }
1253
1254    /// Tries to read data from the pipe into the provided buffers, returning
1255    /// how many bytes were read.
1256    ///
1257    /// Data is copied to fill each buffer in order, with the final buffer
1258    /// written to possibly being only partially filled. This method behaves
1259    /// equivalently to a single call to [`try_read()`] with concatenated
1260    /// buffers.
1261    ///
1262    /// Receives any pending data from the pipe but does not wait for new data
1263    /// to arrive. On success, returns the number of bytes read. Because
1264    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1265    /// stored by the async task and can exist entirely on the stack.
1266    ///
1267    /// Usually, [`readable()`] or [`ready()`] is used with this function.
1268    ///
1269    /// [`try_read()`]: NamedPipeClient::try_read()
1270    /// [`readable()`]: NamedPipeClient::readable()
1271    /// [`ready()`]: NamedPipeClient::ready()
1272    ///
1273    /// # Return
1274    ///
1275    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1276    /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1277    /// and will no longer yield data. If the pipe is not ready to read data
1278    /// `Err(io::ErrorKind::WouldBlock)` is returned.
1279    ///
1280    /// # Examples
1281    ///
1282    /// ```no_run
1283    /// use tokio::net::windows::named_pipe;
1284    /// use std::error::Error;
1285    /// use std::io::{self, IoSliceMut};
1286    ///
1287    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1288    ///
1289    /// #[tokio::main]
1290    /// async fn main() -> Result<(), Box<dyn Error>> {
1291    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1292    ///
1293    ///     loop {
1294    ///         // Wait for the pipe to be readable
1295    ///         client.readable().await?;
1296    ///
1297    ///         // Creating the buffer **after** the `await` prevents it from
1298    ///         // being stored in the async task.
1299    ///         let mut buf_a = [0; 512];
1300    ///         let mut buf_b = [0; 1024];
1301    ///         let mut bufs = [
1302    ///             IoSliceMut::new(&mut buf_a),
1303    ///             IoSliceMut::new(&mut buf_b),
1304    ///         ];
1305    ///
1306    ///         // Try to read data, this may still fail with `WouldBlock`
1307    ///         // if the readiness event is a false positive.
1308    ///         match client.try_read_vectored(&mut bufs) {
1309    ///             Ok(0) => break,
1310    ///             Ok(n) => {
1311    ///                 println!("read {} bytes", n);
1312    ///             }
1313    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1314    ///                 continue;
1315    ///             }
1316    ///             Err(e) => {
1317    ///                 return Err(e.into());
1318    ///             }
1319    ///         }
1320    ///     }
1321    ///
1322    ///     Ok(())
1323    /// }
1324    /// ```
1325    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1326        self.io
1327            .registration()
1328            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1329    }
1330
1331    cfg_io_util! {
1332        /// Tries to read data from the stream into the provided buffer, advancing the
1333        /// buffer's internal cursor, returning how many bytes were read.
1334        ///
1335        /// Receives any pending data from the pipe but does not wait for new data
1336        /// to arrive. On success, returns the number of bytes read. Because
1337        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1338        /// the async task and can exist entirely on the stack.
1339        ///
1340        /// Usually, [`readable()`] or [`ready()`] is used with this function.
1341        ///
1342        /// [`readable()`]: NamedPipeClient::readable()
1343        /// [`ready()`]: NamedPipeClient::ready()
1344        ///
1345        /// # Return
1346        ///
1347        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1348        /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
1349        /// and will no longer yield data. If the stream is not ready to read data
1350        /// `Err(io::ErrorKind::WouldBlock)` is returned.
1351        ///
1352        /// # Examples
1353        ///
1354        /// ```no_run
1355        /// use tokio::net::windows::named_pipe;
1356        /// use std::error::Error;
1357        /// use std::io;
1358        ///
1359        /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1360        ///
1361        /// #[tokio::main]
1362        /// async fn main() -> Result<(), Box<dyn Error>> {
1363        ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1364        ///
1365        ///     loop {
1366        ///         // Wait for the pipe to be readable
1367        ///         client.readable().await?;
1368        ///
1369        ///         let mut buf = Vec::with_capacity(4096);
1370        ///
1371        ///         // Try to read data, this may still fail with `WouldBlock`
1372        ///         // if the readiness event is a false positive.
1373        ///         match client.try_read_buf(&mut buf) {
1374        ///             Ok(0) => break,
1375        ///             Ok(n) => {
1376        ///                 println!("read {} bytes", n);
1377        ///             }
1378        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1379        ///                 continue;
1380        ///             }
1381        ///             Err(e) => {
1382        ///                 return Err(e.into());
1383        ///             }
1384        ///         }
1385        ///     }
1386        ///
1387        ///     Ok(())
1388        /// }
1389        /// ```
1390        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1391            self.io.registration().try_io(Interest::READABLE, || {
1392                use std::io::Read;
1393
1394                let dst = buf.chunk_mut();
1395                let dst =
1396                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1397
1398                // Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
1399                // buffer.
1400                let n = (&*self.io).read(dst)?;
1401
1402                unsafe {
1403                    buf.advance_mut(n);
1404                }
1405
1406                Ok(n)
1407            })
1408        }
1409    }
1410
1411    /// Waits for the pipe to become writable.
1412    ///
1413    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1414    /// paired with `try_write()`.
1415    ///
1416    /// # Examples
1417    ///
1418    /// ```no_run
1419    /// use tokio::net::windows::named_pipe;
1420    /// use std::error::Error;
1421    /// use std::io;
1422    ///
1423    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1424    ///
1425    /// #[tokio::main]
1426    /// async fn main() -> Result<(), Box<dyn Error>> {
1427    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1428    ///
1429    ///     loop {
1430    ///         // Wait for the pipe to be writable
1431    ///         client.writable().await?;
1432    ///
1433    ///         // Try to write data, this may still fail with `WouldBlock`
1434    ///         // if the readiness event is a false positive.
1435    ///         match client.try_write(b"hello world") {
1436    ///             Ok(n) => {
1437    ///                 break;
1438    ///             }
1439    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1440    ///                 continue;
1441    ///             }
1442    ///             Err(e) => {
1443    ///                 return Err(e.into());
1444    ///             }
1445    ///         }
1446    ///     }
1447    ///
1448    ///     Ok(())
1449    /// }
1450    /// ```
1451    pub async fn writable(&self) -> io::Result<()> {
1452        self.ready(Interest::WRITABLE).await?;
1453        Ok(())
1454    }
1455
1456    /// Polls for write readiness.
1457    ///
1458    /// If the pipe is not currently ready for writing, this method will
1459    /// store a clone of the `Waker` from the provided `Context`. When the pipe
1460    /// becomes ready for writing, `Waker::wake` will be called on the waker.
1461    ///
1462    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1463    /// the `Waker` from the `Context` passed to the most recent call is
1464    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1465    /// second, independent waker.)
1466    ///
1467    /// This function is intended for cases where creating and pinning a future
1468    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1469    /// preferred, as this supports polling from multiple tasks at once.
1470    ///
1471    /// # Return value
1472    ///
1473    /// The function returns:
1474    ///
1475    /// * `Poll::Pending` if the pipe is not ready for writing.
1476    /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1477    /// * `Poll::Ready(Err(e))` if an error is encountered.
1478    ///
1479    /// # Errors
1480    ///
1481    /// This function may encounter any standard I/O error except `WouldBlock`.
1482    ///
1483    /// [`writable`]: method@Self::writable
1484    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1485        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1486    }
1487
1488    /// Tries to write a buffer to the pipe, returning how many bytes were
1489    /// written.
1490    ///
1491    /// The function will attempt to write the entire contents of `buf`, but
1492    /// only part of the buffer may be written.
1493    ///
1494    /// This function is usually paired with `writable()`.
1495    ///
1496    /// # Return
1497    ///
1498    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1499    /// number of bytes written. If the pipe is not ready to write data,
1500    /// `Err(io::ErrorKind::WouldBlock)` is returned.
1501    ///
1502    /// # Examples
1503    ///
1504    /// ```no_run
1505    /// use tokio::net::windows::named_pipe;
1506    /// use std::error::Error;
1507    /// use std::io;
1508    ///
1509    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1510    ///
1511    /// #[tokio::main]
1512    /// async fn main() -> Result<(), Box<dyn Error>> {
1513    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1514    ///
1515    ///     loop {
1516    ///         // Wait for the pipe to be writable
1517    ///         client.writable().await?;
1518    ///
1519    ///         // Try to write data, this may still fail with `WouldBlock`
1520    ///         // if the readiness event is a false positive.
1521    ///         match client.try_write(b"hello world") {
1522    ///             Ok(n) => {
1523    ///                 break;
1524    ///             }
1525    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1526    ///                 continue;
1527    ///             }
1528    ///             Err(e) => {
1529    ///                 return Err(e.into());
1530    ///             }
1531    ///         }
1532    ///     }
1533    ///
1534    ///     Ok(())
1535    /// }
1536    /// ```
1537    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1538        self.io
1539            .registration()
1540            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1541    }
1542
1543    /// Tries to write several buffers to the pipe, returning how many bytes
1544    /// were written.
1545    ///
1546    /// Data is written from each buffer in order, with the final buffer read
1547    /// from possible being only partially consumed. This method behaves
1548    /// equivalently to a single call to [`try_write()`] with concatenated
1549    /// buffers.
1550    ///
1551    /// This function is usually paired with `writable()`.
1552    ///
1553    /// [`try_write()`]: NamedPipeClient::try_write()
1554    ///
1555    /// # Return
1556    ///
1557    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1558    /// number of bytes written. If the pipe is not ready to write data,
1559    /// `Err(io::ErrorKind::WouldBlock)` is returned.
1560    ///
1561    /// # Examples
1562    ///
1563    /// ```no_run
1564    /// use tokio::net::windows::named_pipe;
1565    /// use std::error::Error;
1566    /// use std::io;
1567    ///
1568    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1569    ///
1570    /// #[tokio::main]
1571    /// async fn main() -> Result<(), Box<dyn Error>> {
1572    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1573    ///
1574    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1575    ///
1576    ///     loop {
1577    ///         // Wait for the pipe to be writable
1578    ///         client.writable().await?;
1579    ///
1580    ///         // Try to write data, this may still fail with `WouldBlock`
1581    ///         // if the readiness event is a false positive.
1582    ///         match client.try_write_vectored(&bufs) {
1583    ///             Ok(n) => {
1584    ///                 break;
1585    ///             }
1586    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1587    ///                 continue;
1588    ///             }
1589    ///             Err(e) => {
1590    ///                 return Err(e.into());
1591    ///             }
1592    ///         }
1593    ///     }
1594    ///
1595    ///     Ok(())
1596    /// }
1597    /// ```
1598    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1599        self.io
1600            .registration()
1601            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1602    }
1603
1604    /// Tries to read or write from the pipe using a user-provided IO operation.
1605    ///
1606    /// If the pipe is ready, the provided closure is called. The closure
1607    /// should attempt to perform IO operation from the pipe by manually
1608    /// calling the appropriate syscall. If the operation fails because the
1609    /// pipe is not actually ready, then the closure should return a
1610    /// `WouldBlock` error and the readiness flag is cleared. The return value
1611    /// of the closure is then returned by `try_io`.
1612    ///
1613    /// If the pipe is not ready, then the closure is not called
1614    /// and a `WouldBlock` error is returned.
1615    ///
1616    /// The closure should only return a `WouldBlock` error if it has performed
1617    /// an IO operation on the pipe that failed due to the pipe not being
1618    /// ready. Returning a `WouldBlock` error in any other situation will
1619    /// incorrectly clear the readiness flag, which can cause the pipe to
1620    /// behave incorrectly.
1621    ///
1622    /// The closure should not perform the IO operation using any of the methods
1623    /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1624    /// readiness flag and can cause the pipe to behave incorrectly.
1625    ///
1626    /// This method is not intended to be used with combined interests.
1627    /// The closure should perform only one type of IO operation, so it should not
1628    /// require more than one ready state. This method may panic or sleep forever
1629    /// if it is called with a combined interest.
1630    ///
1631    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1632    ///
1633    /// [`readable()`]: NamedPipeClient::readable()
1634    /// [`writable()`]: NamedPipeClient::writable()
1635    /// [`ready()`]: NamedPipeClient::ready()
1636    pub fn try_io<R>(
1637        &self,
1638        interest: Interest,
1639        f: impl FnOnce() -> io::Result<R>,
1640    ) -> io::Result<R> {
1641        self.io.registration().try_io(interest, f)
1642    }
1643
1644    /// Reads or writes from the pipe using a user-provided IO operation.
1645    ///
1646    /// The readiness of the pipe is awaited and when the pipe is ready,
1647    /// the provided closure is called. The closure should attempt to perform
1648    /// IO operation on the pipe by manually calling the appropriate syscall.
1649    /// If the operation fails because the pipe is not actually ready,
1650    /// then the closure should return a `WouldBlock` error. In such case the
1651    /// readiness flag is cleared and the pipe readiness is awaited again.
1652    /// This loop is repeated until the closure returns an `Ok` or an error
1653    /// other than `WouldBlock`.
1654    ///
1655    /// The closure should only return a `WouldBlock` error if it has performed
1656    /// an IO operation on the pipe that failed due to the pipe not being
1657    /// ready. Returning a `WouldBlock` error in any other situation will
1658    /// incorrectly clear the readiness flag, which can cause the pipe to
1659    /// behave incorrectly.
1660    ///
1661    /// The closure should not perform the IO operation using any of the methods
1662    /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1663    /// readiness flag and can cause the pipe to behave incorrectly.
1664    ///
1665    /// This method is not intended to be used with combined interests.
1666    /// The closure should perform only one type of IO operation, so it should not
1667    /// require more than one ready state. This method may panic or sleep forever
1668    /// if it is called with a combined interest.
1669    pub async fn async_io<R>(
1670        &self,
1671        interest: Interest,
1672        f: impl FnMut() -> io::Result<R>,
1673    ) -> io::Result<R> {
1674        self.io.registration().async_io(interest, f).await
1675    }
1676}
1677
1678impl AsyncRead for NamedPipeClient {
1679    fn poll_read(
1680        self: Pin<&mut Self>,
1681        cx: &mut Context<'_>,
1682        buf: &mut ReadBuf<'_>,
1683    ) -> Poll<io::Result<()>> {
1684        unsafe { self.io.poll_read(cx, buf) }
1685    }
1686}
1687
1688impl AsyncWrite for NamedPipeClient {
1689    fn poll_write(
1690        self: Pin<&mut Self>,
1691        cx: &mut Context<'_>,
1692        buf: &[u8],
1693    ) -> Poll<io::Result<usize>> {
1694        self.io.poll_write(cx, buf)
1695    }
1696
1697    fn poll_write_vectored(
1698        self: Pin<&mut Self>,
1699        cx: &mut Context<'_>,
1700        bufs: &[io::IoSlice<'_>],
1701    ) -> Poll<io::Result<usize>> {
1702        self.io.poll_write_vectored(cx, bufs)
1703    }
1704
1705    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1706        Poll::Ready(Ok(()))
1707    }
1708
1709    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1710        self.poll_flush(cx)
1711    }
1712}
1713
1714impl AsRawHandle for NamedPipeClient {
1715    fn as_raw_handle(&self) -> RawHandle {
1716        self.io.as_raw_handle()
1717    }
1718}
1719
1720impl AsHandle for NamedPipeClient {
1721    fn as_handle(&self) -> BorrowedHandle<'_> {
1722        unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
1723    }
1724}
1725
1726/// A builder structure for construct a named pipe with named pipe-specific
1727/// options. This is required to use for named pipe servers who wants to modify
1728/// pipe-related options.
1729///
1730/// See [`ServerOptions::create`].
1731#[derive(Debug, Clone)]
1732pub struct ServerOptions {
1733    // dwOpenMode
1734    access_inbound: bool,
1735    access_outbound: bool,
1736    first_pipe_instance: bool,
1737    write_dac: bool,
1738    write_owner: bool,
1739    access_system_security: bool,
1740    // dwPipeMode
1741    pipe_mode: PipeMode,
1742    reject_remote_clients: bool,
1743    // other options
1744    max_instances: u32,
1745    out_buffer_size: u32,
1746    in_buffer_size: u32,
1747    default_timeout: u32,
1748}
1749
1750impl ServerOptions {
1751    /// Creates a new named pipe builder with the default settings.
1752    ///
1753    /// ```
1754    /// use tokio::net::windows::named_pipe::ServerOptions;
1755    ///
1756    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1757    ///
1758    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1759    /// let server = ServerOptions::new().create(PIPE_NAME)?;
1760    /// # Ok(()) }
1761    /// ```
1762    pub fn new() -> ServerOptions {
1763        ServerOptions {
1764            access_inbound: true,
1765            access_outbound: true,
1766            first_pipe_instance: false,
1767            write_dac: false,
1768            write_owner: false,
1769            access_system_security: false,
1770            pipe_mode: PipeMode::Byte,
1771            reject_remote_clients: true,
1772            max_instances: windows_sys::PIPE_UNLIMITED_INSTANCES,
1773            out_buffer_size: 65536,
1774            in_buffer_size: 65536,
1775            default_timeout: 0,
1776        }
1777    }
1778
1779    /// The pipe mode.
1780    ///
1781    /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1782    /// documentation of what each mode means.
1783    ///
1784    /// This corresponds to specifying `PIPE_TYPE_` and `PIPE_READMODE_` in  [`dwPipeMode`].
1785    ///
1786    /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
1787    pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1788        self.pipe_mode = pipe_mode;
1789        self
1790    }
1791
1792    /// The flow of data in the pipe goes from client to server only.
1793    ///
1794    /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1795    ///
1796    /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1797    ///
1798    /// # Errors
1799    ///
1800    /// Server side prevents connecting by denying inbound access, client errors
1801    /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1802    /// the connection.
1803    ///
1804    /// ```
1805    /// use std::io;
1806    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1807    ///
1808    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1809    ///
1810    /// # #[tokio::main] async fn main() -> io::Result<()> {
1811    /// let _server = ServerOptions::new()
1812    ///     .access_inbound(false)
1813    ///     .create(PIPE_NAME)?;
1814    ///
1815    /// let e = ClientOptions::new()
1816    ///     .open(PIPE_NAME)
1817    ///     .unwrap_err();
1818    ///
1819    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1820    /// # Ok(()) }
1821    /// ```
1822    ///
1823    /// Disabling writing allows a client to connect, but errors with
1824    /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1825    ///
1826    /// ```
1827    /// use std::io;
1828    /// use tokio::io::AsyncWriteExt;
1829    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1830    ///
1831    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1832    ///
1833    /// # #[tokio::main] async fn main() -> io::Result<()> {
1834    /// let server = ServerOptions::new()
1835    ///     .access_inbound(false)
1836    ///     .create(PIPE_NAME)?;
1837    ///
1838    /// let mut client = ClientOptions::new()
1839    ///     .write(false)
1840    ///     .open(PIPE_NAME)?;
1841    ///
1842    /// server.connect().await?;
1843    ///
1844    /// let e = client.write(b"ping").await.unwrap_err();
1845    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1846    /// # Ok(()) }
1847    /// ```
1848    ///
1849    /// # Examples
1850    ///
1851    /// A unidirectional named pipe that only supports server-to-client
1852    /// communication.
1853    ///
1854    /// ```
1855    /// use std::io;
1856    /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1857    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1858    ///
1859    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1860    ///
1861    /// # #[tokio::main] async fn main() -> io::Result<()> {
1862    /// let mut server = ServerOptions::new()
1863    ///     .access_inbound(false)
1864    ///     .create(PIPE_NAME)?;
1865    ///
1866    /// let mut client = ClientOptions::new()
1867    ///     .write(false)
1868    ///     .open(PIPE_NAME)?;
1869    ///
1870    /// server.connect().await?;
1871    ///
1872    /// let write = server.write_all(b"ping");
1873    ///
1874    /// let mut buf = [0u8; 4];
1875    /// let read = client.read_exact(&mut buf);
1876    ///
1877    /// let ((), read) = tokio::try_join!(write, read)?;
1878    ///
1879    /// assert_eq!(read, 4);
1880    /// assert_eq!(&buf[..], b"ping");
1881    /// # Ok(()) }
1882    /// ```
1883    pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
1884        self.access_inbound = allowed;
1885        self
1886    }
1887
1888    /// The flow of data in the pipe goes from server to client only.
1889    ///
1890    /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1891    ///
1892    /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1893    ///
1894    /// # Errors
1895    ///
1896    /// Server side prevents connecting by denying outbound access, client
1897    /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1898    /// create the connection.
1899    ///
1900    /// ```
1901    /// use std::io;
1902    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1903    ///
1904    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1905    ///
1906    /// # #[tokio::main] async fn main() -> io::Result<()> {
1907    /// let server = ServerOptions::new()
1908    ///     .access_outbound(false)
1909    ///     .create(PIPE_NAME)?;
1910    ///
1911    /// let e = ClientOptions::new()
1912    ///     .open(PIPE_NAME)
1913    ///     .unwrap_err();
1914    ///
1915    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1916    /// # Ok(()) }
1917    /// ```
1918    ///
1919    /// Disabling reading allows a client to connect, but attempting to read
1920    /// will error with [`std::io::ErrorKind::PermissionDenied`].
1921    ///
1922    /// ```
1923    /// use std::io;
1924    /// use tokio::io::AsyncReadExt;
1925    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1926    ///
1927    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1928    ///
1929    /// # #[tokio::main] async fn main() -> io::Result<()> {
1930    /// let server = ServerOptions::new()
1931    ///     .access_outbound(false)
1932    ///     .create(PIPE_NAME)?;
1933    ///
1934    /// let mut client = ClientOptions::new()
1935    ///     .read(false)
1936    ///     .open(PIPE_NAME)?;
1937    ///
1938    /// server.connect().await?;
1939    ///
1940    /// let mut buf = [0u8; 4];
1941    /// let e = client.read(&mut buf).await.unwrap_err();
1942    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1943    /// # Ok(()) }
1944    /// ```
1945    ///
1946    /// # Examples
1947    ///
1948    /// A unidirectional named pipe that only supports client-to-server
1949    /// communication.
1950    ///
1951    /// ```
1952    /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1953    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1954    ///
1955    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1956    ///
1957    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1958    /// let mut server = ServerOptions::new()
1959    ///     .access_outbound(false)
1960    ///     .create(PIPE_NAME)?;
1961    ///
1962    /// let mut client = ClientOptions::new()
1963    ///     .read(false)
1964    ///     .open(PIPE_NAME)?;
1965    ///
1966    /// server.connect().await?;
1967    ///
1968    /// let write = client.write_all(b"ping");
1969    ///
1970    /// let mut buf = [0u8; 4];
1971    /// let read = server.read_exact(&mut buf);
1972    ///
1973    /// let ((), read) = tokio::try_join!(write, read)?;
1974    ///
1975    /// println!("done reading and writing");
1976    ///
1977    /// assert_eq!(read, 4);
1978    /// assert_eq!(&buf[..], b"ping");
1979    /// # Ok(()) }
1980    /// ```
1981    pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
1982        self.access_outbound = allowed;
1983        self
1984    }
1985
1986    /// If you attempt to create multiple instances of a pipe with this flag
1987    /// set, creation of the first server instance succeeds, but creation of any
1988    /// subsequent instances will fail with
1989    /// [`std::io::ErrorKind::PermissionDenied`].
1990    ///
1991    /// This option is intended to be used with servers that want to ensure that
1992    /// they are the only process listening for clients on a given named pipe.
1993    /// This is accomplished by enabling it for the first server instance
1994    /// created in a process.
1995    ///
1996    /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1997    ///
1998    /// # Errors
1999    ///
2000    /// If this option is set and more than one instance of the server for a
2001    /// given named pipe exists, calling [`create`] will fail with
2002    /// [`std::io::ErrorKind::PermissionDenied`].
2003    ///
2004    /// ```
2005    /// use std::io;
2006    /// use tokio::net::windows::named_pipe::ServerOptions;
2007    ///
2008    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
2009    ///
2010    /// # #[tokio::main] async fn main() -> io::Result<()> {
2011    /// let server1 = ServerOptions::new()
2012    ///     .first_pipe_instance(true)
2013    ///     .create(PIPE_NAME)?;
2014    ///
2015    /// // Second server errs, since it's not the first instance.
2016    /// let e = ServerOptions::new()
2017    ///     .first_pipe_instance(true)
2018    ///     .create(PIPE_NAME)
2019    ///     .unwrap_err();
2020    ///
2021    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2022    /// # Ok(()) }
2023    /// ```
2024    ///
2025    /// # Examples
2026    ///
2027    /// ```
2028    /// use std::io;
2029    /// use tokio::net::windows::named_pipe::ServerOptions;
2030    ///
2031    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
2032    ///
2033    /// # #[tokio::main] async fn main() -> io::Result<()> {
2034    /// let mut builder = ServerOptions::new();
2035    /// builder.first_pipe_instance(true);
2036    ///
2037    /// let server = builder.create(PIPE_NAME)?;
2038    /// let e = builder.create(PIPE_NAME).unwrap_err();
2039    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2040    /// drop(server);
2041    ///
2042    /// // OK: since, we've closed the other instance.
2043    /// let _server2 = builder.create(PIPE_NAME)?;
2044    /// # Ok(()) }
2045    /// ```
2046    ///
2047    /// [`create`]: ServerOptions::create
2048    /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
2049    pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
2050        self.first_pipe_instance = first;
2051        self
2052    }
2053
2054    /// Requests permission to modify the pipe's discretionary access control list.
2055    ///
2056    /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
2057    ///
2058    /// # Examples
2059    ///
2060    /// ```
2061    /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2062    ///
2063    /// use tokio::net::windows::named_pipe::ServerOptions;
2064    /// use windows_sys::{
2065    ///     Win32::Foundation::ERROR_SUCCESS,
2066    ///     Win32::Security::DACL_SECURITY_INFORMATION,
2067    ///     Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2068    /// };
2069    ///
2070    /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
2071    ///
2072    /// # #[tokio::main] async fn main() -> io::Result<()> {
2073    /// let mut pipe_template = ServerOptions::new();
2074    /// pipe_template.write_dac(true);
2075    /// let pipe = pipe_template.create(PIPE_NAME)?;
2076    ///
2077    /// unsafe {
2078    ///     assert_eq!(
2079    ///         ERROR_SUCCESS,
2080    ///         SetSecurityInfo(
2081    ///             pipe.as_raw_handle() as _,
2082    ///             SE_KERNEL_OBJECT,
2083    ///             DACL_SECURITY_INFORMATION,
2084    ///             ptr::null_mut(),
2085    ///             ptr::null_mut(),
2086    ///             ptr::null_mut(),
2087    ///             ptr::null_mut(),
2088    ///         )
2089    ///     );
2090    /// }
2091    ///
2092    /// # Ok(()) }
2093    /// ```
2094    ///
2095    /// ```
2096    /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2097    ///
2098    /// use tokio::net::windows::named_pipe::ServerOptions;
2099    /// use windows_sys::{
2100    ///     Win32::Foundation::ERROR_ACCESS_DENIED,
2101    ///     Win32::Security::DACL_SECURITY_INFORMATION,
2102    ///     Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2103    /// };
2104    ///
2105    /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
2106    ///
2107    /// # #[tokio::main] async fn main() -> io::Result<()> {
2108    /// let mut pipe_template = ServerOptions::new();
2109    /// pipe_template.write_dac(false);
2110    /// let pipe = pipe_template.create(PIPE_NAME)?;
2111    ///
2112    /// unsafe {
2113    ///     assert_eq!(
2114    ///         ERROR_ACCESS_DENIED,
2115    ///         SetSecurityInfo(
2116    ///             pipe.as_raw_handle() as _,
2117    ///             SE_KERNEL_OBJECT,
2118    ///             DACL_SECURITY_INFORMATION,
2119    ///             ptr::null_mut(),
2120    ///             ptr::null_mut(),
2121    ///             ptr::null_mut(),
2122    ///             ptr::null_mut(),
2123    ///         )
2124    ///     );
2125    /// }
2126    ///
2127    /// # Ok(()) }
2128    /// ```
2129    ///
2130    /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2131    pub fn write_dac(&mut self, requested: bool) -> &mut Self {
2132        self.write_dac = requested;
2133        self
2134    }
2135
2136    /// Requests permission to modify the pipe's owner.
2137    ///
2138    /// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
2139    ///
2140    /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2141    pub fn write_owner(&mut self, requested: bool) -> &mut Self {
2142        self.write_owner = requested;
2143        self
2144    }
2145
2146    /// Requests permission to modify the pipe's system access control list.
2147    ///
2148    /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
2149    ///
2150    /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2151    pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
2152        self.access_system_security = requested;
2153        self
2154    }
2155
2156    /// Indicates whether this server can accept remote clients or not. Remote
2157    /// clients are disabled by default.
2158    ///
2159    /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
2160    ///
2161    /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
2162    pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
2163        self.reject_remote_clients = reject;
2164        self
2165    }
2166
2167    /// The maximum number of instances that can be created for this pipe. The
2168    /// first instance of the pipe can specify this value; the same number must
2169    /// be specified for other instances of the pipe. Acceptable values are in
2170    /// the range 1 through 254. The default value is unlimited.
2171    ///
2172    /// This corresponds to specifying [`nMaxInstances`].
2173    ///
2174    /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2175    ///
2176    /// # Errors
2177    ///
2178    /// The same numbers of `max_instances` have to be used by all servers. Any
2179    /// additional servers trying to be built which uses a mismatching value
2180    /// might error.
2181    ///
2182    /// ```
2183    /// use std::io;
2184    /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2185    /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2186    ///
2187    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
2188    ///
2189    /// # #[tokio::main] async fn main() -> io::Result<()> {
2190    /// let mut server = ServerOptions::new();
2191    /// server.max_instances(2);
2192    ///
2193    /// let s1 = server.create(PIPE_NAME)?;
2194    /// let c1 = ClientOptions::new().open(PIPE_NAME);
2195    ///
2196    /// let s2 = server.create(PIPE_NAME)?;
2197    /// let c2 = ClientOptions::new().open(PIPE_NAME);
2198    ///
2199    /// // Too many servers!
2200    /// let e = server.create(PIPE_NAME).unwrap_err();
2201    /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2202    ///
2203    /// // Still too many servers even if we specify a higher value!
2204    /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
2205    /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2206    /// # Ok(()) }
2207    /// ```
2208    ///
2209    /// # Panics
2210    ///
2211    /// This function will panic if more than 254 instances are specified. If
2212    /// you do not wish to set an instance limit, leave it unspecified.
2213    ///
2214    /// ```should_panic
2215    /// use tokio::net::windows::named_pipe::ServerOptions;
2216    ///
2217    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2218    /// let builder = ServerOptions::new().max_instances(255);
2219    /// # Ok(()) }
2220    /// ```
2221    #[track_caller]
2222    pub fn max_instances(&mut self, instances: usize) -> &mut Self {
2223        assert!(instances < 255, "cannot specify more than 254 instances");
2224        self.max_instances = instances as u32;
2225        self
2226    }
2227
2228    /// The number of bytes to reserve for the output buffer.
2229    ///
2230    /// This corresponds to specifying [`nOutBufferSize`].
2231    ///
2232    /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2233    pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
2234        self.out_buffer_size = buffer;
2235        self
2236    }
2237
2238    /// The number of bytes to reserve for the input buffer.
2239    ///
2240    /// This corresponds to specifying [`nInBufferSize`].
2241    ///
2242    /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2243    pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
2244        self.in_buffer_size = buffer;
2245        self
2246    }
2247
2248    /// Creates the named pipe identified by `addr` for use as a server.
2249    ///
2250    /// This uses the [`CreateNamedPipe`] function.
2251    ///
2252    /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2253    ///
2254    /// # Errors
2255    ///
2256    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2257    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2258    ///
2259    /// [Tokio Runtime]: crate::runtime::Runtime
2260    /// [enabled I/O]: crate::runtime::Builder::enable_io
2261    ///
2262    /// # Examples
2263    ///
2264    /// ```
2265    /// use tokio::net::windows::named_pipe::ServerOptions;
2266    ///
2267    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
2268    ///
2269    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2270    /// let server = ServerOptions::new().create(PIPE_NAME)?;
2271    /// # Ok(()) }
2272    /// ```
2273    pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
2274        // Safety: We're calling create_with_security_attributes_raw w/ a null
2275        // pointer which disables it.
2276        unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
2277    }
2278
2279    /// Creates the named pipe identified by `addr` for use as a server.
2280    ///
2281    /// This is the same as [`create`] except that it supports providing the raw
2282    /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2283    /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2284    ///
2285    /// # Errors
2286    ///
2287    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2288    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2289    ///
2290    /// [Tokio Runtime]: crate::runtime::Runtime
2291    /// [enabled I/O]: crate::runtime::Builder::enable_io
2292    ///
2293    /// # Safety
2294    ///
2295    /// The `attrs` argument must either be null or point at a valid instance of
2296    /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2297    /// behavior is identical to calling the [`create`] method.
2298    ///
2299    /// [`create`]: ServerOptions::create
2300    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2301    /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
2302    pub unsafe fn create_with_security_attributes_raw(
2303        &self,
2304        addr: impl AsRef<OsStr>,
2305        attrs: *mut c_void,
2306    ) -> io::Result<NamedPipeServer> {
2307        let addr = encode_addr(addr);
2308
2309        let pipe_mode = {
2310            let mut mode = if matches!(self.pipe_mode, PipeMode::Message) {
2311                windows_sys::PIPE_TYPE_MESSAGE | windows_sys::PIPE_READMODE_MESSAGE
2312            } else {
2313                windows_sys::PIPE_TYPE_BYTE | windows_sys::PIPE_READMODE_BYTE
2314            };
2315            if self.reject_remote_clients {
2316                mode |= windows_sys::PIPE_REJECT_REMOTE_CLIENTS;
2317            } else {
2318                mode |= windows_sys::PIPE_ACCEPT_REMOTE_CLIENTS;
2319            }
2320            mode
2321        };
2322        let open_mode = {
2323            let mut mode = windows_sys::FILE_FLAG_OVERLAPPED;
2324            if self.access_inbound {
2325                mode |= windows_sys::PIPE_ACCESS_INBOUND;
2326            }
2327            if self.access_outbound {
2328                mode |= windows_sys::PIPE_ACCESS_OUTBOUND;
2329            }
2330            if self.first_pipe_instance {
2331                mode |= windows_sys::FILE_FLAG_FIRST_PIPE_INSTANCE;
2332            }
2333            if self.write_dac {
2334                mode |= windows_sys::WRITE_DAC;
2335            }
2336            if self.write_owner {
2337                mode |= windows_sys::WRITE_OWNER;
2338            }
2339            if self.access_system_security {
2340                mode |= windows_sys::ACCESS_SYSTEM_SECURITY;
2341            }
2342            mode
2343        };
2344
2345        let h = windows_sys::CreateNamedPipeW(
2346            addr.as_ptr(),
2347            open_mode,
2348            pipe_mode,
2349            self.max_instances,
2350            self.out_buffer_size,
2351            self.in_buffer_size,
2352            self.default_timeout,
2353            attrs as *mut _,
2354        );
2355
2356        if h == windows_sys::INVALID_HANDLE_VALUE {
2357            return Err(io::Error::last_os_error());
2358        }
2359
2360        NamedPipeServer::from_raw_handle(h as _)
2361    }
2362}
2363
2364/// A builder suitable for building and interacting with named pipes from the
2365/// client side.
2366///
2367/// See [`ClientOptions::open`].
2368#[derive(Debug, Clone)]
2369pub struct ClientOptions {
2370    generic_read: bool,
2371    generic_write: bool,
2372    security_qos_flags: u32,
2373    pipe_mode: PipeMode,
2374}
2375
2376impl ClientOptions {
2377    /// Creates a new named pipe builder with the default settings.
2378    ///
2379    /// ```
2380    /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2381    ///
2382    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
2383    ///
2384    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2385    /// // Server must be created in order for the client creation to succeed.
2386    /// let server = ServerOptions::new().create(PIPE_NAME)?;
2387    /// let client = ClientOptions::new().open(PIPE_NAME)?;
2388    /// # Ok(()) }
2389    /// ```
2390    pub fn new() -> Self {
2391        Self {
2392            generic_read: true,
2393            generic_write: true,
2394            security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
2395                | windows_sys::SECURITY_SQOS_PRESENT,
2396            pipe_mode: PipeMode::Byte,
2397        }
2398    }
2399
2400    /// If the client supports reading data. This is enabled by default.
2401    ///
2402    /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
2403    ///
2404    /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2405    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2406    pub fn read(&mut self, allowed: bool) -> &mut Self {
2407        self.generic_read = allowed;
2408        self
2409    }
2410
2411    /// If the created pipe supports writing data. This is enabled by default.
2412    ///
2413    /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
2414    ///
2415    /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2416    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2417    pub fn write(&mut self, allowed: bool) -> &mut Self {
2418        self.generic_write = allowed;
2419        self
2420    }
2421
2422    /// Sets qos flags which are combined with other flags and attributes in the
2423    /// call to [`CreateFile`].
2424    ///
2425    /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2426    /// calling this function would override that value completely with the
2427    /// argument specified.
2428    ///
2429    /// When `security_qos_flags` is not set, a malicious program can gain the
2430    /// elevated privileges of a privileged Rust process when it allows opening
2431    /// user-specified paths, by tricking it into opening a named pipe. So
2432    /// arguably `security_qos_flags` should also be set when opening arbitrary
2433    /// paths. However the bits can then conflict with other flags, specifically
2434    /// `FILE_FLAG_OPEN_NO_RECALL`.
2435    ///
2436    /// For information about possible values, see [Impersonation Levels] on the
2437    /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2438    /// automatically when using this method.
2439    ///
2440    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2441    /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
2442    /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
2443    pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2444        // See: https://github.com/rust-lang/rust/pull/58216
2445        self.security_qos_flags = flags | windows_sys::SECURITY_SQOS_PRESENT;
2446        self
2447    }
2448
2449    /// The pipe mode.
2450    ///
2451    /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
2452    /// documentation of what each mode means.
2453    pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
2454        self.pipe_mode = pipe_mode;
2455        self
2456    }
2457
2458    /// Opens the named pipe identified by `addr`.
2459    ///
2460    /// This opens the client using [`CreateFile`] with the
2461    /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2462    ///
2463    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2464    ///
2465    /// # Errors
2466    ///
2467    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2468    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2469    ///
2470    /// There are a few errors you need to take into account when creating a
2471    /// named pipe on the client side:
2472    ///
2473    /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2474    ///   does not exist. Presumably the server is not up.
2475    /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2476    ///   but the server is not currently waiting for a connection. Please see the
2477    ///   examples for how to check for this error.
2478    ///
2479    /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
2480    /// [enabled I/O]: crate::runtime::Builder::enable_io
2481    /// [Tokio Runtime]: crate::runtime::Runtime
2482    ///
2483    /// A connect loop that waits until a pipe becomes available looks like
2484    /// this:
2485    ///
2486    /// ```no_run
2487    /// use std::time::Duration;
2488    /// use tokio::net::windows::named_pipe::ClientOptions;
2489    /// use tokio::time;
2490    /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2491    ///
2492    /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2493    ///
2494    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2495    /// let client = loop {
2496    ///     match ClientOptions::new().open(PIPE_NAME) {
2497    ///         Ok(client) => break client,
2498    ///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
2499    ///         Err(e) => return Err(e),
2500    ///     }
2501    ///
2502    ///     time::sleep(Duration::from_millis(50)).await;
2503    /// };
2504    ///
2505    /// // use the connected client.
2506    /// # Ok(()) }
2507    /// ```
2508    pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2509        // Safety: We're calling open_with_security_attributes_raw w/ a null
2510        // pointer which disables it.
2511        unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2512    }
2513
2514    /// Opens the named pipe identified by `addr`.
2515    ///
2516    /// This is the same as [`open`] except that it supports providing the raw
2517    /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2518    /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2519    ///
2520    /// # Safety
2521    ///
2522    /// The `attrs` argument must either be null or point at a valid instance of
2523    /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2524    /// behavior is identical to calling the [`open`] method.
2525    ///
2526    /// [`open`]: ClientOptions::open
2527    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2528    /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
2529    pub unsafe fn open_with_security_attributes_raw(
2530        &self,
2531        addr: impl AsRef<OsStr>,
2532        attrs: *mut c_void,
2533    ) -> io::Result<NamedPipeClient> {
2534        let addr = encode_addr(addr);
2535
2536        let desired_access = {
2537            let mut access = 0;
2538            if self.generic_read {
2539                access |= windows_sys::GENERIC_READ;
2540            }
2541            if self.generic_write {
2542                access |= windows_sys::GENERIC_WRITE;
2543            }
2544            access
2545        };
2546
2547        // NB: We could use a platform specialized `OpenOptions` here, but since
2548        // we have access to windows_sys it ultimately doesn't hurt to use
2549        // `CreateFile` explicitly since it allows the use of our already
2550        // well-structured wide `addr` to pass into CreateFileW.
2551        let h = windows_sys::CreateFileW(
2552            addr.as_ptr(),
2553            desired_access,
2554            0,
2555            attrs as *mut _,
2556            windows_sys::OPEN_EXISTING,
2557            self.get_flags(),
2558            0,
2559        );
2560
2561        if h == windows_sys::INVALID_HANDLE_VALUE {
2562            return Err(io::Error::last_os_error());
2563        }
2564
2565        if matches!(self.pipe_mode, PipeMode::Message) {
2566            let mode = windows_sys::PIPE_READMODE_MESSAGE;
2567            let result =
2568                windows_sys::SetNamedPipeHandleState(h, &mode, ptr::null_mut(), ptr::null_mut());
2569
2570            if result == 0 {
2571                return Err(io::Error::last_os_error());
2572            }
2573        }
2574
2575        NamedPipeClient::from_raw_handle(h as _)
2576    }
2577
2578    fn get_flags(&self) -> u32 {
2579        self.security_qos_flags | windows_sys::FILE_FLAG_OVERLAPPED
2580    }
2581}
2582
2583/// The pipe mode of a named pipe.
2584///
2585/// Set through [`ServerOptions::pipe_mode`].
2586#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2587#[non_exhaustive]
2588pub enum PipeMode {
2589    /// Data is written to the pipe as a stream of bytes. The pipe does not
2590    /// distinguish bytes written during different write operations.
2591    ///
2592    /// Corresponds to [`PIPE_TYPE_BYTE`].
2593    ///
2594    /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
2595    Byte,
2596    /// Data is written to the pipe as a stream of messages. The pipe treats the
2597    /// bytes written during each write operation as a message unit. Any reading
2598    /// on a named pipe returns [`ERROR_MORE_DATA`] when a message is not read
2599    /// completely.
2600    ///
2601    /// Corresponds to [`PIPE_TYPE_MESSAGE`].
2602    ///
2603    /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
2604    /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
2605    Message,
2606}
2607
2608/// Indicates the end of a named pipe.
2609#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2610#[non_exhaustive]
2611pub enum PipeEnd {
2612    /// The named pipe refers to the client end of a named pipe instance.
2613    ///
2614    /// Corresponds to [`PIPE_CLIENT_END`].
2615    ///
2616    /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
2617    Client,
2618    /// The named pipe refers to the server end of a named pipe instance.
2619    ///
2620    /// Corresponds to [`PIPE_SERVER_END`].
2621    ///
2622    /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
2623    Server,
2624}
2625
2626/// Information about a named pipe.
2627///
2628/// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
2629#[derive(Debug)]
2630#[non_exhaustive]
2631pub struct PipeInfo {
2632    /// Indicates the mode of a named pipe.
2633    pub mode: PipeMode,
2634    /// Indicates the end of a named pipe.
2635    pub end: PipeEnd,
2636    /// The maximum number of instances that can be created for this pipe.
2637    pub max_instances: u32,
2638    /// The number of bytes to reserve for the output buffer.
2639    pub out_buffer_size: u32,
2640    /// The number of bytes to reserve for the input buffer.
2641    pub in_buffer_size: u32,
2642}
2643
2644/// Encodes an address so that it is a null-terminated wide string.
2645fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2646    let len = addr.as_ref().encode_wide().count();
2647    let mut vec = Vec::with_capacity(len + 1);
2648    vec.extend(addr.as_ref().encode_wide());
2649    vec.push(0);
2650    vec.into_boxed_slice()
2651}
2652
2653/// Internal function to get the info out of a raw named pipe.
2654unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2655    let mut flags = 0;
2656    let mut out_buffer_size = 0;
2657    let mut in_buffer_size = 0;
2658    let mut max_instances = 0;
2659
2660    let result = windows_sys::GetNamedPipeInfo(
2661        handle as _,
2662        &mut flags,
2663        &mut out_buffer_size,
2664        &mut in_buffer_size,
2665        &mut max_instances,
2666    );
2667
2668    if result == 0 {
2669        return Err(io::Error::last_os_error());
2670    }
2671
2672    let mut end = PipeEnd::Client;
2673    let mut mode = PipeMode::Byte;
2674
2675    if flags & windows_sys::PIPE_SERVER_END != 0 {
2676        end = PipeEnd::Server;
2677    }
2678
2679    if flags & windows_sys::PIPE_TYPE_MESSAGE != 0 {
2680        mode = PipeMode::Message;
2681    }
2682
2683    Ok(PipeInfo {
2684        end,
2685        mode,
2686        out_buffer_size,
2687        in_buffer_size,
2688        max_instances,
2689    })
2690}