ffmpeg_sidecar/
iter.rs

1//! A stream of events from an FFmpeg process.
2
3use std::{
4  io::{BufReader, ErrorKind, Read},
5  process::{ChildStderr, ChildStdout},
6  sync::mpsc::{sync_channel, Receiver, SyncSender},
7  thread::JoinHandle,
8};
9
10use anyhow::Context;
11
12use crate::{
13  child::FfmpegChild,
14  event::{FfmpegEvent, FfmpegOutput, FfmpegProgress, LogLevel, OutputVideoFrame, Stream},
15  log_parser::FfmpegLogParser,
16  metadata::FfmpegMetadata,
17  pix_fmt::get_bytes_per_frame,
18};
19
/// An iterator over events from an ffmpeg process, including parsed metadata, progress, and raw video frames.
pub struct FfmpegIterator {
  /// Receiving end of the event channel; each call to `next` blocks on it.
  rx: Receiver<FfmpegEvent>,
  /// Sender kept by the iterator until it is either handed to the stdout
  /// thread or dropped so that the channel can close.
  tx: Option<SyncSender<FfmpegEvent>>,
  /// The child's stdout (if any), held until the stdout thread is spawned.
  stdout: Option<ChildStdout>,
  /// Metadata accumulated from the events that pass through the iterator.
  metadata: FfmpegMetadata,
}
27
28impl FfmpegIterator {
29  pub fn new(child: &mut FfmpegChild) -> anyhow::Result<Self> {
30    let stderr = child.take_stderr().context("No stderr channel\n - Did you call `take_stderr` elsewhere?\n - Did you forget to call `.stderr(Stdio::piped)` on the `ChildProcess`?")?;
31    let (tx, rx) = sync_channel::<FfmpegEvent>(0);
32    spawn_stderr_thread(stderr, tx.clone());
33    let stdout = child.take_stdout();
34
35    Ok(Self {
36      rx,
37      tx: Some(tx),
38      stdout,
39      metadata: FfmpegMetadata::new(),
40    })
41  }
42
43  /// Called after all metadata has been obtained to spawn the thread that will
44  /// handle output. The metadata is needed to determine the output format and
45  /// other parameters.
46  fn start_stdout(&mut self) -> anyhow::Result<()> {
47    // No output detected
48    if self.metadata.output_streams.is_empty() || self.metadata.outputs.is_empty() {
49      let err = "No output streams found";
50      self.tx.take(); // drop the tx so that the channel closes
51      anyhow::bail!(err)
52    }
53
54    // Handle stdout
55    if let Some(stdout) = self.stdout.take() {
56      spawn_stdout_thread(
57        stdout,
58        self.tx.take().context("missing channel tx")?,
59        self.metadata.output_streams.clone(),
60        self.metadata.outputs.clone(),
61      );
62    }
63
64    Ok(())
65  }
66
67  /// Advance the iterator until all metadata has been collected, returning it.
68  pub fn collect_metadata(&mut self) -> anyhow::Result<FfmpegMetadata> {
69    let mut event_queue: Vec<FfmpegEvent> = Vec::new();
70
71    while !self.metadata.is_completed() {
72      let event = self.next();
73      match event {
74        Some(e) => event_queue.push(e),
75        None => {
76          let errors = event_queue
77            .iter()
78            .filter_map(|e| match e {
79              FfmpegEvent::Error(e) | FfmpegEvent::Log(LogLevel::Error, e) => Some(e.to_string()),
80              _ => None,
81            })
82            .collect::<Vec<String>>()
83            .join("");
84
85          anyhow::bail!(
86            "Iterator ran out before metadata was gathered. The following errors occurred: {errors}",
87          )
88        }
89      }
90    }
91
92    Ok(self.metadata.clone())
93  }
94
95  //// Iterator filters
96
97  /// Returns an iterator over error messages (`FfmpegEvent::Error` and `FfmpegEvent::LogError`).
98  pub fn filter_errors(self) -> impl Iterator<Item = String> {
99    self.filter_map(|event| match event {
100      FfmpegEvent::Error(e) | FfmpegEvent::Log(LogLevel::Error, e) => Some(e),
101      _ => None,
102    })
103  }
104
105  /// Filter out all events except for progress (`FfmpegEvent::Progress`).
106  pub fn filter_progress(self) -> impl Iterator<Item = FfmpegProgress> {
107    self.filter_map(|event| match event {
108      FfmpegEvent::Progress(p) => Some(p),
109      _ => None,
110    })
111  }
112
113  /// Filter out all events except for output frames (`FfmpegEvent::OutputFrame`).
114  pub fn filter_frames(self) -> impl Iterator<Item = OutputVideoFrame> {
115    self.filter_map(|event| match event {
116      FfmpegEvent::OutputFrame(o) => Some(o),
117      _ => None,
118    })
119  }
120
121  /// Filter out all events except for output chunks (`FfmpegEvent::OutputChunk`).
122  pub fn filter_chunks(self) -> impl Iterator<Item = Vec<u8>> {
123    self.filter_map(|event| match event {
124      FfmpegEvent::OutputChunk(vec) => Some(vec),
125      _ => None,
126    })
127  }
128
129  /// Iterator over every message from ffmpeg's stderr as a raw string.
130  /// Conceptually equivalent to `BufReader::new(ffmpeg_stderr).lines()`.
131  pub fn into_ffmpeg_stderr(self) -> impl Iterator<Item = String> {
132    self.filter_map(|event| match event {
133      FfmpegEvent::ParsedVersion(x) => Some(x.raw_log_message),
134      FfmpegEvent::ParsedConfiguration(x) => Some(x.raw_log_message),
135      FfmpegEvent::ParsedStreamMapping(x) => Some(x),
136      FfmpegEvent::ParsedOutput(x) => Some(x.raw_log_message),
137      FfmpegEvent::ParsedInputStream(x) => Some(x.raw_log_message),
138      FfmpegEvent::ParsedOutputStream(x) => Some(x.raw_log_message),
139      FfmpegEvent::Log(_, x) => Some(x),
140      FfmpegEvent::LogEOF => None,
141      FfmpegEvent::Error(_) => None,
142      FfmpegEvent::Progress(x) => Some(x.raw_log_message),
143      FfmpegEvent::OutputFrame(_) => None,
144      FfmpegEvent::OutputChunk(_) => None,
145      FfmpegEvent::Done => None,
146      FfmpegEvent::ParsedInput(input) => Some(input.raw_log_message),
147      FfmpegEvent::ParsedDuration(duration) => Some(duration.raw_log_message),
148    })
149  }
150}
151
152impl Iterator for FfmpegIterator {
153  type Item = FfmpegEvent;
154
155  fn next(&mut self) -> Option<Self::Item> {
156    let item = self.rx.recv().ok();
157
158    if let Some(FfmpegEvent::LogEOF) = item {
159      self.tx.take(); // drop the tx so that the receiver can close
160    }
161
162    if !self.metadata.is_completed() {
163      match self.metadata.handle_event(&item) {
164        Err(e) => return Some(FfmpegEvent::Error(e.to_string())),
165        // TODO in this case, the preceding `item` is lost;
166        // Probably better to queue it as the next item.
167        Ok(()) if self.metadata.is_completed() => {
168          if let Err(e) = self.start_stdout() {
169            return Some(FfmpegEvent::Error(e.to_string()));
170            // Same problem as above
171          }
172        }
173
174        _ => {}
175      }
176    }
177
178    item
179  }
180}
181
182/// Spawn a thread to read raw output frames from ffmpeg's stdout.
183pub fn spawn_stdout_thread(
184  stdout: ChildStdout,
185  tx: SyncSender<FfmpegEvent>,
186  output_streams: Vec<Stream>,
187  outputs: Vec<FfmpegOutput>,
188) -> JoinHandle<()> {
189  std::thread::spawn(move || {
190    // Filter streams which are sent to stdout
191    let stdout_streams = output_streams.iter().filter(|stream| {
192      outputs
193        .get(stream.parent_index as usize)
194        .map(|o| o.is_stdout())
195        .unwrap_or(false)
196    });
197
198    // Exit early if nothing is being sent to stdout
199    if stdout_streams.clone().count() == 0 {
200      tx.send(FfmpegEvent::Error("No streams found".to_owned()))
201        .ok();
202      return;
203    }
204
205    // If the size of a frame can't be determined, it will be read in arbitrary chunks.
206    let mut chunked_mode = false;
207
208    // Immediately default to chunked mode for non-video streams
209    let stdout_video_streams = stdout_streams.clone().filter(|stream| stream.is_video());
210    if stdout_video_streams.clone().count() == 0 {
211      chunked_mode = true;
212    }
213
214    // Calculate frame buffer sizes up front.
215    // Any sizes that cannot be calculated will trigger chunked mode.
216    let frame_buffer_sizes: Vec<usize> = stdout_video_streams
217      .clone()
218      .map(|video_stream| {
219        // Any non-rawvideo streams instantly enable chunked mode, since it's
220        // impossible to tell when one chunk ends and another begins.
221        if video_stream.format != "rawvideo" {
222          chunked_mode = true;
223          return 0;
224        }
225
226        // This is an unexpected error since we've already filtered for video streams.
227        let Some(video_data) = video_stream.video_data() else {
228          chunked_mode = true;
229          return 0;
230        };
231
232        // This may trigger either on an unsupported pixel format, or
233        // framebuffers with non-byte-aligned sizes. FFmpeg will pad these with
234        // zeroes, but we can't predict the exact padding or end size on every format.
235        let Some(bytes_per_frame) = get_bytes_per_frame(video_data) else {
236          chunked_mode = true;
237          return 0;
238        };
239
240        bytes_per_frame as usize
241      })
242      .collect();
243
244    // Final check: FFmpeg supports multiple outputs interleaved on stdout,
245    // but we can only keep track of them if the framerates match. It's
246    // theoretically still possible to determine the expected frame order,
247    // but it's not currently supported.
248    let output_framerates: Vec<f32> = stdout_video_streams
249      .clone()
250      .filter(|s| s.format == "rawvideo")
251      .map(|video_stream| {
252        if let Some(video_data) = video_stream.video_data() {
253          video_data.fps
254        } else {
255          -1.0
256        }
257      })
258      .collect();
259    let any_mismatched_framerates = output_framerates
260      .iter()
261      .any(|&fps| fps != output_framerates[0] || fps == -1.0);
262    if any_mismatched_framerates {
263      // This edge case is probably not what the user was intending,
264      // so we'll notify with an error.
265      tx.send(FfmpegEvent::Error(
266        "Multiple output streams with different framerates are not supported when outputting to stdout. Falling back to chunked mode.".to_owned()
267      )).ok();
268      chunked_mode = true;
269    }
270
271    let mut reader = BufReader::new(stdout);
272    if chunked_mode {
273      // Arbitrary default buffer size for receiving indeterminate chunks
274      // of any encoder or container output, when frame boundaries are unknown
275      let mut chunk_buffer = vec![0u8; 65_536];
276      loop {
277        match reader.read(chunk_buffer.as_mut_slice()) {
278          Ok(0) => break,
279          Ok(bytes_read) => {
280            let mut data = vec![0; bytes_read];
281            data.clone_from_slice(&chunk_buffer[..bytes_read]);
282            tx.send(FfmpegEvent::OutputChunk(data)).ok()
283          }
284          Err(e) => match e.kind() {
285            ErrorKind::UnexpectedEof => break,
286            e => tx.send(FfmpegEvent::Error(e.to_string())).ok(),
287          },
288        };
289      }
290    } else {
291      // Prepare frame buffers
292      let mut frame_buffers = frame_buffer_sizes
293        .iter()
294        .map(|&size| vec![0u8; size])
295        .collect::<Vec<Vec<u8>>>();
296
297      // Empty buffer array is unexpected at this point, since we've already ruled out
298      // both chunked mode and non-stdout streams.
299      if frame_buffers.is_empty() {
300        tx.send(FfmpegEvent::Error("No frame buffers found".to_owned()))
301          .ok();
302        return;
303      }
304
305      // Read into buffers
306      let num_frame_buffers = frame_buffers.len();
307      let mut frame_buffer_index = (0..frame_buffers.len()).cycle();
308      let mut frame_num = 0;
309      loop {
310        let i = frame_buffer_index.next().unwrap();
311        let video_stream = &output_streams[i];
312        let video_data = video_stream.video_data().unwrap();
313        let buffer = &mut frame_buffers[i];
314        let output_frame_num = frame_num / num_frame_buffers;
315        let timestamp = output_frame_num as f32 / video_data.fps;
316        frame_num += 1;
317
318        match reader.read_exact(buffer.as_mut_slice()) {
319          Ok(_) => tx
320            .send(FfmpegEvent::OutputFrame(OutputVideoFrame {
321              width: video_data.width,
322              height: video_data.height,
323              pix_fmt: video_data.pix_fmt.clone(),
324              output_index: i as u32,
325              data: buffer.clone(),
326              frame_num: output_frame_num as u32,
327              timestamp,
328            }))
329            .ok(),
330          Err(e) => match e.kind() {
331            ErrorKind::UnexpectedEof => break,
332            e => tx.send(FfmpegEvent::Error(e.to_string())).ok(),
333          },
334        };
335      }
336    }
337
338    tx.send(FfmpegEvent::Done).ok();
339  })
340}
341
342/// Spawn a thread which reads and parses lines from ffmpeg's stderr channel.
343/// The cadence is controlled by the synchronous `tx` channel, which blocks
344/// until a receiver is ready to receive the next event.
345pub fn spawn_stderr_thread(stderr: ChildStderr, tx: SyncSender<FfmpegEvent>) -> JoinHandle<()> {
346  std::thread::spawn(move || {
347    let reader = BufReader::new(stderr);
348    let mut parser = FfmpegLogParser::new(reader);
349    loop {
350      match parser.parse_next_event() {
351        Ok(FfmpegEvent::LogEOF) => {
352          tx.send(FfmpegEvent::LogEOF).ok();
353          break;
354        }
355        Ok(event) => tx.send(event).ok(),
356        Err(e) => {
357          eprintln!("Error parsing ffmpeg output: {}", e);
358          break;
359        }
360      };
361    }
362  })
363}