named_pipes/
named_pipes.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/// One of them main reasons to use named pipes instead of stdout is the ability
/// to support multiple outputs from a single FFmpeg command. The creation and
/// behavior of named pipes is platform-specific, and some of the
/// synchronization logic can be a bit tricky. This example provides a starting
/// point and some cross-platform abstractions over named pipes to make things
/// easier.
///
/// If you need even more granular control over the output streams, you might
/// consider using local TCP sockets instead, which can be more flexible and
/// reliable with the same performance profile, if not better. See `examples/sockets.rs`.
#[cfg(feature = "named_pipes")]
fn main() -> anyhow::Result<()> {
  use anyhow::Result;
  use ffmpeg_sidecar::command::FfmpegCommand;
  use ffmpeg_sidecar::event::{FfmpegEvent, LogLevel};
  use ffmpeg_sidecar::named_pipes::NamedPipe;
  use ffmpeg_sidecar::pipe_name;
  use std::io::Read;
  use std::sync::mpsc;
  use std::thread;

  const VIDEO_PIPE_NAME: &str = pipe_name!("ffmpeg_video");
  const AUDIO_PIPE_NAME: &str = pipe_name!("ffmpeg_audio");
  const SUBTITLES_PIPE_NAME: &str = pipe_name!("ffmpeg_subtitles");

  // Prepare an FFmpeg command with separate outputs for video, audio, and subtitles.
  let mut command = FfmpegCommand::new();
  command
    // Global flags
    .hide_banner()
    .overwrite() // <- overwrite required on windows
    // Generate test video
    .format("lavfi")
    .input("testsrc=size=1920x1080:rate=60:duration=10")
    // Generate test audio
    .format("lavfi")
    .input("sine=frequency=1000:duration=10")
    // Generate test subtitles
    .format("srt")
    .input(
      "data:text/plain;base64,MQ0KMDA6MDA6MDAsMDAwIC0tPiAwMDowMDoxMCw1MDANCkhlbGxvIFdvcmxkIQ==",
    )
    // Video output
    .map("0:v")
    .format("rawvideo")
    .pix_fmt("rgb24")
    .output(VIDEO_PIPE_NAME)
    // Audio output
    .map("1:a")
    .format("s16le")
    .output(AUDIO_PIPE_NAME)
    // Subtitles output
    .map("2:s")
    .format("srt")
    .output(SUBTITLES_PIPE_NAME);

  // Create a separate thread for each output pipe
  let threads = [VIDEO_PIPE_NAME, AUDIO_PIPE_NAME, SUBTITLES_PIPE_NAME]
    .iter()
    .cloned()
    .map(|pipe_name| {
      // It's important to create the named pipe on the main thread before
      // sending it elsewhere so that any errors are caught at the top level.
      let mut pipe = NamedPipe::new(pipe_name)?;
      println!("[{pipe_name}] pipe created");
      let (ready_sender, ready_receiver) = mpsc::channel::<()>();
      let thread = thread::spawn(move || -> Result<()> {
        // Wait for FFmpeg to start writing
        // Only needed for Windows, since Unix will block until a writer has connected
        println!("[{pipe_name}] waiting for ready signal");
        ready_receiver.recv()?;

        // Read continuously until finished
        // Note that if the stream of output is interrupted or paused,
        // you may need additional logic to keep the read loop alive.
        println!("[{pipe_name}] reading from pipe");
        let mut buf = vec![0; 1920 * 1080 * 3];
        let mut total_bytes_read = 0;

        // In the case of subtitles, we'll decode the string contents directly
        let mut text_content = if pipe_name == SUBTITLES_PIPE_NAME {
          Some("".to_string())
        } else {
          None
        };

        loop {
          match pipe.read(&mut buf) {
            Ok(bytes_read) => {
              total_bytes_read += bytes_read;

              // read bytes into string
              if let Some(cur_str) = &mut text_content {
                let s = std::str::from_utf8(&buf[..bytes_read]).unwrap();
                text_content = Some(format!("{}{}", cur_str, s));
              }

              if bytes_read == 0 {
                break;
              }
            }
            Err(err) => {
              if err.kind() != std::io::ErrorKind::BrokenPipe {
                return Err(err.into());
              } else {
                break;
              }
            }
          }
        }

        // Log how many bytes were received over this pipe.
        // You can visually compare this to the FFmpeg log output to confirm
        // that all the expected bytes were captured.
        let size_str = if total_bytes_read < 1024 {
          format!("{}B", total_bytes_read)
        } else {
          format!("{}KiB", total_bytes_read / 1024)
        };

        if let Some(text_content) = text_content {
          println!("[{pipe_name}] subtitle text content: ");
          println!("{}", text_content.trim());
        }

        println!("[{pipe_name}] done reading ({size_str} total)");
        Ok(())
      });

      Ok((thread, ready_sender))
    })
    .collect::<Result<Vec<_>>>()?;

  // Start FFmpeg
  let mut ready_signal_sent = false;
  command
    .print_command()
    .spawn()?
    .iter()?
    .for_each(|event| match event {
      // Signal threads when output is ready
      FfmpegEvent::Progress(_) if !ready_signal_sent => {
        threads.iter().for_each(|(_, sender)| {
          sender.send(()).ok();
        });
        ready_signal_sent = true;
      }

      // Verify output size from FFmpeg logs (video/audio KiB)
      FfmpegEvent::Log(LogLevel::Info, msg) if msg.starts_with("[out#") => {
        println!("{msg}");
      }

      // Log any unexpected errors
      FfmpegEvent::Log(LogLevel::Warning | LogLevel::Error | LogLevel::Fatal, msg) => {
        eprintln!("{msg}");
      }

      _ => {}
    });

  for (thread, _) in threads {
    thread.join().unwrap()?;
  }

  Ok(())
}

#[cfg(not(feature = "named_pipes"))]
fn main() {
  eprintln!(r#"Enable the "named_pipes" feature to run this example."#);
  println!("cargo run --features named_pipes --example named_pipes")
}