compio_fs/pipe.rs
1//! Unix pipe types.
2
3use std::{
4 future::Future,
5 io,
6 os::fd::{FromRawFd, IntoRawFd},
7 path::Path,
8};
9
10use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
11use compio_driver::{
12 AsRawFd, ToSharedFd, impl_raw_fd,
13 op::{BufResultExt, Recv, RecvVectored, Send, SendVectored},
14 syscall,
15};
16use compio_io::{AsyncRead, AsyncWrite};
17
18use crate::File;
19
20/// Creates a pair of anonymous pipe.
21///
22/// ```
23/// use compio_fs::pipe::anonymous;
24/// use compio_io::{AsyncReadExt, AsyncWriteExt};
25///
26/// # compio_runtime::Runtime::new().unwrap().block_on(async {
27/// let (mut rx, mut tx) = anonymous().unwrap();
28///
29/// tx.write_all("Hello world!").await.unwrap();
30/// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap();
31/// assert_eq!(&buf, b"Hello world!");
32/// # });
33/// ```
34pub fn anonymous() -> io::Result<(Receiver, Sender)> {
35 let (receiver, sender) = os_pipe::pipe()?;
36 let receiver = Receiver::from_file(File::from_std(unsafe {
37 std::fs::File::from_raw_fd(receiver.into_raw_fd())
38 })?)?;
39 let sender = Sender::from_file(File::from_std(unsafe {
40 std::fs::File::from_raw_fd(sender.into_raw_fd())
41 })?)?;
42 Ok((receiver, sender))
43}
44
45/// Options and flags which can be used to configure how a FIFO file is opened.
46///
47/// This builder allows configuring how to create a pipe end from a FIFO file.
48/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
49/// then chain calls to methods to set each option, then call either
50/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
51/// are trying to open. This will give you a [`io::Result`] with a pipe end
52/// inside that you can further operate on.
53///
54/// [`new`]: OpenOptions::new
55/// [`open_receiver`]: OpenOptions::open_receiver
56/// [`open_sender`]: OpenOptions::open_sender
57///
58/// # Examples
59///
60/// Opening a pair of pipe ends from a FIFO file:
61///
62/// ```no_run
63/// use compio_fs::pipe;
64///
65/// const FIFO_NAME: &str = "path/to/a/fifo";
66///
67/// # async fn dox() -> std::io::Result<()> {
68/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
69/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME).await?;
70/// # Ok(())
71/// # }
72/// ```
73///
74/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
75///
76/// ```ignore
77/// use compio_fs::pipe;
78/// use nix::{sys::stat::Mode, unistd::mkfifo};
79///
80/// // Our program has exclusive access to this path.
81/// const FIFO_NAME: &str = "path/to/a/new/fifo";
82///
83/// # async fn dox() -> std::io::Result<()> {
84/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
85/// let tx = pipe::OpenOptions::new()
86/// .read_write(true)
87/// .unchecked(true)
88/// .open_sender(FIFO_NAME)?;
89/// # Ok(())
90/// # }
91/// ```
92#[derive(Clone, Debug)]
93pub struct OpenOptions {
94 #[cfg(target_os = "linux")]
95 read_write: bool,
96 unchecked: bool,
97}
98
99impl OpenOptions {
100 /// Creates a blank new set of options ready for configuration.
101 ///
102 /// All options are initially set to `false`.
103 pub fn new() -> OpenOptions {
104 OpenOptions {
105 #[cfg(target_os = "linux")]
106 read_write: false,
107 unchecked: false,
108 }
109 }
110
111 /// Sets the option for read-write access.
112 ///
113 /// This option, when true, will indicate that a FIFO file will be opened
114 /// in read-write access mode. This operation is not defined by the POSIX
115 /// standard and is only guaranteed to work on Linux.
116 ///
117 /// # Examples
118 ///
119 /// Opening a [`Sender`] even if there are no open reading ends:
120 ///
121 /// ```
122 /// use compio_fs::pipe;
123 ///
124 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
125 /// let tx = pipe::OpenOptions::new()
126 /// .read_write(true)
127 /// .open_sender("path/to/a/fifo")
128 /// .await;
129 /// # });
130 /// ```
131 ///
132 /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
133 /// fail with [`UnexpectedEof`] during reading if all writing ends of the
134 /// pipe close the FIFO file.
135 ///
136 /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
137 ///
138 /// ```
139 /// use compio_fs::pipe;
140 ///
141 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
142 /// let tx = pipe::OpenOptions::new()
143 /// .read_write(true)
144 /// .open_receiver("path/to/a/fifo")
145 /// .await;
146 /// # });
147 /// ```
148 #[cfg(target_os = "linux")]
149 #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
150 pub fn read_write(&mut self, value: bool) -> &mut Self {
151 self.read_write = value;
152 self
153 }
154
155 /// Sets the option to skip the check for FIFO file type.
156 ///
157 /// By default, [`open_receiver`] and [`open_sender`] functions will check
158 /// if the opened file is a FIFO file. Set this option to `true` if you are
159 /// sure the file is a FIFO file.
160 ///
161 /// [`open_receiver`]: OpenOptions::open_receiver
162 /// [`open_sender`]: OpenOptions::open_sender
163 ///
164 /// # Examples
165 ///
166 /// ```no_run
167 /// use compio_fs::pipe;
168 /// use nix::{sys::stat::Mode, unistd::mkfifo};
169 ///
170 /// // Our program has exclusive access to this path.
171 /// const FIFO_NAME: &str = "path/to/a/new/fifo";
172 ///
173 /// # async fn dox() -> std::io::Result<()> {
174 /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
175 /// let rx = pipe::OpenOptions::new()
176 /// .unchecked(true)
177 /// .open_receiver(FIFO_NAME)
178 /// .await?;
179 /// # Ok(())
180 /// # }
181 /// ```
182 pub fn unchecked(&mut self, value: bool) -> &mut Self {
183 self.unchecked = value;
184 self
185 }
186
187 /// Creates a [`Receiver`] from a FIFO file with the options specified by
188 /// `self`.
189 ///
190 /// This function will open the FIFO file at the specified path, possibly
191 /// check if it is a pipe, and associate the pipe with the default event
192 /// loop for reading.
193 ///
194 /// # Errors
195 ///
196 /// If the file type check fails, this function will fail with
197 /// `io::ErrorKind::InvalidInput`. This function may also fail with
198 /// other standard OS errors.
199 pub async fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
200 let file = self.open(path.as_ref(), PipeEnd::Receiver).await?;
201 Receiver::from_file(file)
202 }
203
204 /// Creates a [`Sender`] from a FIFO file with the options specified by
205 /// `self`.
206 ///
207 /// This function will open the FIFO file at the specified path, possibly
208 /// check if it is a pipe, and associate the pipe with the default event
209 /// loop for writing.
210 ///
211 /// # Errors
212 ///
213 /// If the file type check fails, this function will fail with
214 /// `io::ErrorKind::InvalidInput`. If the file is not opened in
215 /// read-write access mode and the file is not currently open for
216 /// reading, this function will fail with `ENXIO`. This function may
217 /// also fail with other standard OS errors.
218 pub async fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
219 let file = self.open(path.as_ref(), PipeEnd::Sender).await?;
220 Sender::from_file(file)
221 }
222
223 async fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
224 let mut options = crate::OpenOptions::new();
225 options
226 .read(pipe_end == PipeEnd::Receiver)
227 .write(pipe_end == PipeEnd::Sender);
228
229 #[cfg(target_os = "linux")]
230 if self.read_write {
231 options.read(true).write(true);
232 }
233
234 let file = options.open(path).await?;
235
236 if !self.unchecked && !is_fifo(&file).await? {
237 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
238 }
239
240 Ok(file)
241 }
242}
243
244impl Default for OpenOptions {
245 fn default() -> OpenOptions {
246 OpenOptions::new()
247 }
248}
249
250#[derive(Clone, Copy, PartialEq, Eq, Debug)]
251enum PipeEnd {
252 Sender,
253 Receiver,
254}
255
256/// Writing end of a Unix pipe.
257///
258/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
259///
260/// Opening a named pipe for writing involves a few steps.
261/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
262/// different things:
263///
264/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
265/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
266/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
267/// Sleep for a while and try again.
268/// * Other OS errors not specific to opening FIFO files.
269///
270/// Opening a `Sender` from a FIFO file should look like this:
271///
272/// ```no_run
273/// use std::time::Duration;
274///
275/// use compio_fs::pipe;
276/// use compio_runtime::time;
277///
278/// const FIFO_NAME: &str = "path/to/a/fifo";
279///
280/// # async fn dox() -> std::io::Result<()> {
281/// // Wait for a reader to open the file.
282/// let tx = loop {
283/// match pipe::OpenOptions::new().open_sender(FIFO_NAME).await {
284/// Ok(tx) => break tx,
285/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}
286/// Err(e) => return Err(e.into()),
287/// }
288///
289/// time::sleep(Duration::from_millis(50)).await;
290/// };
291/// # Ok(())
292/// # }
293/// ```
294///
295/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
296/// loop. This is done by opening a named pipe in read-write access mode with
297/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
298/// both a writing end and a reading end, and the latter allows to open a FIFO
299/// without [`ENXIO`] error since the pipe is open for reading as well.
300///
301/// `Sender` cannot be used to read from a pipe, so in practice the read access
302/// is only used when a FIFO is opened. However, using a `Sender` in read-write
303/// mode **may lead to lost data**, because written data will be dropped by the
304/// system as soon as all pipe ends are closed. To avoid lost data you have to
305/// make sure that a reading end has been opened before dropping a `Sender`.
306///
307/// Note that using read-write access mode with FIFO files is not defined by
308/// the POSIX standard and it is only guaranteed to work on Linux.
309///
310/// ```ignore
311/// use compio_fs::pipe;
312/// use compio_io::AsyncWriteExt;
313///
314/// const FIFO_NAME: &str = "path/to/a/fifo";
315///
316/// # async fn dox() {
317/// let mut tx = pipe::OpenOptions::new()
318/// .read_write(true)
319/// .open_sender(FIFO_NAME)
320/// .unwrap();
321///
322/// // Asynchronously write to the pipe before a reader.
323/// tx.write_all("hello world").await.unwrap();
324/// # }
325/// ```
326///
327/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
328#[derive(Debug, Clone)]
329pub struct Sender {
330 file: File,
331}
332
333impl Sender {
334 pub(crate) fn from_file(file: File) -> io::Result<Sender> {
335 set_nonblocking(&file)?;
336 Ok(Sender { file })
337 }
338
339 /// Close the pipe. If the returned future is dropped before polling, the
340 /// pipe won't be closed.
341 pub fn close(self) -> impl Future<Output = io::Result<()>> {
342 self.file.close()
343 }
344}
345
346impl AsyncWrite for Sender {
347 #[inline]
348 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
349 (&*self).write(buf).await
350 }
351
352 #[inline]
353 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
354 (&*self).write_vectored(buf).await
355 }
356
357 #[inline]
358 async fn flush(&mut self) -> io::Result<()> {
359 (&*self).flush().await
360 }
361
362 #[inline]
363 async fn shutdown(&mut self) -> io::Result<()> {
364 (&*self).shutdown().await
365 }
366}
367
368impl AsyncWrite for &Sender {
369 async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
370 let fd = self.to_shared_fd();
371 let op = Send::new(fd, buffer);
372 compio_runtime::submit(op).await.into_inner()
373 }
374
375 async fn write_vectored<T: IoVectoredBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
376 let fd = self.to_shared_fd();
377 let op = SendVectored::new(fd, buffer);
378 compio_runtime::submit(op).await.into_inner()
379 }
380
381 #[inline]
382 async fn flush(&mut self) -> io::Result<()> {
383 Ok(())
384 }
385
386 #[inline]
387 async fn shutdown(&mut self) -> io::Result<()> {
388 Ok(())
389 }
390}
391
392impl_raw_fd!(Sender, std::fs::File, file, file);
393
394/// Reading end of a Unix pipe.
395///
396/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
397///
398/// # Examples
399///
400/// Receiving messages from a named pipe in a loop:
401///
402/// ```no_run
403/// use std::io;
404///
405/// use compio_buf::BufResult;
406/// use compio_fs::pipe;
407/// use compio_io::AsyncReadExt;
408///
409/// const FIFO_NAME: &str = "path/to/a/fifo";
410///
411/// # async fn dox() -> io::Result<()> {
412/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
413/// loop {
414/// let mut msg = Vec::with_capacity(256);
415/// let BufResult(res, msg) = rx.read_exact(msg).await;
416/// match res {
417/// Ok(_) => { /* handle the message */ }
418/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
419/// // Writing end has been closed, we should reopen the pipe.
420/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
421/// }
422/// Err(e) => return Err(e.into()),
423/// }
424/// }
425/// # }
426/// ```
427///
428/// On Linux, you can use a `Receiver` in read-write access mode to implement
429/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
430/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
431/// when the writing end is closed. This way, a `Receiver` can asynchronously
432/// wait for the next writer to open the pipe.
433///
434/// You should not use functions waiting for EOF such as [`read_to_end`] with
435/// a `Receiver` in read-write access mode, since it **may wait forever**.
436/// `Receiver` in this mode also holds an open writing end, which prevents
437/// receiving EOF.
438///
439/// To set the read-write access mode you can use `OpenOptions::read_write`.
440/// Note that using read-write access mode with FIFO files is not defined by
441/// the POSIX standard and it is only guaranteed to work on Linux.
442///
443/// ```ignore
444/// use compio_fs::pipe;
445/// use compio_io::AsyncReadExt;
446///
447/// const FIFO_NAME: &str = "path/to/a/fifo";
448///
449/// # async fn dox() {
450/// let mut rx = pipe::OpenOptions::new()
451/// .read_write(true)
452/// .open_receiver(FIFO_NAME)
453/// .unwrap();
454/// loop {
455/// let mut msg = Vec::with_capacity(256);
456/// rx.read_exact(msg).await.unwrap();
457/// // handle the message
458/// }
459/// # }
460/// ```
461///
462/// [`read_to_end`]: compio_io::AsyncReadExt::read_to_end
463#[derive(Debug, Clone)]
464pub struct Receiver {
465 file: File,
466}
467
468impl Receiver {
469 pub(crate) fn from_file(file: File) -> io::Result<Receiver> {
470 set_nonblocking(&file)?;
471 Ok(Receiver { file })
472 }
473
474 /// Close the pipe. If the returned future is dropped before polling, the
475 /// pipe won't be closed.
476 pub fn close(self) -> impl Future<Output = io::Result<()>> {
477 self.file.close()
478 }
479}
480
481impl AsyncRead for Receiver {
482 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
483 (&*self).read(buf).await
484 }
485
486 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
487 (&*self).read_vectored(buf).await
488 }
489}
490
491impl AsyncRead for &Receiver {
492 async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
493 let fd = self.to_shared_fd();
494 let op = Recv::new(fd, buffer);
495 compio_runtime::submit(op).await.into_inner().map_advanced()
496 }
497
498 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buffer: V) -> BufResult<usize, V> {
499 let fd = self.to_shared_fd();
500 let op = RecvVectored::new(fd, buffer);
501 compio_runtime::submit(op).await.into_inner().map_advanced()
502 }
503}
504
505impl_raw_fd!(Receiver, std::fs::File, file, file);
506
507/// Checks if file is a FIFO
508async fn is_fifo(file: &File) -> io::Result<bool> {
509 use std::os::unix::prelude::FileTypeExt;
510
511 Ok(file.metadata().await?.file_type().is_fifo())
512}
513
514/// Sets file's flags with O_NONBLOCK by fcntl.
515fn set_nonblocking(file: &impl AsRawFd) -> io::Result<()> {
516 if cfg!(not(all(target_os = "linux", feature = "io-uring"))) {
517 let fd = file.as_raw_fd();
518 let current_flags = syscall!(libc::fcntl(fd, libc::F_GETFL))?;
519 let flags = current_flags | libc::O_NONBLOCK;
520 if flags != current_flags {
521 syscall!(libc::fcntl(fd, libc::F_SETFL, flags))?;
522 }
523 }
524 Ok(())
525}