compio_fs/
pipe.rs

1//! Unix pipe types.
2
3use std::{
4    future::Future,
5    io,
6    os::fd::{FromRawFd, IntoRawFd},
7    path::Path,
8};
9
10use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
11use compio_driver::{
12    AsRawFd, ToSharedFd, impl_raw_fd,
13    op::{BufResultExt, Recv, RecvVectored, Send, SendVectored},
14    syscall,
15};
16use compio_io::{AsyncRead, AsyncWrite};
17
18use crate::File;
19
20/// Creates a pair of anonymous pipe.
21///
22/// ```
23/// use compio_fs::pipe::anonymous;
24/// use compio_io::{AsyncReadExt, AsyncWriteExt};
25///
26/// # compio_runtime::Runtime::new().unwrap().block_on(async {
27/// let (mut rx, mut tx) = anonymous().unwrap();
28///
29/// tx.write_all("Hello world!").await.unwrap();
30/// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap();
31/// assert_eq!(&buf, b"Hello world!");
32/// # });
33/// ```
34pub fn anonymous() -> io::Result<(Receiver, Sender)> {
35    let (receiver, sender) = os_pipe::pipe()?;
36    let receiver = Receiver::from_file(File::from_std(unsafe {
37        std::fs::File::from_raw_fd(receiver.into_raw_fd())
38    })?)?;
39    let sender = Sender::from_file(File::from_std(unsafe {
40        std::fs::File::from_raw_fd(sender.into_raw_fd())
41    })?)?;
42    Ok((receiver, sender))
43}
44
45/// Options and flags which can be used to configure how a FIFO file is opened.
46///
47/// This builder allows configuring how to create a pipe end from a FIFO file.
48/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
49/// then chain calls to methods to set each option, then call either
50/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
51/// are trying to open. This will give you a [`io::Result`] with a pipe end
52/// inside that you can further operate on.
53///
54/// [`new`]: OpenOptions::new
55/// [`open_receiver`]: OpenOptions::open_receiver
56/// [`open_sender`]: OpenOptions::open_sender
57///
58/// # Examples
59///
60/// Opening a pair of pipe ends from a FIFO file:
61///
62/// ```no_run
63/// use compio_fs::pipe;
64///
65/// const FIFO_NAME: &str = "path/to/a/fifo";
66///
67/// # async fn dox() -> std::io::Result<()> {
68/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
69/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME).await?;
70/// # Ok(())
71/// # }
72/// ```
73///
74/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
75///
76/// ```ignore
77/// use compio_fs::pipe;
78/// use nix::{sys::stat::Mode, unistd::mkfifo};
79///
80/// // Our program has exclusive access to this path.
81/// const FIFO_NAME: &str = "path/to/a/new/fifo";
82///
83/// # async fn dox() -> std::io::Result<()> {
84/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
85/// let tx = pipe::OpenOptions::new()
86///     .read_write(true)
87///     .unchecked(true)
88///     .open_sender(FIFO_NAME)?;
89/// # Ok(())
90/// # }
91/// ```
92#[derive(Clone, Debug)]
93pub struct OpenOptions {
94    #[cfg(target_os = "linux")]
95    read_write: bool,
96    unchecked: bool,
97}
98
99impl OpenOptions {
100    /// Creates a blank new set of options ready for configuration.
101    ///
102    /// All options are initially set to `false`.
103    pub fn new() -> OpenOptions {
104        OpenOptions {
105            #[cfg(target_os = "linux")]
106            read_write: false,
107            unchecked: false,
108        }
109    }
110
111    /// Sets the option for read-write access.
112    ///
113    /// This option, when true, will indicate that a FIFO file will be opened
114    /// in read-write access mode. This operation is not defined by the POSIX
115    /// standard and is only guaranteed to work on Linux.
116    ///
117    /// # Examples
118    ///
119    /// Opening a [`Sender`] even if there are no open reading ends:
120    ///
121    /// ```
122    /// use compio_fs::pipe;
123    ///
124    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
125    /// let tx = pipe::OpenOptions::new()
126    ///     .read_write(true)
127    ///     .open_sender("path/to/a/fifo")
128    ///     .await;
129    /// # });
130    /// ```
131    ///
132    /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
133    /// fail with [`UnexpectedEof`] during reading if all writing ends of the
134    /// pipe close the FIFO file.
135    ///
136    /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
137    ///
138    /// ```
139    /// use compio_fs::pipe;
140    ///
141    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
142    /// let tx = pipe::OpenOptions::new()
143    ///     .read_write(true)
144    ///     .open_receiver("path/to/a/fifo")
145    ///     .await;
146    /// # });
147    /// ```
148    #[cfg(target_os = "linux")]
149    #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
150    pub fn read_write(&mut self, value: bool) -> &mut Self {
151        self.read_write = value;
152        self
153    }
154
155    /// Sets the option to skip the check for FIFO file type.
156    ///
157    /// By default, [`open_receiver`] and [`open_sender`] functions will check
158    /// if the opened file is a FIFO file. Set this option to `true` if you are
159    /// sure the file is a FIFO file.
160    ///
161    /// [`open_receiver`]: OpenOptions::open_receiver
162    /// [`open_sender`]: OpenOptions::open_sender
163    ///
164    /// # Examples
165    ///
166    /// ```no_run
167    /// use compio_fs::pipe;
168    /// use nix::{sys::stat::Mode, unistd::mkfifo};
169    ///
170    /// // Our program has exclusive access to this path.
171    /// const FIFO_NAME: &str = "path/to/a/new/fifo";
172    ///
173    /// # async fn dox() -> std::io::Result<()> {
174    /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
175    /// let rx = pipe::OpenOptions::new()
176    ///     .unchecked(true)
177    ///     .open_receiver(FIFO_NAME)
178    ///     .await?;
179    /// # Ok(())
180    /// # }
181    /// ```
182    pub fn unchecked(&mut self, value: bool) -> &mut Self {
183        self.unchecked = value;
184        self
185    }
186
187    /// Creates a [`Receiver`] from a FIFO file with the options specified by
188    /// `self`.
189    ///
190    /// This function will open the FIFO file at the specified path, possibly
191    /// check if it is a pipe, and associate the pipe with the default event
192    /// loop for reading.
193    ///
194    /// # Errors
195    ///
196    /// If the file type check fails, this function will fail with
197    /// `io::ErrorKind::InvalidInput`. This function may also fail with
198    /// other standard OS errors.
199    pub async fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
200        let file = self.open(path.as_ref(), PipeEnd::Receiver).await?;
201        Receiver::from_file(file)
202    }
203
204    /// Creates a [`Sender`] from a FIFO file with the options specified by
205    /// `self`.
206    ///
207    /// This function will open the FIFO file at the specified path, possibly
208    /// check if it is a pipe, and associate the pipe with the default event
209    /// loop for writing.
210    ///
211    /// # Errors
212    ///
213    /// If the file type check fails, this function will fail with
214    /// `io::ErrorKind::InvalidInput`. If the file is not opened in
215    /// read-write access mode and the file is not currently open for
216    /// reading, this function will fail with `ENXIO`. This function may
217    /// also fail with other standard OS errors.
218    pub async fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
219        let file = self.open(path.as_ref(), PipeEnd::Sender).await?;
220        Sender::from_file(file)
221    }
222
223    async fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
224        let mut options = crate::OpenOptions::new();
225        options
226            .read(pipe_end == PipeEnd::Receiver)
227            .write(pipe_end == PipeEnd::Sender);
228
229        #[cfg(target_os = "linux")]
230        if self.read_write {
231            options.read(true).write(true);
232        }
233
234        let file = options.open(path).await?;
235
236        if !self.unchecked && !is_fifo(&file).await? {
237            return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
238        }
239
240        Ok(file)
241    }
242}
243
244impl Default for OpenOptions {
245    fn default() -> OpenOptions {
246        OpenOptions::new()
247    }
248}
249
250#[derive(Clone, Copy, PartialEq, Eq, Debug)]
251enum PipeEnd {
252    Sender,
253    Receiver,
254}
255
256/// Writing end of a Unix pipe.
257///
258/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
259///
260/// Opening a named pipe for writing involves a few steps.
261/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
262/// different things:
263///
264/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
265/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
266/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
267///   Sleep for a while and try again.
268/// * Other OS errors not specific to opening FIFO files.
269///
270/// Opening a `Sender` from a FIFO file should look like this:
271///
272/// ```no_run
273/// use std::time::Duration;
274///
275/// use compio_fs::pipe;
276/// use compio_runtime::time;
277///
278/// const FIFO_NAME: &str = "path/to/a/fifo";
279///
280/// # async fn dox() -> std::io::Result<()> {
281/// // Wait for a reader to open the file.
282/// let tx = loop {
283///     match pipe::OpenOptions::new().open_sender(FIFO_NAME).await {
284///         Ok(tx) => break tx,
285///         Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}
286///         Err(e) => return Err(e.into()),
287///     }
288///
289///     time::sleep(Duration::from_millis(50)).await;
290/// };
291/// # Ok(())
292/// # }
293/// ```
294///
295/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
296/// loop. This is done by opening a named pipe in read-write access mode with
297/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
298/// both a writing end and a reading end, and the latter allows to open a FIFO
299/// without [`ENXIO`] error since the pipe is open for reading as well.
300///
301/// `Sender` cannot be used to read from a pipe, so in practice the read access
302/// is only used when a FIFO is opened. However, using a `Sender` in read-write
303/// mode **may lead to lost data**, because written data will be dropped by the
304/// system as soon as all pipe ends are closed. To avoid lost data you have to
305/// make sure that a reading end has been opened before dropping a `Sender`.
306///
307/// Note that using read-write access mode with FIFO files is not defined by
308/// the POSIX standard and it is only guaranteed to work on Linux.
309///
310/// ```ignore
311/// use compio_fs::pipe;
312/// use compio_io::AsyncWriteExt;
313///
314/// const FIFO_NAME: &str = "path/to/a/fifo";
315///
316/// # async fn dox() {
317/// let mut tx = pipe::OpenOptions::new()
318///     .read_write(true)
319///     .open_sender(FIFO_NAME)
320///     .unwrap();
321///
322/// // Asynchronously write to the pipe before a reader.
323/// tx.write_all("hello world").await.unwrap();
324/// # }
325/// ```
326///
327/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
328#[derive(Debug, Clone)]
329pub struct Sender {
330    file: File,
331}
332
333impl Sender {
334    pub(crate) fn from_file(file: File) -> io::Result<Sender> {
335        set_nonblocking(&file)?;
336        Ok(Sender { file })
337    }
338
339    /// Close the pipe. If the returned future is dropped before polling, the
340    /// pipe won't be closed.
341    pub fn close(self) -> impl Future<Output = io::Result<()>> {
342        self.file.close()
343    }
344}
345
346impl AsyncWrite for Sender {
347    #[inline]
348    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
349        (&*self).write(buf).await
350    }
351
352    #[inline]
353    async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
354        (&*self).write_vectored(buf).await
355    }
356
357    #[inline]
358    async fn flush(&mut self) -> io::Result<()> {
359        (&*self).flush().await
360    }
361
362    #[inline]
363    async fn shutdown(&mut self) -> io::Result<()> {
364        (&*self).shutdown().await
365    }
366}
367
368impl AsyncWrite for &Sender {
369    async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
370        let fd = self.to_shared_fd();
371        let op = Send::new(fd, buffer);
372        compio_runtime::submit(op).await.into_inner()
373    }
374
375    async fn write_vectored<T: IoVectoredBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
376        let fd = self.to_shared_fd();
377        let op = SendVectored::new(fd, buffer);
378        compio_runtime::submit(op).await.into_inner()
379    }
380
381    #[inline]
382    async fn flush(&mut self) -> io::Result<()> {
383        Ok(())
384    }
385
386    #[inline]
387    async fn shutdown(&mut self) -> io::Result<()> {
388        Ok(())
389    }
390}
391
392impl_raw_fd!(Sender, std::fs::File, file, file);
393
394/// Reading end of a Unix pipe.
395///
396/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
397///
398/// # Examples
399///
400/// Receiving messages from a named pipe in a loop:
401///
402/// ```no_run
403/// use std::io;
404///
405/// use compio_buf::BufResult;
406/// use compio_fs::pipe;
407/// use compio_io::AsyncReadExt;
408///
409/// const FIFO_NAME: &str = "path/to/a/fifo";
410///
411/// # async fn dox() -> io::Result<()> {
412/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
413/// loop {
414///     let mut msg = Vec::with_capacity(256);
415///     let BufResult(res, msg) = rx.read_exact(msg).await;
416///     match res {
417///         Ok(_) => { /* handle the message */ }
418///         Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
419///             // Writing end has been closed, we should reopen the pipe.
420///             rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
421///         }
422///         Err(e) => return Err(e.into()),
423///     }
424/// }
425/// # }
426/// ```
427///
428/// On Linux, you can use a `Receiver` in read-write access mode to implement
429/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
430/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
431/// when the writing end is closed. This way, a `Receiver` can asynchronously
432/// wait for the next writer to open the pipe.
433///
434/// You should not use functions waiting for EOF such as [`read_to_end`] with
435/// a `Receiver` in read-write access mode, since it **may wait forever**.
436/// `Receiver` in this mode also holds an open writing end, which prevents
437/// receiving EOF.
438///
439/// To set the read-write access mode you can use `OpenOptions::read_write`.
440/// Note that using read-write access mode with FIFO files is not defined by
441/// the POSIX standard and it is only guaranteed to work on Linux.
442///
443/// ```ignore
444/// use compio_fs::pipe;
445/// use compio_io::AsyncReadExt;
446///
447/// const FIFO_NAME: &str = "path/to/a/fifo";
448///
449/// # async fn dox() {
450/// let mut rx = pipe::OpenOptions::new()
451///     .read_write(true)
452///     .open_receiver(FIFO_NAME)
453///     .unwrap();
454/// loop {
455///     let mut msg = Vec::with_capacity(256);
456///     rx.read_exact(msg).await.unwrap();
457///     // handle the message
458/// }
459/// # }
460/// ```
461///
462/// [`read_to_end`]: compio_io::AsyncReadExt::read_to_end
463#[derive(Debug, Clone)]
464pub struct Receiver {
465    file: File,
466}
467
468impl Receiver {
469    pub(crate) fn from_file(file: File) -> io::Result<Receiver> {
470        set_nonblocking(&file)?;
471        Ok(Receiver { file })
472    }
473
474    /// Close the pipe. If the returned future is dropped before polling, the
475    /// pipe won't be closed.
476    pub fn close(self) -> impl Future<Output = io::Result<()>> {
477        self.file.close()
478    }
479}
480
481impl AsyncRead for Receiver {
482    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
483        (&*self).read(buf).await
484    }
485
486    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
487        (&*self).read_vectored(buf).await
488    }
489}
490
491impl AsyncRead for &Receiver {
492    async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
493        let fd = self.to_shared_fd();
494        let op = Recv::new(fd, buffer);
495        compio_runtime::submit(op).await.into_inner().map_advanced()
496    }
497
498    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buffer: V) -> BufResult<usize, V> {
499        let fd = self.to_shared_fd();
500        let op = RecvVectored::new(fd, buffer);
501        compio_runtime::submit(op).await.into_inner().map_advanced()
502    }
503}
504
505impl_raw_fd!(Receiver, std::fs::File, file, file);
506
507/// Checks if file is a FIFO
508async fn is_fifo(file: &File) -> io::Result<bool> {
509    use std::os::unix::prelude::FileTypeExt;
510
511    Ok(file.metadata().await?.file_type().is_fifo())
512}
513
514/// Sets file's flags with O_NONBLOCK by fcntl.
515fn set_nonblocking(file: &impl AsRawFd) -> io::Result<()> {
516    if cfg!(not(all(target_os = "linux", feature = "io-uring"))) {
517        let fd = file.as_raw_fd();
518        let current_flags = syscall!(libc::fcntl(fd, libc::F_GETFL))?;
519        let flags = current_flags | libc::O_NONBLOCK;
520        if flags != current_flags {
521            syscall!(libc::fcntl(fd, libc::F_SETFL, flags))?;
522        }
523    }
524    Ok(())
525}