madsim_real_tokio/net/windows/named_pipe.rs
1//! Tokio support for [Windows named pipes].
2//!
3//! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4
5use std::ffi::c_void;
6use std::ffi::OsStr;
7use std::io::{self, Read, Write};
8use std::pin::Pin;
9use std::ptr;
10use std::task::{Context, Poll};
11
12use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
13use crate::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, RawHandle};
14
15cfg_io_util! {
16 use bytes::BufMut;
17}
18
19// Hide imports which are not used when generating documentation.
// Real imports, compiled whenever the `docsrs` cfg is not set.
#[cfg(not(docsrs))]
mod doc {
    // Windows-specific `OsStr` extension trait.
    pub(super) use crate::os::windows::ffi::OsStrExt;
    // Glob re-export of several `windows_sys` Win32 modules under a single
    // local `windows_sys` path.
    pub(super) mod windows_sys {
        pub(crate) use windows_sys::{
            Win32::Foundation::*, Win32::Storage::FileSystem::*, Win32::System::Pipes::*,
            Win32::System::SystemServices::*,
        };
    }
    // mio's Windows module, made available under the alias `mio_windows`.
    pub(super) use mio::windows as mio_windows;
}
31
32// NB: none of these shows up in public API, so don't document them.
#[cfg(docsrs)]
mod doc {
    // Stand-in for the `mio_windows` alias when the `docsrs` cfg is set.
    pub(super) mod mio_windows {
        // Placeholder so that `mio_windows::NamedPipe` still resolves to a type.
        pub type NamedPipe = crate::doc::NotDefinedHere;
    }
}
39
40use self::doc::*;
41
42/// A [Windows named pipe] server.
43///
44/// Accepting client connections involves creating a server with
45/// [`ServerOptions::create`] and waiting for clients to connect using
46/// [`NamedPipeServer::connect`].
47///
48/// To avoid having clients sporadically fail with
49/// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
50/// ensure that at least one server instance is available at all times. This
51/// means that the typical listen loop for a server is a bit involved, because
52/// we have to ensure that we never drop a server accidentally while a client
53/// might connect.
54///
55/// So a correctly implemented server looks like this:
56///
57/// ```no_run
58/// use std::io;
59/// use tokio::net::windows::named_pipe::ServerOptions;
60///
61/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
62///
63/// # #[tokio::main] async fn main() -> std::io::Result<()> {
64/// // The first server needs to be constructed early so that clients can
65/// // be correctly connected. Otherwise calling .wait will cause the client to
66/// // error.
67/// //
68/// // Here we also make use of `first_pipe_instance`, which will ensure that
69/// // there are no other servers up and running already.
70/// let mut server = ServerOptions::new()
71/// .first_pipe_instance(true)
72/// .create(PIPE_NAME)?;
73///
74/// // Spawn the server loop.
75/// let server = tokio::spawn(async move {
76/// loop {
77/// // Wait for a client to connect.
78/// let connected = server.connect().await?;
79///
80/// // Construct the next server to be connected before sending the one
81/// // we already have off onto a task. This ensures that the server
82/// // isn't closed (after it's done in the task) before a new one is
83/// // available. Otherwise the client might error with
84/// // `io::ErrorKind::NotFound`.
85/// server = ServerOptions::new().create(PIPE_NAME)?;
86///
87/// let client = tokio::spawn(async move {
88/// /* use the connected client */
89/// # Ok::<_, std::io::Error>(())
90/// });
91/// # if true { break } // needed for type inference to work
92/// }
93///
94/// Ok::<_, io::Error>(())
95/// });
96///
97/// /* do something else not server related here */
98/// # Ok(()) }
99/// ```
100///
101/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
#[derive(Debug)]
pub struct NamedPipeServer {
    // The mio named pipe wrapped in `PollEvented`; the methods on this type go
    // through `io.registration()` to wait for and track readiness.
    io: PollEvented<mio_windows::NamedPipe>,
}
106
107impl NamedPipeServer {
108 /// Constructs a new named pipe server from the specified raw handle.
109 ///
110 /// This function will consume ownership of the handle given, passing
111 /// responsibility for closing the handle to the returned object.
112 ///
113 /// This function is also unsafe as the primitives currently returned have
114 /// the contract that they are the sole owner of the file descriptor they
115 /// are wrapping. Usage of this function could accidentally allow violating
116 /// this contract which can cause memory unsafety in code that relies on it
117 /// being true.
118 ///
119 /// # Errors
120 ///
121 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
122 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
123 ///
124 /// [Tokio Runtime]: crate::runtime::Runtime
125 /// [enabled I/O]: crate::runtime::Builder::enable_io
126 pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
127 let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
128
129 Ok(Self {
130 io: PollEvented::new(named_pipe)?,
131 })
132 }
133
134 /// Retrieves information about the named pipe the server is associated
135 /// with.
136 ///
137 /// ```no_run
138 /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
139 ///
140 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
141 ///
142 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
143 /// let server = ServerOptions::new()
144 /// .pipe_mode(PipeMode::Message)
145 /// .max_instances(5)
146 /// .create(PIPE_NAME)?;
147 ///
148 /// let server_info = server.info()?;
149 ///
150 /// assert_eq!(server_info.end, PipeEnd::Server);
151 /// assert_eq!(server_info.mode, PipeMode::Message);
152 /// assert_eq!(server_info.max_instances, 5);
153 /// # Ok(()) }
154 /// ```
155 pub fn info(&self) -> io::Result<PipeInfo> {
156 // Safety: we're ensuring the lifetime of the named pipe.
157 unsafe { named_pipe_info(self.io.as_raw_handle()) }
158 }
159
160 /// Enables a named pipe server process to wait for a client process to
161 /// connect to an instance of a named pipe. A client process connects by
162 /// creating a named pipe with the same name.
163 ///
164 /// This corresponds to the [`ConnectNamedPipe`] system call.
165 ///
166 /// # Cancel safety
167 ///
168 /// This method is cancellation safe in the sense that if it is used as the
169 /// event in a [`select!`](crate::select) statement and some other branch
170 /// completes first, then no connection events have been lost.
171 ///
172 /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
173 ///
174 /// # Example
175 ///
176 /// ```no_run
177 /// use tokio::net::windows::named_pipe::ServerOptions;
178 ///
179 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
180 ///
181 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
182 /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
183 ///
184 /// // Wait for a client to connect.
185 /// pipe.connect().await?;
186 ///
187 /// // Use the connected client...
188 /// # Ok(()) }
189 /// ```
190 pub async fn connect(&self) -> io::Result<()> {
191 match self.io.connect() {
192 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
193 self.io
194 .registration()
195 .async_io(Interest::WRITABLE, || self.io.connect())
196 .await
197 }
198 x => x,
199 }
200 }
201
202 /// Disconnects the server end of a named pipe instance from a client
203 /// process.
204 ///
205 /// ```
206 /// use tokio::io::AsyncWriteExt;
207 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
208 /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
209 ///
210 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
211 ///
212 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
213 /// let server = ServerOptions::new()
214 /// .create(PIPE_NAME)?;
215 ///
216 /// let mut client = ClientOptions::new()
217 /// .open(PIPE_NAME)?;
218 ///
219 /// // Wait for a client to become connected.
220 /// server.connect().await?;
221 ///
222 /// // Forcibly disconnect the client.
223 /// server.disconnect()?;
224 ///
225 /// // Write fails with an OS-specific error after client has been
226 /// // disconnected.
227 /// let e = client.write(b"ping").await.unwrap_err();
228 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_NOT_CONNECTED as i32));
229 /// # Ok(()) }
230 /// ```
231 pub fn disconnect(&self) -> io::Result<()> {
232 self.io.disconnect()
233 }
234
235 /// Waits for any of the requested ready states.
236 ///
237 /// This function is usually paired with `try_read()` or `try_write()`. It
238 /// can be used to concurrently read / write to the same pipe on a single
239 /// task without splitting the pipe.
240 ///
241 /// The function may complete without the pipe being ready. This is a
242 /// false-positive and attempting an operation will return with
243 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
244 /// [`Ready`] set, so you should always check the returned value and possibly
245 /// wait again if the requested states are not set.
246 ///
247 /// # Examples
248 ///
249 /// Concurrently read and write to the pipe on the same task without
250 /// splitting.
251 ///
252 /// ```no_run
253 /// use tokio::io::Interest;
254 /// use tokio::net::windows::named_pipe;
255 /// use std::error::Error;
256 /// use std::io;
257 ///
258 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
259 ///
260 /// #[tokio::main]
261 /// async fn main() -> Result<(), Box<dyn Error>> {
262 /// let server = named_pipe::ServerOptions::new()
263 /// .create(PIPE_NAME)?;
264 ///
265 /// loop {
266 /// let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
267 ///
268 /// if ready.is_readable() {
269 /// let mut data = vec![0; 1024];
270 /// // Try to read data, this may still fail with `WouldBlock`
271 /// // if the readiness event is a false positive.
272 /// match server.try_read(&mut data) {
273 /// Ok(n) => {
274 /// println!("read {} bytes", n);
275 /// }
276 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
277 /// continue;
278 /// }
279 /// Err(e) => {
280 /// return Err(e.into());
281 /// }
282 /// }
283 /// }
284 ///
285 /// if ready.is_writable() {
286 /// // Try to write data, this may still fail with `WouldBlock`
287 /// // if the readiness event is a false positive.
288 /// match server.try_write(b"hello world") {
289 /// Ok(n) => {
290 /// println!("write {} bytes", n);
291 /// }
292 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
293 /// continue;
294 /// }
295 /// Err(e) => {
296 /// return Err(e.into());
297 /// }
298 /// }
299 /// }
300 /// }
301 /// }
302 /// ```
303 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
304 let event = self.io.registration().readiness(interest).await?;
305 Ok(event.ready)
306 }
307
308 /// Waits for the pipe to become readable.
309 ///
310 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
311 /// paired with `try_read()`.
312 ///
313 /// # Examples
314 ///
315 /// ```no_run
316 /// use tokio::net::windows::named_pipe;
317 /// use std::error::Error;
318 /// use std::io;
319 ///
320 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
321 ///
322 /// #[tokio::main]
323 /// async fn main() -> Result<(), Box<dyn Error>> {
324 /// let server = named_pipe::ServerOptions::new()
325 /// .create(PIPE_NAME)?;
326 ///
327 /// let mut msg = vec![0; 1024];
328 ///
329 /// loop {
330 /// // Wait for the pipe to be readable
331 /// server.readable().await?;
332 ///
333 /// // Try to read data, this may still fail with `WouldBlock`
334 /// // if the readiness event is a false positive.
335 /// match server.try_read(&mut msg) {
336 /// Ok(n) => {
337 /// msg.truncate(n);
338 /// break;
339 /// }
340 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
341 /// continue;
342 /// }
343 /// Err(e) => {
344 /// return Err(e.into());
345 /// }
346 /// }
347 /// }
348 ///
349 /// println!("GOT = {:?}", msg);
350 /// Ok(())
351 /// }
352 /// ```
353 pub async fn readable(&self) -> io::Result<()> {
354 self.ready(Interest::READABLE).await?;
355 Ok(())
356 }
357
358 /// Polls for read readiness.
359 ///
360 /// If the pipe is not currently ready for reading, this method will
361 /// store a clone of the `Waker` from the provided `Context`. When the pipe
362 /// becomes ready for reading, `Waker::wake` will be called on the waker.
363 ///
364 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
365 /// the `Waker` from the `Context` passed to the most recent call is
366 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
367 /// second, independent waker.)
368 ///
369 /// This function is intended for cases where creating and pinning a future
370 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
371 /// preferred, as this supports polling from multiple tasks at once.
372 ///
373 /// # Return value
374 ///
375 /// The function returns:
376 ///
377 /// * `Poll::Pending` if the pipe is not ready for reading.
378 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
379 /// * `Poll::Ready(Err(e))` if an error is encountered.
380 ///
381 /// # Errors
382 ///
383 /// This function may encounter any standard I/O error except `WouldBlock`.
384 ///
385 /// [`readable`]: method@Self::readable
386 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
387 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
388 }
389
390 /// Tries to read data from the pipe into the provided buffer, returning how
391 /// many bytes were read.
392 ///
393 /// Receives any pending data from the pipe but does not wait for new data
394 /// to arrive. On success, returns the number of bytes read. Because
395 /// `try_read()` is non-blocking, the buffer does not have to be stored by
396 /// the async task and can exist entirely on the stack.
397 ///
398 /// Usually, [`readable()`] or [`ready()`] is used with this function.
399 ///
400 /// [`readable()`]: NamedPipeServer::readable()
401 /// [`ready()`]: NamedPipeServer::ready()
402 ///
403 /// # Return
404 ///
405 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
406 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
407 ///
408 /// 1. The pipe's read half is closed and will no longer yield data.
409 /// 2. The specified buffer was 0 bytes in length.
410 ///
411 /// If the pipe is not ready to read data,
412 /// `Err(io::ErrorKind::WouldBlock)` is returned.
413 ///
414 /// # Examples
415 ///
416 /// ```no_run
417 /// use tokio::net::windows::named_pipe;
418 /// use std::error::Error;
419 /// use std::io;
420 ///
421 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
422 ///
423 /// #[tokio::main]
424 /// async fn main() -> Result<(), Box<dyn Error>> {
425 /// let server = named_pipe::ServerOptions::new()
426 /// .create(PIPE_NAME)?;
427 ///
428 /// loop {
429 /// // Wait for the pipe to be readable
430 /// server.readable().await?;
431 ///
432 /// // Creating the buffer **after** the `await` prevents it from
433 /// // being stored in the async task.
434 /// let mut buf = [0; 4096];
435 ///
436 /// // Try to read data, this may still fail with `WouldBlock`
437 /// // if the readiness event is a false positive.
438 /// match server.try_read(&mut buf) {
439 /// Ok(0) => break,
440 /// Ok(n) => {
441 /// println!("read {} bytes", n);
442 /// }
443 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
444 /// continue;
445 /// }
446 /// Err(e) => {
447 /// return Err(e.into());
448 /// }
449 /// }
450 /// }
451 ///
452 /// Ok(())
453 /// }
454 /// ```
455 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
456 self.io
457 .registration()
458 .try_io(Interest::READABLE, || (&*self.io).read(buf))
459 }
460
461 /// Tries to read data from the pipe into the provided buffers, returning
462 /// how many bytes were read.
463 ///
464 /// Data is copied to fill each buffer in order, with the final buffer
465 /// written to possibly being only partially filled. This method behaves
466 /// equivalently to a single call to [`try_read()`] with concatenated
467 /// buffers.
468 ///
469 /// Receives any pending data from the pipe but does not wait for new data
470 /// to arrive. On success, returns the number of bytes read. Because
471 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
472 /// stored by the async task and can exist entirely on the stack.
473 ///
474 /// Usually, [`readable()`] or [`ready()`] is used with this function.
475 ///
476 /// [`try_read()`]: NamedPipeServer::try_read()
477 /// [`readable()`]: NamedPipeServer::readable()
478 /// [`ready()`]: NamedPipeServer::ready()
479 ///
480 /// # Return
481 ///
482 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
483 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
484 /// and will no longer yield data. If the pipe is not ready to read data
485 /// `Err(io::ErrorKind::WouldBlock)` is returned.
486 ///
487 /// # Examples
488 ///
489 /// ```no_run
490 /// use tokio::net::windows::named_pipe;
491 /// use std::error::Error;
492 /// use std::io::{self, IoSliceMut};
493 ///
494 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
495 ///
496 /// #[tokio::main]
497 /// async fn main() -> Result<(), Box<dyn Error>> {
498 /// let server = named_pipe::ServerOptions::new()
499 /// .create(PIPE_NAME)?;
500 ///
501 /// loop {
502 /// // Wait for the pipe to be readable
503 /// server.readable().await?;
504 ///
505 /// // Creating the buffer **after** the `await` prevents it from
506 /// // being stored in the async task.
507 /// let mut buf_a = [0; 512];
508 /// let mut buf_b = [0; 1024];
509 /// let mut bufs = [
510 /// IoSliceMut::new(&mut buf_a),
511 /// IoSliceMut::new(&mut buf_b),
512 /// ];
513 ///
514 /// // Try to read data, this may still fail with `WouldBlock`
515 /// // if the readiness event is a false positive.
516 /// match server.try_read_vectored(&mut bufs) {
517 /// Ok(0) => break,
518 /// Ok(n) => {
519 /// println!("read {} bytes", n);
520 /// }
521 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
522 /// continue;
523 /// }
524 /// Err(e) => {
525 /// return Err(e.into());
526 /// }
527 /// }
528 /// }
529 ///
530 /// Ok(())
531 /// }
532 /// ```
533 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
534 self.io
535 .registration()
536 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
537 }
538
539 cfg_io_util! {
540 /// Tries to read data from the pipe into the provided buffer, advancing the
541 /// buffer's internal cursor, returning how many bytes were read.
542 ///
543 /// Receives any pending data from the pipe but does not wait for new data
544 /// to arrive. On success, returns the number of bytes read. Because
545 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
546 /// the async task and can exist entirely on the stack.
547 ///
548 /// Usually, [`readable()`] or [`ready()`] is used with this function.
549 ///
550 /// [`readable()`]: NamedPipeServer::readable()
551 /// [`ready()`]: NamedPipeServer::ready()
552 ///
553 /// # Return
554 ///
555 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
556 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
557 /// and will no longer yield data. If the pipe is not ready to read data
558 /// `Err(io::ErrorKind::WouldBlock)` is returned.
559 ///
560 /// # Examples
561 ///
562 /// ```no_run
563 /// use tokio::net::windows::named_pipe;
564 /// use std::error::Error;
565 /// use std::io;
566 ///
567 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
568 ///
569 /// #[tokio::main]
570 /// async fn main() -> Result<(), Box<dyn Error>> {
571 /// let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
572 ///
573 /// loop {
574 /// // Wait for the pipe to be readable
575 /// server.readable().await?;
576 ///
577 /// let mut buf = Vec::with_capacity(4096);
578 ///
579 /// // Try to read data, this may still fail with `WouldBlock`
580 /// // if the readiness event is a false positive.
581 /// match server.try_read_buf(&mut buf) {
582 /// Ok(0) => break,
583 /// Ok(n) => {
584 /// println!("read {} bytes", n);
585 /// }
586 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
587 /// continue;
588 /// }
589 /// Err(e) => {
590 /// return Err(e.into());
591 /// }
592 /// }
593 /// }
594 ///
595 /// Ok(())
596 /// }
597 /// ```
598 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
599 self.io.registration().try_io(Interest::READABLE, || {
600 use std::io::Read;
601
602 let dst = buf.chunk_mut();
603 let dst =
604 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
605
606 // Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
607 // buffer.
608 let n = (&*self.io).read(dst)?;
609
610 unsafe {
611 buf.advance_mut(n);
612 }
613
614 Ok(n)
615 })
616 }
617 }
618
619 /// Waits for the pipe to become writable.
620 ///
621 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
622 /// paired with `try_write()`.
623 ///
624 /// # Examples
625 ///
626 /// ```no_run
627 /// use tokio::net::windows::named_pipe;
628 /// use std::error::Error;
629 /// use std::io;
630 ///
631 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
632 ///
633 /// #[tokio::main]
634 /// async fn main() -> Result<(), Box<dyn Error>> {
635 /// let server = named_pipe::ServerOptions::new()
636 /// .create(PIPE_NAME)?;
637 ///
638 /// loop {
639 /// // Wait for the pipe to be writable
640 /// server.writable().await?;
641 ///
642 /// // Try to write data, this may still fail with `WouldBlock`
643 /// // if the readiness event is a false positive.
644 /// match server.try_write(b"hello world") {
645 /// Ok(n) => {
646 /// break;
647 /// }
648 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
649 /// continue;
650 /// }
651 /// Err(e) => {
652 /// return Err(e.into());
653 /// }
654 /// }
655 /// }
656 ///
657 /// Ok(())
658 /// }
659 /// ```
660 pub async fn writable(&self) -> io::Result<()> {
661 self.ready(Interest::WRITABLE).await?;
662 Ok(())
663 }
664
665 /// Polls for write readiness.
666 ///
667 /// If the pipe is not currently ready for writing, this method will
668 /// store a clone of the `Waker` from the provided `Context`. When the pipe
669 /// becomes ready for writing, `Waker::wake` will be called on the waker.
670 ///
671 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
672 /// the `Waker` from the `Context` passed to the most recent call is
673 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
674 /// second, independent waker.)
675 ///
676 /// This function is intended for cases where creating and pinning a future
677 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
678 /// preferred, as this supports polling from multiple tasks at once.
679 ///
680 /// # Return value
681 ///
682 /// The function returns:
683 ///
684 /// * `Poll::Pending` if the pipe is not ready for writing.
685 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
686 /// * `Poll::Ready(Err(e))` if an error is encountered.
687 ///
688 /// # Errors
689 ///
690 /// This function may encounter any standard I/O error except `WouldBlock`.
691 ///
692 /// [`writable`]: method@Self::writable
693 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
694 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
695 }
696
697 /// Tries to write a buffer to the pipe, returning how many bytes were
698 /// written.
699 ///
700 /// The function will attempt to write the entire contents of `buf`, but
701 /// only part of the buffer may be written.
702 ///
703 /// This function is usually paired with `writable()`.
704 ///
705 /// # Return
706 ///
707 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
708 /// number of bytes written. If the pipe is not ready to write data,
709 /// `Err(io::ErrorKind::WouldBlock)` is returned.
710 ///
711 /// # Examples
712 ///
713 /// ```no_run
714 /// use tokio::net::windows::named_pipe;
715 /// use std::error::Error;
716 /// use std::io;
717 ///
718 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
719 ///
720 /// #[tokio::main]
721 /// async fn main() -> Result<(), Box<dyn Error>> {
722 /// let server = named_pipe::ServerOptions::new()
723 /// .create(PIPE_NAME)?;
724 ///
725 /// loop {
726 /// // Wait for the pipe to be writable
727 /// server.writable().await?;
728 ///
729 /// // Try to write data, this may still fail with `WouldBlock`
730 /// // if the readiness event is a false positive.
731 /// match server.try_write(b"hello world") {
732 /// Ok(n) => {
733 /// break;
734 /// }
735 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
736 /// continue;
737 /// }
738 /// Err(e) => {
739 /// return Err(e.into());
740 /// }
741 /// }
742 /// }
743 ///
744 /// Ok(())
745 /// }
746 /// ```
747 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
748 self.io
749 .registration()
750 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
751 }
752
753 /// Tries to write several buffers to the pipe, returning how many bytes
754 /// were written.
755 ///
756 /// Data is written from each buffer in order, with the final buffer read
757 /// from possibly being only partially consumed. This method behaves
758 /// equivalently to a single call to [`try_write()`] with concatenated
759 /// buffers.
760 ///
761 /// This function is usually paired with `writable()`.
762 ///
763 /// [`try_write()`]: NamedPipeServer::try_write()
764 ///
765 /// # Return
766 ///
767 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
768 /// number of bytes written. If the pipe is not ready to write data,
769 /// `Err(io::ErrorKind::WouldBlock)` is returned.
770 ///
771 /// # Examples
772 ///
773 /// ```no_run
774 /// use tokio::net::windows::named_pipe;
775 /// use std::error::Error;
776 /// use std::io;
777 ///
778 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
779 ///
780 /// #[tokio::main]
781 /// async fn main() -> Result<(), Box<dyn Error>> {
782 /// let server = named_pipe::ServerOptions::new()
783 /// .create(PIPE_NAME)?;
784 ///
785 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
786 ///
787 /// loop {
788 /// // Wait for the pipe to be writable
789 /// server.writable().await?;
790 ///
791 /// // Try to write data, this may still fail with `WouldBlock`
792 /// // if the readiness event is a false positive.
793 /// match server.try_write_vectored(&bufs) {
794 /// Ok(n) => {
795 /// break;
796 /// }
797 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
798 /// continue;
799 /// }
800 /// Err(e) => {
801 /// return Err(e.into());
802 /// }
803 /// }
804 /// }
805 ///
806 /// Ok(())
807 /// }
808 /// ```
809 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
810 self.io
811 .registration()
812 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
813 }
814
815 /// Tries to read or write from the pipe using a user-provided IO operation.
816 ///
817 /// If the pipe is ready, the provided closure is called. The closure
818 /// should attempt to perform IO operation from the pipe by manually
819 /// calling the appropriate syscall. If the operation fails because the
820 /// pipe is not actually ready, then the closure should return a
821 /// `WouldBlock` error and the readiness flag is cleared. The return value
822 /// of the closure is then returned by `try_io`.
823 ///
824 /// If the pipe is not ready, then the closure is not called
825 /// and a `WouldBlock` error is returned.
826 ///
827 /// The closure should only return a `WouldBlock` error if it has performed
828 /// an IO operation on the pipe that failed due to the pipe not being
829 /// ready. Returning a `WouldBlock` error in any other situation will
830 /// incorrectly clear the readiness flag, which can cause the pipe to
831 /// behave incorrectly.
832 ///
833 /// The closure should not perform the IO operation using any of the
834 /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
835 /// the readiness flag and can cause the pipe to behave incorrectly.
836 ///
837 /// This method is not intended to be used with combined interests.
838 /// The closure should perform only one type of IO operation, so it should not
839 /// require more than one ready state. This method may panic or sleep forever
840 /// if it is called with a combined interest.
841 ///
842 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
843 ///
844 /// [`readable()`]: NamedPipeServer::readable()
845 /// [`writable()`]: NamedPipeServer::writable()
846 /// [`ready()`]: NamedPipeServer::ready()
847 pub fn try_io<R>(
848 &self,
849 interest: Interest,
850 f: impl FnOnce() -> io::Result<R>,
851 ) -> io::Result<R> {
852 self.io.registration().try_io(interest, f)
853 }
854
855 /// Reads or writes from the pipe using a user-provided IO operation.
856 ///
857 /// The readiness of the pipe is awaited and when the pipe is ready,
858 /// the provided closure is called. The closure should attempt to perform
859 /// IO operation on the pipe by manually calling the appropriate syscall.
860 /// If the operation fails because the pipe is not actually ready,
861 /// then the closure should return a `WouldBlock` error. In such case the
862 /// readiness flag is cleared and the pipe readiness is awaited again.
863 /// This loop is repeated until the closure returns an `Ok` or an error
864 /// other than `WouldBlock`.
865 ///
866 /// The closure should only return a `WouldBlock` error if it has performed
867 /// an IO operation on the pipe that failed due to the pipe not being
868 /// ready. Returning a `WouldBlock` error in any other situation will
869 /// incorrectly clear the readiness flag, which can cause the pipe to
870 /// behave incorrectly.
871 ///
872 /// The closure should not perform the IO operation using any of the methods
873 /// defined on the Tokio `NamedPipeServer` type, as this will mess with the
874 /// readiness flag and can cause the pipe to behave incorrectly.
875 ///
876 /// This method is not intended to be used with combined interests.
877 /// The closure should perform only one type of IO operation, so it should not
878 /// require more than one ready state. This method may panic or sleep forever
879 /// if it is called with a combined interest.
880 pub async fn async_io<R>(
881 &self,
882 interest: Interest,
883 f: impl FnMut() -> io::Result<R>,
884 ) -> io::Result<R> {
885 self.io.registration().async_io(interest, f).await
886 }
887}
888
889impl AsyncRead for NamedPipeServer {
890 fn poll_read(
891 self: Pin<&mut Self>,
892 cx: &mut Context<'_>,
893 buf: &mut ReadBuf<'_>,
894 ) -> Poll<io::Result<()>> {
895 unsafe { self.io.poll_read(cx, buf) }
896 }
897}
898
899impl AsyncWrite for NamedPipeServer {
900 fn poll_write(
901 self: Pin<&mut Self>,
902 cx: &mut Context<'_>,
903 buf: &[u8],
904 ) -> Poll<io::Result<usize>> {
905 self.io.poll_write(cx, buf)
906 }
907
908 fn poll_write_vectored(
909 self: Pin<&mut Self>,
910 cx: &mut Context<'_>,
911 bufs: &[io::IoSlice<'_>],
912 ) -> Poll<io::Result<usize>> {
913 self.io.poll_write_vectored(cx, bufs)
914 }
915
916 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
917 Poll::Ready(Ok(()))
918 }
919
920 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
921 self.poll_flush(cx)
922 }
923}
924
925impl AsRawHandle for NamedPipeServer {
926 fn as_raw_handle(&self) -> RawHandle {
927 self.io.as_raw_handle()
928 }
929}
930
931impl AsHandle for NamedPipeServer {
932 fn as_handle(&self) -> BorrowedHandle<'_> {
933 unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
934 }
935}
936
937/// A [Windows named pipe] client.
938///
939/// Constructed using [`ClientOptions::open`].
940///
941/// Connecting a client correctly involves a few steps. When connecting through
942/// [`ClientOptions::open`], it might error indicating one of two things:
943///
944/// * [`std::io::ErrorKind::NotFound`] - There is no server available.
945/// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
946/// for a while and try again.
947///
948/// So a correctly implemented client looks like this:
949///
950/// ```no_run
951/// use std::time::Duration;
952/// use tokio::net::windows::named_pipe::ClientOptions;
953/// use tokio::time;
954/// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
955///
956/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
957///
958/// # #[tokio::main] async fn main() -> std::io::Result<()> {
959/// let client = loop {
960/// match ClientOptions::new().open(PIPE_NAME) {
961/// Ok(client) => break client,
962/// Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
963/// Err(e) => return Err(e),
964/// }
965///
966/// time::sleep(Duration::from_millis(50)).await;
967/// };
968///
969/// /* use the connected client */
970/// # Ok(()) }
971/// ```
972///
973/// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
974/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
#[derive(Debug)]
pub struct NamedPipeClient {
    // The mio named pipe wrapped in `PollEvented`. All I/O and readiness
    // methods on the client operate on this field.
    io: PollEvented<mio_windows::NamedPipe>,
}
979
980impl NamedPipeClient {
981 /// Constructs a new named pipe client from the specified raw handle.
982 ///
983 /// This function will consume ownership of the handle given, passing
984 /// responsibility for closing the handle to the returned object.
985 ///
    /// This function is also unsafe as the primitives currently returned have
    /// the contract that they are the sole owner of the handle they are
    /// wrapping. Usage of this function could accidentally allow violating
    /// this contract which can cause memory unsafety in code that relies on it
    /// being true.
991 ///
992 /// # Errors
993 ///
994 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
995 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
996 ///
997 /// [Tokio Runtime]: crate::runtime::Runtime
998 /// [enabled I/O]: crate::runtime::Builder::enable_io
999 pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
1000 let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
1001
1002 Ok(Self {
1003 io: PollEvented::new(named_pipe)?,
1004 })
1005 }
1006
1007 /// Retrieves information about the named pipe the client is associated
1008 /// with.
1009 ///
1010 /// ```no_run
1011 /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
1012 ///
1013 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
1014 ///
1015 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1016 /// let client = ClientOptions::new()
1017 /// .open(PIPE_NAME)?;
1018 ///
1019 /// let client_info = client.info()?;
1020 ///
1021 /// assert_eq!(client_info.end, PipeEnd::Client);
1022 /// assert_eq!(client_info.mode, PipeMode::Message);
1023 /// assert_eq!(client_info.max_instances, 5);
1024 /// # Ok(()) }
1025 /// ```
1026 pub fn info(&self) -> io::Result<PipeInfo> {
1027 // Safety: we're ensuring the lifetime of the named pipe.
1028 unsafe { named_pipe_info(self.io.as_raw_handle()) }
1029 }
1030
1031 /// Waits for any of the requested ready states.
1032 ///
1033 /// This function is usually paired with `try_read()` or `try_write()`. It
1034 /// can be used to concurrently read / write to the same pipe on a single
1035 /// task without splitting the pipe.
1036 ///
1037 /// The function may complete without the pipe being ready. This is a
1038 /// false-positive and attempting an operation will return with
1039 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1040 /// [`Ready`] set, so you should always check the returned value and possibly
1041 /// wait again if the requested states are not set.
1042 ///
1043 /// # Examples
1044 ///
1045 /// Concurrently read and write to the pipe on the same task without
1046 /// splitting.
1047 ///
1048 /// ```no_run
1049 /// use tokio::io::Interest;
1050 /// use tokio::net::windows::named_pipe;
1051 /// use std::error::Error;
1052 /// use std::io;
1053 ///
1054 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
1055 ///
1056 /// #[tokio::main]
1057 /// async fn main() -> Result<(), Box<dyn Error>> {
1058 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1059 ///
1060 /// loop {
1061 /// let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
1062 ///
1063 /// if ready.is_readable() {
1064 /// let mut data = vec![0; 1024];
1065 /// // Try to read data, this may still fail with `WouldBlock`
1066 /// // if the readiness event is a false positive.
1067 /// match client.try_read(&mut data) {
1068 /// Ok(n) => {
1069 /// println!("read {} bytes", n);
1070 /// }
1071 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1072 /// continue;
1073 /// }
1074 /// Err(e) => {
1075 /// return Err(e.into());
1076 /// }
1077 /// }
1078 /// }
1079 ///
1080 /// if ready.is_writable() {
1081 /// // Try to write data, this may still fail with `WouldBlock`
1082 /// // if the readiness event is a false positive.
1083 /// match client.try_write(b"hello world") {
1084 /// Ok(n) => {
1085 /// println!("write {} bytes", n);
1086 /// }
1087 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1088 /// continue;
1089 /// }
1090 /// Err(e) => {
1091 /// return Err(e.into());
1092 /// }
1093 /// }
1094 /// }
1095 /// }
1096 /// }
1097 /// ```
1098 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1099 let event = self.io.registration().readiness(interest).await?;
1100 Ok(event.ready)
1101 }
1102
1103 /// Waits for the pipe to become readable.
1104 ///
1105 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1106 /// paired with `try_read()`.
1107 ///
1108 /// # Examples
1109 ///
1110 /// ```no_run
1111 /// use tokio::net::windows::named_pipe;
1112 /// use std::error::Error;
1113 /// use std::io;
1114 ///
1115 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1116 ///
1117 /// #[tokio::main]
1118 /// async fn main() -> Result<(), Box<dyn Error>> {
1119 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1120 ///
1121 /// let mut msg = vec![0; 1024];
1122 ///
1123 /// loop {
1124 /// // Wait for the pipe to be readable
1125 /// client.readable().await?;
1126 ///
1127 /// // Try to read data, this may still fail with `WouldBlock`
1128 /// // if the readiness event is a false positive.
1129 /// match client.try_read(&mut msg) {
1130 /// Ok(n) => {
1131 /// msg.truncate(n);
1132 /// break;
1133 /// }
1134 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1135 /// continue;
1136 /// }
1137 /// Err(e) => {
1138 /// return Err(e.into());
1139 /// }
1140 /// }
1141 /// }
1142 ///
1143 /// println!("GOT = {:?}", msg);
1144 /// Ok(())
1145 /// }
1146 /// ```
1147 pub async fn readable(&self) -> io::Result<()> {
1148 self.ready(Interest::READABLE).await?;
1149 Ok(())
1150 }
1151
1152 /// Polls for read readiness.
1153 ///
1154 /// If the pipe is not currently ready for reading, this method will
1155 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1156 /// becomes ready for reading, `Waker::wake` will be called on the waker.
1157 ///
1158 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1159 /// the `Waker` from the `Context` passed to the most recent call is
1160 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1161 /// second, independent waker.)
1162 ///
1163 /// This function is intended for cases where creating and pinning a future
1164 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1165 /// preferred, as this supports polling from multiple tasks at once.
1166 ///
1167 /// # Return value
1168 ///
1169 /// The function returns:
1170 ///
1171 /// * `Poll::Pending` if the pipe is not ready for reading.
1172 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1173 /// * `Poll::Ready(Err(e))` if an error is encountered.
1174 ///
1175 /// # Errors
1176 ///
1177 /// This function may encounter any standard I/O error except `WouldBlock`.
1178 ///
1179 /// [`readable`]: method@Self::readable
1180 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1181 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1182 }
1183
1184 /// Tries to read data from the pipe into the provided buffer, returning how
1185 /// many bytes were read.
1186 ///
1187 /// Receives any pending data from the pipe but does not wait for new data
1188 /// to arrive. On success, returns the number of bytes read. Because
1189 /// `try_read()` is non-blocking, the buffer does not have to be stored by
1190 /// the async task and can exist entirely on the stack.
1191 ///
1192 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1193 ///
1194 /// [`readable()`]: NamedPipeClient::readable()
1195 /// [`ready()`]: NamedPipeClient::ready()
1196 ///
1197 /// # Return
1198 ///
1199 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1200 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1201 ///
1202 /// 1. The pipe's read half is closed and will no longer yield data.
1203 /// 2. The specified buffer was 0 bytes in length.
1204 ///
1205 /// If the pipe is not ready to read data,
1206 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1207 ///
1208 /// # Examples
1209 ///
1210 /// ```no_run
1211 /// use tokio::net::windows::named_pipe;
1212 /// use std::error::Error;
1213 /// use std::io;
1214 ///
1215 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1216 ///
1217 /// #[tokio::main]
1218 /// async fn main() -> Result<(), Box<dyn Error>> {
1219 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1220 ///
1221 /// loop {
1222 /// // Wait for the pipe to be readable
1223 /// client.readable().await?;
1224 ///
1225 /// // Creating the buffer **after** the `await` prevents it from
1226 /// // being stored in the async task.
1227 /// let mut buf = [0; 4096];
1228 ///
1229 /// // Try to read data, this may still fail with `WouldBlock`
1230 /// // if the readiness event is a false positive.
1231 /// match client.try_read(&mut buf) {
1232 /// Ok(0) => break,
1233 /// Ok(n) => {
1234 /// println!("read {} bytes", n);
1235 /// }
1236 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1237 /// continue;
1238 /// }
1239 /// Err(e) => {
1240 /// return Err(e.into());
1241 /// }
1242 /// }
1243 /// }
1244 ///
1245 /// Ok(())
1246 /// }
1247 /// ```
1248 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1249 self.io
1250 .registration()
1251 .try_io(Interest::READABLE, || (&*self.io).read(buf))
1252 }
1253
1254 /// Tries to read data from the pipe into the provided buffers, returning
1255 /// how many bytes were read.
1256 ///
1257 /// Data is copied to fill each buffer in order, with the final buffer
1258 /// written to possibly being only partially filled. This method behaves
1259 /// equivalently to a single call to [`try_read()`] with concatenated
1260 /// buffers.
1261 ///
1262 /// Receives any pending data from the pipe but does not wait for new data
1263 /// to arrive. On success, returns the number of bytes read. Because
1264 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1265 /// stored by the async task and can exist entirely on the stack.
1266 ///
1267 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1268 ///
1269 /// [`try_read()`]: NamedPipeClient::try_read()
1270 /// [`readable()`]: NamedPipeClient::readable()
1271 /// [`ready()`]: NamedPipeClient::ready()
1272 ///
1273 /// # Return
1274 ///
1275 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1276 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1277 /// and will no longer yield data. If the pipe is not ready to read data
1278 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1279 ///
1280 /// # Examples
1281 ///
1282 /// ```no_run
1283 /// use tokio::net::windows::named_pipe;
1284 /// use std::error::Error;
1285 /// use std::io::{self, IoSliceMut};
1286 ///
1287 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1288 ///
1289 /// #[tokio::main]
1290 /// async fn main() -> Result<(), Box<dyn Error>> {
1291 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1292 ///
1293 /// loop {
1294 /// // Wait for the pipe to be readable
1295 /// client.readable().await?;
1296 ///
1297 /// // Creating the buffer **after** the `await` prevents it from
1298 /// // being stored in the async task.
1299 /// let mut buf_a = [0; 512];
1300 /// let mut buf_b = [0; 1024];
1301 /// let mut bufs = [
1302 /// IoSliceMut::new(&mut buf_a),
1303 /// IoSliceMut::new(&mut buf_b),
1304 /// ];
1305 ///
1306 /// // Try to read data, this may still fail with `WouldBlock`
1307 /// // if the readiness event is a false positive.
1308 /// match client.try_read_vectored(&mut bufs) {
1309 /// Ok(0) => break,
1310 /// Ok(n) => {
1311 /// println!("read {} bytes", n);
1312 /// }
1313 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1314 /// continue;
1315 /// }
1316 /// Err(e) => {
1317 /// return Err(e.into());
1318 /// }
1319 /// }
1320 /// }
1321 ///
1322 /// Ok(())
1323 /// }
1324 /// ```
1325 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1326 self.io
1327 .registration()
1328 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1329 }
1330
1331 cfg_io_util! {
        /// Tries to read data from the pipe into the provided buffer, advancing the
        /// buffer's internal cursor, returning how many bytes were read.
1334 ///
1335 /// Receives any pending data from the pipe but does not wait for new data
1336 /// to arrive. On success, returns the number of bytes read. Because
1337 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1338 /// the async task and can exist entirely on the stack.
1339 ///
1340 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1341 ///
1342 /// [`readable()`]: NamedPipeClient::readable()
1343 /// [`ready()`]: NamedPipeClient::ready()
1344 ///
1345 /// # Return
1346 ///
        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
        /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
        /// and will no longer yield data. If the pipe is not ready to read data
        /// `Err(io::ErrorKind::WouldBlock)` is returned.
1351 ///
1352 /// # Examples
1353 ///
1354 /// ```no_run
1355 /// use tokio::net::windows::named_pipe;
1356 /// use std::error::Error;
1357 /// use std::io;
1358 ///
1359 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1360 ///
1361 /// #[tokio::main]
1362 /// async fn main() -> Result<(), Box<dyn Error>> {
1363 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1364 ///
1365 /// loop {
1366 /// // Wait for the pipe to be readable
1367 /// client.readable().await?;
1368 ///
1369 /// let mut buf = Vec::with_capacity(4096);
1370 ///
1371 /// // Try to read data, this may still fail with `WouldBlock`
1372 /// // if the readiness event is a false positive.
1373 /// match client.try_read_buf(&mut buf) {
1374 /// Ok(0) => break,
1375 /// Ok(n) => {
1376 /// println!("read {} bytes", n);
1377 /// }
1378 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1379 /// continue;
1380 /// }
1381 /// Err(e) => {
1382 /// return Err(e.into());
1383 /// }
1384 /// }
1385 /// }
1386 ///
1387 /// Ok(())
1388 /// }
1389 /// ```
1390 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1391 self.io.registration().try_io(Interest::READABLE, || {
1392 use std::io::Read;
1393
1394 let dst = buf.chunk_mut();
1395 let dst =
1396 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1397
1398 // Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
1399 // buffer.
1400 let n = (&*self.io).read(dst)?;
1401
1402 unsafe {
1403 buf.advance_mut(n);
1404 }
1405
1406 Ok(n)
1407 })
1408 }
1409 }
1410
1411 /// Waits for the pipe to become writable.
1412 ///
1413 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1414 /// paired with `try_write()`.
1415 ///
1416 /// # Examples
1417 ///
1418 /// ```no_run
1419 /// use tokio::net::windows::named_pipe;
1420 /// use std::error::Error;
1421 /// use std::io;
1422 ///
1423 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1424 ///
1425 /// #[tokio::main]
1426 /// async fn main() -> Result<(), Box<dyn Error>> {
1427 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1428 ///
1429 /// loop {
1430 /// // Wait for the pipe to be writable
1431 /// client.writable().await?;
1432 ///
1433 /// // Try to write data, this may still fail with `WouldBlock`
1434 /// // if the readiness event is a false positive.
1435 /// match client.try_write(b"hello world") {
1436 /// Ok(n) => {
1437 /// break;
1438 /// }
1439 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1440 /// continue;
1441 /// }
1442 /// Err(e) => {
1443 /// return Err(e.into());
1444 /// }
1445 /// }
1446 /// }
1447 ///
1448 /// Ok(())
1449 /// }
1450 /// ```
1451 pub async fn writable(&self) -> io::Result<()> {
1452 self.ready(Interest::WRITABLE).await?;
1453 Ok(())
1454 }
1455
1456 /// Polls for write readiness.
1457 ///
1458 /// If the pipe is not currently ready for writing, this method will
1459 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1460 /// becomes ready for writing, `Waker::wake` will be called on the waker.
1461 ///
1462 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1463 /// the `Waker` from the `Context` passed to the most recent call is
1464 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1465 /// second, independent waker.)
1466 ///
1467 /// This function is intended for cases where creating and pinning a future
1468 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1469 /// preferred, as this supports polling from multiple tasks at once.
1470 ///
1471 /// # Return value
1472 ///
1473 /// The function returns:
1474 ///
1475 /// * `Poll::Pending` if the pipe is not ready for writing.
1476 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1477 /// * `Poll::Ready(Err(e))` if an error is encountered.
1478 ///
1479 /// # Errors
1480 ///
1481 /// This function may encounter any standard I/O error except `WouldBlock`.
1482 ///
1483 /// [`writable`]: method@Self::writable
1484 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1485 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1486 }
1487
1488 /// Tries to write a buffer to the pipe, returning how many bytes were
1489 /// written.
1490 ///
1491 /// The function will attempt to write the entire contents of `buf`, but
1492 /// only part of the buffer may be written.
1493 ///
1494 /// This function is usually paired with `writable()`.
1495 ///
1496 /// # Return
1497 ///
1498 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1499 /// number of bytes written. If the pipe is not ready to write data,
1500 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1501 ///
1502 /// # Examples
1503 ///
1504 /// ```no_run
1505 /// use tokio::net::windows::named_pipe;
1506 /// use std::error::Error;
1507 /// use std::io;
1508 ///
1509 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1510 ///
1511 /// #[tokio::main]
1512 /// async fn main() -> Result<(), Box<dyn Error>> {
1513 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1514 ///
1515 /// loop {
1516 /// // Wait for the pipe to be writable
1517 /// client.writable().await?;
1518 ///
1519 /// // Try to write data, this may still fail with `WouldBlock`
1520 /// // if the readiness event is a false positive.
1521 /// match client.try_write(b"hello world") {
1522 /// Ok(n) => {
1523 /// break;
1524 /// }
1525 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1526 /// continue;
1527 /// }
1528 /// Err(e) => {
1529 /// return Err(e.into());
1530 /// }
1531 /// }
1532 /// }
1533 ///
1534 /// Ok(())
1535 /// }
1536 /// ```
1537 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1538 self.io
1539 .registration()
1540 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1541 }
1542
1543 /// Tries to write several buffers to the pipe, returning how many bytes
1544 /// were written.
1545 ///
    /// Data is written from each buffer in order, with the final buffer read
    /// from possibly being only partially consumed. This method behaves
