named_pipes/named_pipes.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
/// One of them main reasons to use named pipes instead of stdout is the ability
/// to support multiple outputs from a single FFmpeg command. The creation and
/// behavior of named pipes is platform-specific, and some of the
/// synchronization logic can be a bit tricky. This example provides a starting
/// point and some cross-platform abstractions over named pipes to make things
/// easier.
///
/// If you need even more granular control over the output streams, you might
/// consider using local TCP sockets instead, which can be more flexible and
/// reliable with the same performance profile, if not better. See `examples/sockets.rs`.
#[cfg(feature = "named_pipes")]
fn main() -> anyhow::Result<()> {
use anyhow::Result;
use ffmpeg_sidecar::command::FfmpegCommand;
use ffmpeg_sidecar::event::{FfmpegEvent, LogLevel};
use ffmpeg_sidecar::named_pipes::NamedPipe;
use ffmpeg_sidecar::pipe_name;
use std::io::Read;
use std::sync::mpsc;
use std::thread;
const VIDEO_PIPE_NAME: &str = pipe_name!("ffmpeg_video");
const AUDIO_PIPE_NAME: &str = pipe_name!("ffmpeg_audio");
const SUBTITLES_PIPE_NAME: &str = pipe_name!("ffmpeg_subtitles");
// Prepare an FFmpeg command with separate outputs for video, audio, and subtitles.
let mut command = FfmpegCommand::new();
command
// Global flags
.hide_banner()
.overwrite() // <- overwrite required on windows
// Generate test video
.format("lavfi")
.input("testsrc=size=1920x1080:rate=60:duration=10")
// Generate test audio
.format("lavfi")
.input("sine=frequency=1000:duration=10")
// Generate test subtitles
.format("srt")
.input(
"data:text/plain;base64,MQ0KMDA6MDA6MDAsMDAwIC0tPiAwMDowMDoxMCw1MDANCkhlbGxvIFdvcmxkIQ==",
)
// Video output
.map("0:v")
.format("rawvideo")
.pix_fmt("rgb24")
.output(VIDEO_PIPE_NAME)
// Audio output
.map("1:a")
.format("s16le")
.output(AUDIO_PIPE_NAME)
// Subtitles output
.map("2:s")
.format("srt")
.output(SUBTITLES_PIPE_NAME);
// Create a separate thread for each output pipe
let threads = [VIDEO_PIPE_NAME, AUDIO_PIPE_NAME, SUBTITLES_PIPE_NAME]
.iter()
.cloned()
.map(|pipe_name| {
// It's important to create the named pipe on the main thread before
// sending it elsewhere so that any errors are caught at the top level.
let mut pipe = NamedPipe::new(pipe_name)?;
println!("[{pipe_name}] pipe created");
let (ready_sender, ready_receiver) = mpsc::channel::<()>();
let thread = thread::spawn(move || -> Result<()> {
// Wait for FFmpeg to start writing
// Only needed for Windows, since Unix will block until a writer has connected
println!("[{pipe_name}] waiting for ready signal");
ready_receiver.recv()?;
// Read continuously until finished
// Note that if the stream of output is interrupted or paused,
// you may need additional logic to keep the read loop alive.
println!("[{pipe_name}] reading from pipe");
let mut buf = vec![0; 1920 * 1080 * 3];
let mut total_bytes_read = 0;
// In the case of subtitles, we'll decode the string contents directly
let mut text_content = if pipe_name == SUBTITLES_PIPE_NAME {
Some("".to_string())
} else {
None
};
loop {
match pipe.read(&mut buf) {
Ok(bytes_read) => {
total_bytes_read += bytes_read;
// read bytes into string
if let Some(cur_str) = &mut text_content {
let s = std::str::from_utf8(&buf[..bytes_read]).unwrap();
text_content = Some(format!("{}{}", cur_str, s));
}
if bytes_read == 0 {
break;
}
}
Err(err) => {
if err.kind() != std::io::ErrorKind::BrokenPipe {
return Err(err.into());
} else {
break;
}
}
}
}
// Log how many bytes were received over this pipe.
// You can visually compare this to the FFmpeg log output to confirm
// that all the expected bytes were captured.
let size_str = if total_bytes_read < 1024 {
format!("{}B", total_bytes_read)
} else {
format!("{}KiB", total_bytes_read / 1024)
};
if let Some(text_content) = text_content {
println!("[{pipe_name}] subtitle text content: ");
println!("{}", text_content.trim());
}
println!("[{pipe_name}] done reading ({size_str} total)");
Ok(())
});
Ok((thread, ready_sender))
})
.collect::<Result<Vec<_>>>()?;
// Start FFmpeg
let mut ready_signal_sent = false;
command
.print_command()
.spawn()?
.iter()?
.for_each(|event| match event {
// Signal threads when output is ready
FfmpegEvent::Progress(_) if !ready_signal_sent => {
threads.iter().for_each(|(_, sender)| {
sender.send(()).ok();
});
ready_signal_sent = true;
}
// Verify output size from FFmpeg logs (video/audio KiB)
FfmpegEvent::Log(LogLevel::Info, msg) if msg.starts_with("[out#") => {
println!("{msg}");
}
// Log any unexpected errors
FfmpegEvent::Log(LogLevel::Warning | LogLevel::Error | LogLevel::Fatal, msg) => {
eprintln!("{msg}");
}
_ => {}
});
for (thread, _) in threads {
thread.join().unwrap()?;
}
Ok(())
}
#[cfg(not(feature = "named_pipes"))]
fn main() {
eprintln!(r#"Enable the "named_pipes" feature to run this example."#);
println!("cargo run --features named_pipes --example named_pipes")
}