ffmpeg_sidecar/
named_pipes.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
//! Cross-platform abstraction over Windows async named pipes and Unix FIFO.
//!
//! The primary use-case is streaming multiple outputs from FFmpeg into a Rust program.
//! For more commentary and end-to-end usage, see `examples/named_pipes.rs`:
//! <https://github.com/nathanbabcock/ffmpeg-sidecar/blob/main/examples/named_pipes.rs>

use anyhow::Result;
use std::io::Read;

/// Builds the platform-appropriate name for a named pipe.
///
/// - On Windows the result is the argument prefixed with `\\.\pipe\`.
/// - On every other platform the argument is returned unchanged.
///
/// The argument must be a string literal: the `concat!` branch is expanded on
/// all platforms, even where it is not the one selected.
#[macro_export]
macro_rules! pipe_name {
  ($name:expr) => {
    if cfg!(not(windows)) {
      $name
    } else {
      concat!(r#"\\.\pipe\"#, $name)
    }
  };
}

/// Windows-only; an FFI pointer to a named pipe handle.
///
/// This is a newtype around the raw `*mut c_void` handle value. Wrapping the
/// pointer in a dedicated type is what lets this module implement `Send` for it.
#[cfg(windows)]
pub struct NamedPipeHandle(*mut winapi::ctypes::c_void);

/// Allows a `NamedPipeHandle` to be moved to another thread.
///
/// Raw pointers are not `Send` by default, so the opt-in has to be written
/// explicitly. Background: <https://github.com/retep998/winapi-rs/issues/396>
///
/// NOTE(review): soundness assumes the wrapped OS handle may be used from any
/// thread, not only the one that created it — confirm.
#[cfg(windows)]
unsafe impl Send for NamedPipeHandle {}

/// Cross-platform abstraction over Windows async named pipes and Unix FIFO.
///
/// Exactly one of the platform-specific fields exists in any given build:
/// `handle` on Windows, `file` on Unix. Both platforms implement
/// `std::io::Read` for this type and release the underlying resource on drop.
pub struct NamedPipe {
  /// The name that the pipe was opened with. It will start with `\\.\pipe\` on Windows.
  /// On Unix this is also the path that gets unlinked when the value is dropped.
  pub name: String,

  /// Windows-only; an FFI pointer to a named pipe handle.
  /// The handle is closed when the `NamedPipe` is dropped.
  #[cfg(windows)]
  pub handle: NamedPipeHandle,

  /// Unix-only; a blocking file handle to the FIFO.
  #[cfg(unix)]
  pub file: std::fs::File,
}

#[cfg(windows)]
impl NamedPipe {
  /// On Windows the pipe name must be in the format `\\.\pipe\{pipe_name}`.
  /// @see <https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createnamedpipew>
  pub fn new<S: AsRef<str>>(pipe_name: S) -> Result<Self> {
    use std::ffi::OsStr;
    use std::os::windows::ffi::OsStrExt;
    use std::ptr::null_mut;
    use winapi::um::namedpipeapi::CreateNamedPipeW;
    use winapi::um::winbase::{PIPE_ACCESS_DUPLEX, PIPE_TYPE_BYTE, PIPE_WAIT};

    let path_wide: Vec<u16> = OsStr::new(pipe_name.as_ref())
      .encode_wide()
      .chain(Some(0))
      .collect();

    let handle = unsafe {
      CreateNamedPipeW(
        path_wide.as_ptr(),
        PIPE_ACCESS_DUPLEX,
        PIPE_TYPE_BYTE | PIPE_WAIT,
        1,
        1024 * 1024 * 64,
        1024 * 1024 * 64,
        0, // "A value of zero will result in a default time-out of 50 milliseconds."
        null_mut(),
      )
    };

    if handle == winapi::um::handleapi::INVALID_HANDLE_VALUE {
      anyhow::bail!("Failed to create named pipe");
    }

    Ok(Self {
      handle: NamedPipeHandle(handle),
      name: pipe_name.as_ref().to_string(),
    })
  }
}

#[cfg(windows)]
impl Drop for NamedPipe {
  /// Closes the pipe handle when the `NamedPipe` goes out of scope.
  fn drop(&mut self) {
    use winapi::um::handleapi::CloseHandle;

    let raw_handle = self.handle.0;
    // The return value is not inspected: nothing useful can be done about a
    // failed close from inside `drop`.
    // NOTE(review): assumes `handle` still holds the value produced by
    // `NamedPipe::new` and has not been closed elsewhere — the field is public.
    unsafe {
      CloseHandle(raw_handle);
    }
  }
}

#[cfg(windows)]
impl Read for NamedPipe {
  /// Reads from the pipe into `buf` with a synchronous `ReadFile` call.
  ///
  /// Returns the number of bytes read. `Ok(0)` is returned when the call fails
  /// with `ERROR_BROKEN_PIPE`, which is treated as end-of-stream.
  ///
  /// # Errors
  ///
  /// Any other `ReadFile` failure is returned as the corresponding OS error.
  fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
    use std::io::Error;
    use std::ptr::null_mut;
    use winapi::{
      shared::minwindef::{DWORD, LPVOID},
      um::fileapi::ReadFile,
    };

    /// Win32 error code reported once the pipe has been closed.
    const ERROR_BROKEN_PIPE: i32 = 109;

    // `ReadFile` takes a 32-bit length. Clamp instead of casting so that a
    // buffer larger than `DWORD::MAX` cannot wrap around to a tiny (or zero)
    // request; a short read is always permitted by the `Read` contract.
    let request_len = buf.len().min(DWORD::MAX as usize) as DWORD;
    let mut bytes_read: DWORD = 0;

    // SAFETY: `buf` is valid for writes of `request_len` bytes because
    // `request_len <= buf.len()`, and `bytes_read` outlives the call. No
    // OVERLAPPED structure is supplied.
    let read_status = unsafe {
      ReadFile(
        self.handle.0,
        buf.as_mut_ptr() as LPVOID,
        request_len,
        &mut bytes_read,
        null_mut(),
      )
    };

    if read_status == 0 {
      let error = Error::last_os_error();
      return if error.raw_os_error() == Some(ERROR_BROKEN_PIPE) {
        // pipe has been closed since last read
        Ok(0)
      } else {
        Err(error)
      };
    }

    Ok(bytes_read as usize)
  }
}