1548 /// equivalently to a single call to [`try_write()`] with concatenated
1549 /// buffers.
1550 ///
1551 /// This function is usually paired with `writable()`.
1552 ///
1553 /// [`try_write()`]: NamedPipeClient::try_write()
1554 ///
1555 /// # Return
1556 ///
1557 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1558 /// number of bytes written. If the pipe is not ready to write data,
1559 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1560 ///
1561 /// # Examples
1562 ///
1563 /// ```no_run
1564 /// use tokio::net::windows::named_pipe;
1565 /// use std::error::Error;
1566 /// use std::io;
1567 ///
1568 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1569 ///
1570 /// #[tokio::main]
1571 /// async fn main() -> Result<(), Box<dyn Error>> {
1572 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1573 ///
1574 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1575 ///
1576 /// loop {
1577 /// // Wait for the pipe to be writable
1578 /// client.writable().await?;
1579 ///
1580 /// // Try to write data, this may still fail with `WouldBlock`
1581 /// // if the readiness event is a false positive.
1582 /// match client.try_write_vectored(&bufs) {
1583 /// Ok(n) => {
1584 /// break;
1585 /// }
1586 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1587 /// continue;
1588 /// }
1589 /// Err(e) => {
1590 /// return Err(e.into());
1591 /// }
1592 /// }
1593 /// }
1594 ///
1595 /// Ok(())
1596 /// }
1597 /// ```
1598 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1599 self.io
1600 .registration()
1601 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1602 }
1603
1604 /// Tries to read or write from the pipe using a user-provided IO operation.
1605 ///
1606 /// If the pipe is ready, the provided closure is called. The closure
1607 /// should attempt to perform IO operation from the pipe by manually
1608 /// calling the appropriate syscall. If the operation fails because the
1609 /// pipe is not actually ready, then the closure should return a
1610 /// `WouldBlock` error and the readiness flag is cleared. The return value
1611 /// of the closure is then returned by `try_io`.
1612 ///
1613 /// If the pipe is not ready, then the closure is not called
1614 /// and a `WouldBlock` error is returned.
1615 ///
1616 /// The closure should only return a `WouldBlock` error if it has performed
1617 /// an IO operation on the pipe that failed due to the pipe not being
1618 /// ready. Returning a `WouldBlock` error in any other situation will
1619 /// incorrectly clear the readiness flag, which can cause the pipe to
1620 /// behave incorrectly.
1621 ///
1622 /// The closure should not perform the IO operation using any of the methods
1623 /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1624 /// readiness flag and can cause the pipe to behave incorrectly.
1625 ///
1626 /// This method is not intended to be used with combined interests.
1627 /// The closure should perform only one type of IO operation, so it should not
1628 /// require more than one ready state. This method may panic or sleep forever
1629 /// if it is called with a combined interest.
1630 ///
1631 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1632 ///
1633 /// [`readable()`]: NamedPipeClient::readable()
1634 /// [`writable()`]: NamedPipeClient::writable()
1635 /// [`ready()`]: NamedPipeClient::ready()
1636 pub fn try_io<R>(
1637 &self,
1638 interest: Interest,
1639 f: impl FnOnce() -> io::Result<R>,
1640 ) -> io::Result<R> {
1641 self.io.registration().try_io(interest, f)
1642 }
1643
1644 /// Reads or writes from the pipe using a user-provided IO operation.
1645 ///
1646 /// The readiness of the pipe is awaited and when the pipe is ready,
1647 /// the provided closure is called. The closure should attempt to perform
1648 /// IO operation on the pipe by manually calling the appropriate syscall.
1649 /// If the operation fails because the pipe is not actually ready,
1650 /// then the closure should return a `WouldBlock` error. In such case the
1651 /// readiness flag is cleared and the pipe readiness is awaited again.
1652 /// This loop is repeated until the closure returns an `Ok` or an error
1653 /// other than `WouldBlock`.
1654 ///
1655 /// The closure should only return a `WouldBlock` error if it has performed
1656 /// an IO operation on the pipe that failed due to the pipe not being
1657 /// ready. Returning a `WouldBlock` error in any other situation will
1658 /// incorrectly clear the readiness flag, which can cause the pipe to
1659 /// behave incorrectly.
1660 ///
1661 /// The closure should not perform the IO operation using any of the methods
1662 /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1663 /// readiness flag and can cause the pipe to behave incorrectly.
1664 ///
1665 /// This method is not intended to be used with combined interests.
1666 /// The closure should perform only one type of IO operation, so it should not
1667 /// require more than one ready state. This method may panic or sleep forever
1668 /// if it is called with a combined interest.
1669 pub async fn async_io<R>(
1670 &self,
1671 interest: Interest,
1672 f: impl FnMut() -> io::Result<R>,
1673 ) -> io::Result<R> {
1674 self.io.registration().async_io(interest, f).await
1675 }
1676}
1677
1678impl AsyncRead for NamedPipeClient {
1679 fn poll_read(
1680 self: Pin<&mut Self>,
1681 cx: &mut Context<'_>,
1682 buf: &mut ReadBuf<'_>,
1683 ) -> Poll<io::Result<()>> {
1684 unsafe { self.io.poll_read(cx, buf) }
1685 }
1686}
1687
1688impl AsyncWrite for NamedPipeClient {
1689 fn poll_write(
1690 self: Pin<&mut Self>,
1691 cx: &mut Context<'_>,
1692 buf: &[u8],
1693 ) -> Poll<io::Result<usize>> {
1694 self.io.poll_write(cx, buf)
1695 }
1696
1697 fn poll_write_vectored(
1698 self: Pin<&mut Self>,
1699 cx: &mut Context<'_>,
1700 bufs: &[io::IoSlice<'_>],
1701 ) -> Poll<io::Result<usize>> {
1702 self.io.poll_write_vectored(cx, bufs)
1703 }
1704
1705 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1706 Poll::Ready(Ok(()))
1707 }
1708
1709 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1710 self.poll_flush(cx)
1711 }
1712}
1713
1714impl AsRawHandle for NamedPipeClient {
1715 fn as_raw_handle(&self) -> RawHandle {
1716 self.io.as_raw_handle()
1717 }
1718}
1719
1720impl AsHandle for NamedPipeClient {
1721 fn as_handle(&self) -> BorrowedHandle<'_> {
1722 unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
1723 }
1724}
1725
/// A builder for constructing a named pipe server with named pipe-specific
/// options. Named pipe servers that want to modify pipe-related options must
/// go through this builder.
///
/// See [`ServerOptions::create`].
#[derive(Debug, Clone)]
pub struct ServerOptions {
    // Options that belong to `dwOpenMode`.
    access_inbound: bool,
    access_outbound: bool,
    first_pipe_instance: bool,
    write_dac: bool,
    write_owner: bool,
    access_system_security: bool,
    // Options that belong to `dwPipeMode`.
    pipe_mode: PipeMode,
    reject_remote_clients: bool,
    // Remaining options that are not part of either mode word.
    max_instances: u32,
    out_buffer_size: u32,
    in_buffer_size: u32,
    default_timeout: u32,
}
1749
1750impl ServerOptions {
1751 /// Creates a new named pipe builder with the default settings.
1752 ///
1753 /// ```
1754 /// use tokio::net::windows::named_pipe::ServerOptions;
1755 ///
1756 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1757 ///
1758 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1759 /// let server = ServerOptions::new().create(PIPE_NAME)?;
1760 /// # Ok(()) }
1761 /// ```
1762 pub fn new() -> ServerOptions {
1763 ServerOptions {
1764 access_inbound: true,
1765 access_outbound: true,
1766 first_pipe_instance: false,
1767 write_dac: false,
1768 write_owner: false,
1769 access_system_security: false,
1770 pipe_mode: PipeMode::Byte,
1771 reject_remote_clients: true,
1772 max_instances: windows_sys::PIPE_UNLIMITED_INSTANCES,
1773 out_buffer_size: 65536,
1774 in_buffer_size: 65536,
1775 default_timeout: 0,
1776 }
1777 }
1778
1779 /// The pipe mode.
1780 ///
1781 /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1782 /// documentation of what each mode means.
1783 ///
1784 /// This corresponds to specifying `PIPE_TYPE_` and `PIPE_READMODE_` in [`dwPipeMode`].
1785 ///
1786 /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
1787 pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1788 self.pipe_mode = pipe_mode;
1789 self
1790 }
1791
1792 /// The flow of data in the pipe goes from client to server only.
1793 ///
1794 /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1795 ///
1796 /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1797 ///
1798 /// # Errors
1799 ///
1800 /// Server side prevents connecting by denying inbound access, client errors
1801 /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1802 /// the connection.
1803 ///
1804 /// ```
1805 /// use std::io;
1806 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1807 ///
1808 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1809 ///
1810 /// # #[tokio::main] async fn main() -> io::Result<()> {
1811 /// let _server = ServerOptions::new()
1812 /// .access_inbound(false)
1813 /// .create(PIPE_NAME)?;
1814 ///
1815 /// let e = ClientOptions::new()
1816 /// .open(PIPE_NAME)
1817 /// .unwrap_err();
1818 ///
1819 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1820 /// # Ok(()) }
1821 /// ```
1822 ///
1823 /// Disabling writing allows a client to connect, but errors with
1824 /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1825 ///
1826 /// ```
1827 /// use std::io;
1828 /// use tokio::io::AsyncWriteExt;
1829 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1830 ///
1831 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1832 ///
1833 /// # #[tokio::main] async fn main() -> io::Result<()> {
1834 /// let server = ServerOptions::new()
1835 /// .access_inbound(false)
1836 /// .create(PIPE_NAME)?;
1837 ///
1838 /// let mut client = ClientOptions::new()
1839 /// .write(false)
1840 /// .open(PIPE_NAME)?;
1841 ///
1842 /// server.connect().await?;
1843 ///
1844 /// let e = client.write(b"ping").await.unwrap_err();
1845 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1846 /// # Ok(()) }
1847 /// ```
1848 ///
1849 /// # Examples
1850 ///
1851 /// A unidirectional named pipe that only supports server-to-client
1852 /// communication.
1853 ///
1854 /// ```
1855 /// use std::io;
1856 /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1857 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1858 ///
1859 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1860 ///
1861 /// # #[tokio::main] async fn main() -> io::Result<()> {
1862 /// let mut server = ServerOptions::new()
1863 /// .access_inbound(false)
1864 /// .create(PIPE_NAME)?;
1865 ///
1866 /// let mut client = ClientOptions::new()
1867 /// .write(false)
1868 /// .open(PIPE_NAME)?;
1869 ///
1870 /// server.connect().await?;
1871 ///
1872 /// let write = server.write_all(b"ping");
1873 ///
1874 /// let mut buf = [0u8; 4];
1875 /// let read = client.read_exact(&mut buf);
1876 ///
1877 /// let ((), read) = tokio::try_join!(write, read)?;
1878 ///
1879 /// assert_eq!(read, 4);
1880 /// assert_eq!(&buf[..], b"ping");
1881 /// # Ok(()) }
1882 /// ```
1883 pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
1884 self.access_inbound = allowed;
1885 self
1886 }
1887
1888 /// The flow of data in the pipe goes from server to client only.
1889 ///
1890 /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1891 ///
1892 /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1893 ///
1894 /// # Errors
1895 ///
1896 /// Server side prevents connecting by denying outbound access, client
1897 /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1898 /// create the connection.
1899 ///
1900 /// ```
1901 /// use std::io;
1902 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1903 ///
1904 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1905 ///
1906 /// # #[tokio::main] async fn main() -> io::Result<()> {
1907 /// let server = ServerOptions::new()
1908 /// .access_outbound(false)
1909 /// .create(PIPE_NAME)?;
1910 ///
1911 /// let e = ClientOptions::new()
1912 /// .open(PIPE_NAME)
1913 /// .unwrap_err();
1914 ///
1915 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1916 /// # Ok(()) }
1917 /// ```
1918 ///
1919 /// Disabling reading allows a client to connect, but attempting to read
1920 /// will error with [`std::io::ErrorKind::PermissionDenied`].
1921 ///
1922 /// ```
1923 /// use std::io;
1924 /// use tokio::io::AsyncReadExt;
1925 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1926 ///
1927 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1928 ///
1929 /// # #[tokio::main] async fn main() -> io::Result<()> {
1930 /// let server = ServerOptions::new()
1931 /// .access_outbound(false)
1932 /// .create(PIPE_NAME)?;
1933 ///
1934 /// let mut client = ClientOptions::new()
1935 /// .read(false)
1936 /// .open(PIPE_NAME)?;
1937 ///
1938 /// server.connect().await?;
1939 ///
1940 /// let mut buf = [0u8; 4];
1941 /// let e = client.read(&mut buf).await.unwrap_err();
1942 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1943 /// # Ok(()) }
1944 /// ```
1945 ///
1946 /// # Examples
1947 ///
1948 /// A unidirectional named pipe that only supports client-to-server
1949 /// communication.
1950 ///
1951 /// ```
1952 /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1953 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1954 ///
1955 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1956 ///
1957 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1958 /// let mut server = ServerOptions::new()
1959 /// .access_outbound(false)
1960 /// .create(PIPE_NAME)?;
1961 ///
1962 /// let mut client = ClientOptions::new()
1963 /// .read(false)
1964 /// .open(PIPE_NAME)?;
1965 ///
1966 /// server.connect().await?;
1967 ///
1968 /// let write = client.write_all(b"ping");
1969 ///
1970 /// let mut buf = [0u8; 4];
1971 /// let read = server.read_exact(&mut buf);
1972 ///
1973 /// let ((), read) = tokio::try_join!(write, read)?;
1974 ///
1975 /// println!("done reading and writing");
1976 ///
1977 /// assert_eq!(read, 4);
1978 /// assert_eq!(&buf[..], b"ping");
1979 /// # Ok(()) }
1980 /// ```
1981 pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
1982 self.access_outbound = allowed;
1983 self
1984 }
1985
1986 /// If you attempt to create multiple instances of a pipe with this flag
1987 /// set, creation of the first server instance succeeds, but creation of any
1988 /// subsequent instances will fail with
1989 /// [`std::io::ErrorKind::PermissionDenied`].
1990 ///
1991 /// This option is intended to be used with servers that want to ensure that
1992 /// they are the only process listening for clients on a given named pipe.
1993 /// This is accomplished by enabling it for the first server instance
1994 /// created in a process.
1995 ///
1996 /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1997 ///
1998 /// # Errors
1999 ///
2000 /// If this option is set and more than one instance of the server for a
2001 /// given named pipe exists, calling [`create`] will fail with
2002 /// [`std::io::ErrorKind::PermissionDenied`].
2003 ///
2004 /// ```
2005 /// use std::io;
2006 /// use tokio::net::windows::named_pipe::ServerOptions;
2007 ///
2008 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
2009 ///
2010 /// # #[tokio::main] async fn main() -> io::Result<()> {
2011 /// let server1 = ServerOptions::new()
2012 /// .first_pipe_instance(true)
2013 /// .create(PIPE_NAME)?;
2014 ///
2015 /// // Second server errs, since it's not the first instance.
2016 /// let e = ServerOptions::new()
2017 /// .first_pipe_instance(true)
2018 /// .create(PIPE_NAME)
2019 /// .unwrap_err();
2020 ///
2021 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2022 /// # Ok(()) }
2023 /// ```
2024 ///
2025 /// # Examples
2026 ///
2027 /// ```
2028 /// use std::io;
2029 /// use tokio::net::windows::named_pipe::ServerOptions;
2030 ///
2031 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
2032 ///
2033 /// # #[tokio::main] async fn main() -> io::Result<()> {
2034 /// let mut builder = ServerOptions::new();
2035 /// builder.first_pipe_instance(true);
2036 ///
2037 /// let server = builder.create(PIPE_NAME)?;
2038 /// let e = builder.create(PIPE_NAME).unwrap_err();
2039 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2040 /// drop(server);
2041 ///
2042 /// // OK: since, we've closed the other instance.
2043 /// let _server2 = builder.create(PIPE_NAME)?;
2044 /// # Ok(()) }
2045 /// ```
2046 ///
2047 /// [`create`]: ServerOptions::create
2048 /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
2049 pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
2050 self.first_pipe_instance = first;
2051 self
2052 }
2053
2054 /// Requests permission to modify the pipe's discretionary access control list.
2055 ///
2056 /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
2057 ///
2058 /// # Examples
2059 ///
2060 /// ```
2061 /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2062 ///
2063 /// use tokio::net::windows::named_pipe::ServerOptions;
2064 /// use windows_sys::{
2065 /// Win32::Foundation::ERROR_SUCCESS,
2066 /// Win32::Security::DACL_SECURITY_INFORMATION,
2067 /// Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2068 /// };
2069 ///
2070 /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
2071 ///
2072 /// # #[tokio::main] async fn main() -> io::Result<()> {
2073 /// let mut pipe_template = ServerOptions::new();
2074 /// pipe_template.write_dac(true);
2075 /// let pipe = pipe_template.create(PIPE_NAME)?;
2076 ///
2077 /// unsafe {
2078 /// assert_eq!(
2079 /// ERROR_SUCCESS,
2080 /// SetSecurityInfo(
2081 /// pipe.as_raw_handle() as _,
2082 /// SE_KERNEL_OBJECT,
2083 /// DACL_SECURITY_INFORMATION,
2084 /// ptr::null_mut(),
2085 /// ptr::null_mut(),
2086 /// ptr::null_mut(),
2087 /// ptr::null_mut(),
2088 /// )
2089 /// );
2090 /// }
2091 ///
2092 /// # Ok(()) }
2093 /// ```
2094 ///
2095 /// ```
2096 /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2097 ///
2098 /// use tokio::net::windows::named_pipe::ServerOptions;
2099 /// use windows_sys::{
2100 /// Win32::Foundation::ERROR_ACCESS_DENIED,
2101 /// Win32::Security::DACL_SECURITY_INFORMATION,
2102 /// Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2103 /// };
2104 ///
2105 /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
2106 ///
2107 /// # #[tokio::main] async fn main() -> io::Result<()> {
2108 /// let mut pipe_template = ServerOptions::new();
2109 /// pipe_template.write_dac(false);
2110 /// let pipe = pipe_template.create(PIPE_NAME)?;
2111 ///
2112 /// unsafe {
2113 /// assert_eq!(
2114 /// ERROR_ACCESS_DENIED,
2115 /// SetSecurityInfo(
2116 /// pipe.as_raw_handle() as _,
2117 /// SE_KERNEL_OBJECT,
2118 /// DACL_SECURITY_INFORMATION,
2119 /// ptr::null_mut(),
2120 /// ptr::null_mut(),
2121 /// ptr::null_mut(),
2122 /// ptr::null_mut(),
2123 /// )
2124 /// );
2125 /// }
2126 ///
2127 /// # Ok(()) }
2128 /// ```
2129 ///
2130 /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2131 pub fn write_dac(&mut self, requested: bool) -> &mut Self {
2132 self.write_dac = requested;
2133 self
2134 }
2135
2136 /// Requests permission to modify the pipe's owner.
2137 ///
2138 /// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
2139 ///
2140 /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2141 pub fn write_owner(&mut self, requested: bool) -> &mut Self {
2142 self.write_owner = requested;
2143 self
2144 }
2145
2146 /// Requests permission to modify the pipe's system access control list.
2147 ///
2148 /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
2149 ///
2150 /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2151 pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
2152 self.access_system_security = requested;
2153 self
2154 }
2155
2156 /// Indicates whether this server can accept remote clients or not. Remote
2157 /// clients are disabled by default.
2158 ///
2159 /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
2160 ///
2161 /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
2162 pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
2163 self.reject_remote_clients = reject;
2164 self
2165 }
2166
2167 /// The maximum number of instances that can be created for this pipe. The
2168 /// first instance of the pipe can specify this value; the same number must
2169 /// be specified for other instances of the pipe. Acceptable values are in
2170 /// the range 1 through 254. The default value is unlimited.
2171 ///
2172 /// This corresponds to specifying [`nMaxInstances`].
2173 ///
2174 /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2175 ///
2176 /// # Errors
2177 ///
2178 /// The same numbers of `max_instances` have to be used by all servers. Any
2179 /// additional servers trying to be built which uses a mismatching value
2180 /// might error.
2181 ///
2182 /// ```
2183 /// use std::io;
2184 /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2185 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2186 ///
2187 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
2188 ///
2189 /// # #[tokio::main] async fn main() -> io::Result<()> {
2190 /// let mut server = ServerOptions::new();
2191 /// server.max_instances(2);
2192 ///
2193 /// let s1 = server.create(PIPE_NAME)?;
2194 /// let c1 = ClientOptions::new().open(PIPE_NAME);
2195 ///
2196 /// let s2 = server.create(PIPE_NAME)?;
2197 /// let c2 = ClientOptions::new().open(PIPE_NAME);
2198 ///
2199 /// // Too many servers!
2200 /// let e = server.create(PIPE_NAME).unwrap_err();
2201 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2202 ///
2203 /// // Still too many servers even if we specify a higher value!
2204 /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
2205 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2206 /// # Ok(()) }
2207 /// ```
2208 ///
2209 /// # Panics
2210 ///
2211 /// This function will panic if more than 254 instances are specified. If
2212 /// you do not wish to set an instance limit, leave it unspecified.
2213 ///
2214 /// ```should_panic
2215 /// use tokio::net::windows::named_pipe::ServerOptions;
2216 ///
2217 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2218 /// let builder = ServerOptions::new().max_instances(255);
2219 /// # Ok(()) }
2220 /// ```
2221 #[track_caller]
2222 pub fn max_instances(&mut self, instances: usize) -> &mut Self {
2223 assert!(instances < 255, "cannot specify more than 254 instances");
2224 self.max_instances = instances as u32;
2225 self
2226 }
2227
2228 /// The number of bytes to reserve for the output buffer.
2229 ///
2230 /// This corresponds to specifying [`nOutBufferSize`].
2231 ///
2232 /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2233 pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
2234 self.out_buffer_size = buffer;
2235 self
2236 }
2237
2238 /// The number of bytes to reserve for the input buffer.
2239 ///
2240 /// This corresponds to specifying [`nInBufferSize`].
2241 ///
2242 /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2243 pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
2244 self.in_buffer_size = buffer;
2245 self
2246 }
2247
2248 /// Creates the named pipe identified by `addr` for use as a server.
2249 ///
2250 /// This uses the [`CreateNamedPipe`] function.
2251 ///
2252 /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2253 ///
2254 /// # Errors
2255 ///
2256 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2257 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2258 ///
2259 /// [Tokio Runtime]: crate::runtime::Runtime
2260 /// [enabled I/O]: crate::runtime::Builder::enable_io
2261 ///
2262 /// # Examples
2263 ///
2264 /// ```
2265 /// use tokio::net::windows::named_pipe::ServerOptions;
2266 ///
2267 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
2268 ///
2269 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2270 /// let server = ServerOptions::new().create(PIPE_NAME)?;
2271 /// # Ok(()) }
2272 /// ```
2273 pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
2274 // Safety: We're calling create_with_security_attributes_raw w/ a null
2275 // pointer which disables it.
2276 unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
2277 }
2278
2279 /// Creates the named pipe identified by `addr` for use as a server.
2280 ///
2281 /// This is the same as [`create`] except that it supports providing the raw
2282 /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2283 /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2284 ///
2285 /// # Errors
2286 ///
2287 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2288 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2289 ///
2290 /// [Tokio Runtime]: crate::runtime::Runtime
2291 /// [enabled I/O]: crate::runtime::Builder::enable_io
2292 ///
2293 /// # Safety
2294 ///
2295 /// The `attrs` argument must either be null or point at a valid instance of
2296 /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2297 /// behavior is identical to calling the [`create`] method.
2298 ///
2299 /// [`create`]: ServerOptions::create
2300 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2301 /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
2302 pub unsafe fn create_with_security_attributes_raw(
2303 &self,
2304 addr: impl AsRef<OsStr>,
2305 attrs: *mut c_void,
2306 ) -> io::Result<NamedPipeServer> {
2307 let addr = encode_addr(addr);
2308
2309 let pipe_mode = {
2310 let mut mode = if matches!(self.pipe_mode, PipeMode::Message) {
2311 windows_sys::PIPE_TYPE_MESSAGE | windows_sys::PIPE_READMODE_MESSAGE
2312 } else {
2313 windows_sys::PIPE_TYPE_BYTE | windows_sys::PIPE_READMODE_BYTE
2314 };
2315 if self.reject_remote_clients {
2316 mode |= windows_sys::PIPE_REJECT_REMOTE_CLIENTS;
2317 } else {
2318 mode |= windows_sys::PIPE_ACCEPT_REMOTE_CLIENTS;
2319 }
2320 mode
2321 };
2322 let open_mode = {
2323 let mut mode = windows_sys::FILE_FLAG_OVERLAPPED;
2324 if self.access_inbound {
2325 mode |= windows_sys::PIPE_ACCESS_INBOUND;
2326 }
2327 if self.access_outbound {
2328 mode |= windows_sys::PIPE_ACCESS_OUTBOUND;
2329 }
2330 if self.first_pipe_instance {
2331 mode |= windows_sys::FILE_FLAG_FIRST_PIPE_INSTANCE;
2332 }
2333 if self.write_dac {
2334 mode |= windows_sys::WRITE_DAC;
2335 }
2336 if self.write_owner {
2337 mode |= windows_sys::WRITE_OWNER;
2338 }
2339 if self.access_system_security {
2340 mode |= windows_sys::ACCESS_SYSTEM_SECURITY;
2341 }
2342 mode
2343 };
2344
2345 let h = windows_sys::CreateNamedPipeW(
2346 addr.as_ptr(),
2347 open_mode,
2348 pipe_mode,
2349 self.max_instances,
2350 self.out_buffer_size,
2351 self.in_buffer_size,
2352 self.default_timeout,
2353 attrs as *mut _,
2354 );
2355
2356 if h == windows_sys::INVALID_HANDLE_VALUE {
2357 return Err(io::Error::last_os_error());
2358 }
2359
2360 NamedPipeServer::from_raw_handle(h as _)
2361 }
2362}
2363
2364/// A builder suitable for building and interacting with named pipes from the
2365/// client side.
2366///
2367/// See [`ClientOptions::open`].
2368#[derive(Debug, Clone)]
2369pub struct ClientOptions {
2370 generic_read: bool,
2371 generic_write: bool,
2372 security_qos_flags: u32,
2373 pipe_mode: PipeMode,
2374}
2375
2376impl ClientOptions {
2377 /// Creates a new named pipe builder with the default settings.
2378 ///
2379 /// ```
2380 /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2381 ///
2382 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
2383 ///
2384 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2385 /// // Server must be created in order for the client creation to succeed.
2386 /// let server = ServerOptions::new().create(PIPE_NAME)?;
2387 /// let client = ClientOptions::new().open(PIPE_NAME)?;
2388 /// # Ok(()) }
2389 /// ```
2390 pub fn new() -> Self {
2391 Self {
2392 generic_read: true,
2393 generic_write: true,
2394 security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
2395 | windows_sys::SECURITY_SQOS_PRESENT,
2396 pipe_mode: PipeMode::Byte,
2397 }
2398 }
2399
2400 /// If the client supports reading data. This is enabled by default.
2401 ///
2402 /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
2403 ///
2404 /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2405 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2406 pub fn read(&mut self, allowed: bool) -> &mut Self {
2407 self.generic_read = allowed;
2408 self
2409 }
2410
2411 /// If the created pipe supports writing data. This is enabled by default.
2412 ///
2413 /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
2414 ///
2415 /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2416 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2417 pub fn write(&mut self, allowed: bool) -> &mut Self {
2418 self.generic_write = allowed;
2419 self
2420 }
2421
2422 /// Sets qos flags which are combined with other flags and attributes in the
2423 /// call to [`CreateFile`].
2424 ///
2425 /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2426 /// calling this function would override that value completely with the
2427 /// argument specified.
2428 ///
2429 /// When `security_qos_flags` is not set, a malicious program can gain the
2430 /// elevated privileges of a privileged Rust process when it allows opening
2431 /// user-specified paths, by tricking it into opening a named pipe. So
2432 /// arguably `security_qos_flags` should also be set when opening arbitrary
2433 /// paths. However the bits can then conflict with other flags, specifically
2434 /// `FILE_FLAG_OPEN_NO_RECALL`.
2435 ///
2436 /// For information about possible values, see [Impersonation Levels] on the
2437 /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2438 /// automatically when using this method.
2439 ///
2440 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2441 /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
2442 /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
2443 pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2444 // See: https://github.com/rust-lang/rust/pull/58216
2445 self.security_qos_flags = flags | windows_sys::SECURITY_SQOS_PRESENT;
2446 self
2447 }
2448
2449 /// The pipe mode.
2450 ///
2451 /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
2452 /// documentation of what each mode means.
2453 pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
2454 self.pipe_mode = pipe_mode;
2455 self
2456 }
2457
2458 /// Opens the named pipe identified by `addr`.
2459 ///
2460 /// This opens the client using [`CreateFile`] with the
2461 /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2462 ///
2463 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2464 ///
2465 /// # Errors
2466 ///
2467 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2468 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2469 ///
2470 /// There are a few errors you need to take into account when creating a
2471 /// named pipe on the client side:
2472 ///
2473 /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2474 /// does not exist. Presumably the server is not up.
2475 /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2476 /// but the server is not currently waiting for a connection. Please see the
2477 /// examples for how to check for this error.
2478 ///
2479 /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
2480 /// [enabled I/O]: crate::runtime::Builder::enable_io
2481 /// [Tokio Runtime]: crate::runtime::Runtime
2482 ///
2483 /// A connect loop that waits until a pipe becomes available looks like
2484 /// this:
2485 ///
2486 /// ```no_run
2487 /// use std::time::Duration;
2488 /// use tokio::net::windows::named_pipe::ClientOptions;
2489 /// use tokio::time;
2490 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2491 ///
2492 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2493 ///
2494 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2495 /// let client = loop {
2496 /// match ClientOptions::new().open(PIPE_NAME) {
2497 /// Ok(client) => break client,
2498 /// Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
2499 /// Err(e) => return Err(e),
2500 /// }
2501 ///
2502 /// time::sleep(Duration::from_millis(50)).await;
2503 /// };
2504 ///
2505 /// // use the connected client.
2506 /// # Ok(()) }
2507 /// ```
2508 pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2509 // Safety: We're calling open_with_security_attributes_raw w/ a null
2510 // pointer which disables it.
2511 unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2512 }
2513
2514 /// Opens the named pipe identified by `addr`.
2515 ///
2516 /// This is the same as [`open`] except that it supports providing the raw
2517 /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2518 /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2519 ///
2520 /// # Safety
2521 ///
2522 /// The `attrs` argument must either be null or point at a valid instance of
2523 /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2524 /// behavior is identical to calling the [`open`] method.
2525 ///
2526 /// [`open`]: ClientOptions::open
2527 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2528 /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
2529 pub unsafe fn open_with_security_attributes_raw(
2530 &self,
2531 addr: impl AsRef<OsStr>,
2532 attrs: *mut c_void,
2533 ) -> io::Result<NamedPipeClient> {
2534 let addr = encode_addr(addr);
2535
2536 let desired_access = {
2537 let mut access = 0;
2538 if self.generic_read {
2539 access |= windows_sys::GENERIC_READ;
2540 }
2541 if self.generic_write {
2542 access |= windows_sys::GENERIC_WRITE;
2543 }
2544 access
2545 };
2546
2547 // NB: We could use a platform specialized `OpenOptions` here, but since
2548 // we have access to windows_sys it ultimately doesn't hurt to use
2549 // `CreateFile` explicitly since it allows the use of our already
2550 // well-structured wide `addr` to pass into CreateFileW.
2551 let h = windows_sys::CreateFileW(
2552 addr.as_ptr(),
2553 desired_access,
2554 0,
2555 attrs as *mut _,
2556 windows_sys::OPEN_EXISTING,
2557 self.get_flags(),
2558 0,
2559 );
2560
2561 if h == windows_sys::INVALID_HANDLE_VALUE {
2562 return Err(io::Error::last_os_error());
2563 }
2564
2565 if matches!(self.pipe_mode, PipeMode::Message) {
2566 let mode = windows_sys::PIPE_READMODE_MESSAGE;
2567 let result =
2568 windows_sys::SetNamedPipeHandleState(h, &mode, ptr::null_mut(), ptr::null_mut());
2569
2570 if result == 0 {
2571 return Err(io::Error::last_os_error());
2572 }
2573 }
2574
2575 NamedPipeClient::from_raw_handle(h as _)
2576 }
2577
2578 fn get_flags(&self) -> u32 {
2579 self.security_qos_flags | windows_sys::FILE_FLAG_OVERLAPPED
2580 }
2581}
2582
/// How data written to a named pipe is framed.
///
/// Set through [`ServerOptions::pipe_mode`] or [`ClientOptions::pipe_mode`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeMode {
    /// The pipe carries a stream of bytes and does not distinguish bytes
    /// written during different write operations.
    ///
    /// Corresponds to [`PIPE_TYPE_BYTE`].
    ///
    /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
    Byte,
    /// The pipe carries a stream of messages: the bytes written during each
    /// write operation are treated as one message unit. Any read on the
    /// named pipe returns [`ERROR_MORE_DATA`] when a message is not read
    /// completely.
    ///
    /// Corresponds to [`PIPE_TYPE_MESSAGE`].
    ///
    /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
    /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
    Message,
}
2607
/// Which end of a named pipe instance a handle refers to.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeEnd {
    /// The handle refers to the client end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_CLIENT_END`].
    ///
    /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
    Client,
    /// The handle refers to the server end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_SERVER_END`].
    ///
    /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
    Server,
}
2625
2626/// Information about a named pipe.
2627///
2628/// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
2629#[derive(Debug)]
2630#[non_exhaustive]
2631pub struct PipeInfo {
2632 /// Indicates the mode of a named pipe.
2633 pub mode: PipeMode,
2634 /// Indicates the end of a named pipe.
2635 pub end: PipeEnd,
2636 /// The maximum number of instances that can be created for this pipe.
2637 pub max_instances: u32,
2638 /// The number of bytes to reserve for the output buffer.
2639 pub out_buffer_size: u32,
2640 /// The number of bytes to reserve for the input buffer.
2641 pub in_buffer_size: u32,
2642}
2643
2644/// Encodes an address so that it is a null-terminated wide string.
2645fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2646 let len = addr.as_ref().encode_wide().count();
2647 let mut vec = Vec::with_capacity(len + 1);
2648 vec.extend(addr.as_ref().encode_wide());
2649 vec.push(0);
2650 vec.into_boxed_slice()
2651}
2652
2653/// Internal function to get the info out of a raw named pipe.
2654unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2655 let mut flags = 0;
2656 let mut out_buffer_size = 0;
2657 let mut in_buffer_size = 0;
2658 let mut max_instances = 0;
2659
2660 let result = windows_sys::GetNamedPipeInfo(
2661 handle as _,
2662 &mut flags,
2663 &mut out_buffer_size,
2664 &mut in_buffer_size,
2665 &mut max_instances,
2666 );
2667
2668 if result == 0 {
2669 return Err(io::Error::last_os_error());
2670 }
2671
2672 let mut end = PipeEnd::Client;
2673 let mut mode = PipeMode::Byte;
2674
2675 if flags & windows_sys::PIPE_SERVER_END != 0 {
2676 end = PipeEnd::Server;
2677 }
2678
2679 if flags & windows_sys::PIPE_TYPE_MESSAGE != 0 {
2680 mode = PipeMode::Message;
2681 }
2682
2683 Ok(PipeInfo {
2684 end,
2685 mode,
2686 out_buffer_size,
2687 in_buffer_size,
2688 max_instances,
2689 })
2690}