// The unix implementation is comparatively quite simple...

#[cfg(unix)]
impl NamedPipe {
  /// Creates a FIFO at `pipe_name` and opens it for reading.
  ///
  /// The FIFO is created with read/write/execute permission for the owner
  /// only. It is opened with `O_NONBLOCK` so that this function returns
  /// without waiting on the open, and is then switched back to blocking mode
  /// for subsequent reads.
  ///
  /// # Errors
  ///
  /// Returns an error if the FIFO cannot be created (for example because the
  /// path already exists), opened, or switched to blocking mode. If a step
  /// fails after the FIFO was created, the FIFO is removed again so that no
  /// stray file is left behind.
  pub fn new<S: AsRef<str>>(pipe_name: S) -> Result<Self> {
    use nix::{fcntl::OFlag, sys::stat, unistd};
    use std::os::fd::AsRawFd;
    use std::os::unix::fs::OpenOptionsExt;

    let path = pipe_name.as_ref();
    unistd::mkfifo(path, stat::Mode::S_IRWXU)?;

    // Everything past this point runs with the FIFO already on disk, so it is
    // grouped into one fallible step that can be cleaned up after on failure.
    let open_blocking = || -> Result<std::fs::File> {
      // Open in non-blocking mode so the function completes
      let file = std::fs::OpenOptions::new()
        .read(true)
        .custom_flags(OFlag::O_NONBLOCK.bits())
        .open(path)?;

      // Switch to blocking mode so it doesn't read too early
      let fd = AsRawFd::as_raw_fd(&file);
      nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_SETFL(OFlag::empty()))?;

      Ok(file)
    };

    match open_blocking() {
      Ok(file) => Ok(Self {
        file,
        name: path.to_string(),
      }),
      Err(err) => {
        // `Drop` never runs for a value that was not constructed, so remove
        // the FIFO created above here. Best effort: the original error is
        // the one worth reporting.
        unistd::unlink(path).ok();
        Err(err)
      }
    }
  }
}

#[cfg(unix)]
impl Read for NamedPipe {
  /// Delegates directly to the underlying FIFO file handle.
  fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
    let fifo = &mut self.file;
    Read::read(fifo, buf)
  }
}

#[cfg(unix)]
impl Drop for NamedPipe {
  fn drop(&mut self) {
    use nix::unistd;
    use std::path::Path;
    unistd::unlink(Path::new(&self.name)).ok();
  }
}