1use std::{
4 io::{BufReader, ErrorKind, Read},
5 process::{ChildStderr, ChildStdout},
6 sync::mpsc::{sync_channel, Receiver, SyncSender},
7 thread::JoinHandle,
8};
9
10use anyhow::Context;
11
12use crate::{
13 child::FfmpegChild,
14 event::{FfmpegEvent, FfmpegOutput, FfmpegProgress, LogLevel, OutputVideoFrame, Stream},
15 log_parser::FfmpegLogParser,
16 metadata::FfmpegMetadata,
17 pix_fmt::get_bytes_per_frame,
18};
19
/// Iterator over the [`FfmpegEvent`]s produced by a running FFmpeg child
/// process.
///
/// Events are delivered through a single channel; the reader threads spawned
/// by `spawn_stderr_thread` and `spawn_stdout_thread` act as its senders.
pub struct FfmpegIterator {
    /// Receiving end of the event channel; `next()` blocks on it.
    rx: Receiver<FfmpegEvent>,
    /// Sender retained by the iterator itself. A clone is given to the stderr
    /// thread in `new`; this copy is later moved into the stdout thread by
    /// `start_stdout`, or dropped (set to `None`) when `LogEOF` is received
    /// or when no output streams were found.
    tx: Option<SyncSender<FfmpegEvent>>,
    /// Stdout pipe taken from the child in `new`, if there was one. It is
    /// consumed by `start_stdout` when the stdout thread is spawned.
    stdout: Option<ChildStdout>,
    /// Metadata accumulated from events as they pass through `next()`.
    metadata: FfmpegMetadata,
}
27
28impl FfmpegIterator {
29 pub fn new(child: &mut FfmpegChild) -> anyhow::Result<Self> {
30 let stderr = child.take_stderr().context("No stderr channel\n - Did you call `take_stderr` elsewhere?\n - Did you forget to call `.stderr(Stdio::piped)` on the `ChildProcess`?")?;
31 let (tx, rx) = sync_channel::<FfmpegEvent>(0);
32 spawn_stderr_thread(stderr, tx.clone());
33 let stdout = child.take_stdout();
34
35 Ok(Self {
36 rx,
37 tx: Some(tx),
38 stdout,
39 metadata: FfmpegMetadata::new(),
40 })
41 }
42
43 fn start_stdout(&mut self) -> anyhow::Result<()> {
47 if self.metadata.output_streams.is_empty() || self.metadata.outputs.is_empty() {
49 let err = "No output streams found";
50 self.tx.take(); anyhow::bail!(err)
52 }
53
54 if let Some(stdout) = self.stdout.take() {
56 spawn_stdout_thread(
57 stdout,
58 self.tx.take().context("missing channel tx")?,
59 self.metadata.output_streams.clone(),
60 self.metadata.outputs.clone(),
61 );
62 }
63
64 Ok(())
65 }
66
67 pub fn collect_metadata(&mut self) -> anyhow::Result<FfmpegMetadata> {
69 let mut event_queue: Vec<FfmpegEvent> = Vec::new();
70
71 while !self.metadata.is_completed() {
72 let event = self.next();
73 match event {
74 Some(e) => event_queue.push(e),
75 None => {
76 let errors = event_queue
77 .iter()
78 .filter_map(|e| match e {
79 FfmpegEvent::Error(e) | FfmpegEvent::Log(LogLevel::Error, e) => Some(e.to_string()),
80 _ => None,
81 })
82 .collect::<Vec<String>>()
83 .join("");
84
85 anyhow::bail!(
86 "Iterator ran out before metadata was gathered. The following errors occurred: {errors}",
87 )
88 }
89 }
90 }
91
92 Ok(self.metadata.clone())
93 }
94
95 pub fn filter_errors(self) -> impl Iterator<Item = String> {
99 self.filter_map(|event| match event {
100 FfmpegEvent::Error(e) | FfmpegEvent::Log(LogLevel::Error, e) => Some(e),
101 _ => None,
102 })
103 }
104
105 pub fn filter_progress(self) -> impl Iterator<Item = FfmpegProgress> {
107 self.filter_map(|event| match event {
108 FfmpegEvent::Progress(p) => Some(p),
109 _ => None,
110 })
111 }
112
113 pub fn filter_frames(self) -> impl Iterator<Item = OutputVideoFrame> {
115 self.filter_map(|event| match event {
116 FfmpegEvent::OutputFrame(o) => Some(o),
117 _ => None,
118 })
119 }
120
121 pub fn filter_chunks(self) -> impl Iterator<Item = Vec<u8>> {
123 self.filter_map(|event| match event {
124 FfmpegEvent::OutputChunk(vec) => Some(vec),
125 _ => None,
126 })
127 }
128
129 pub fn into_ffmpeg_stderr(self) -> impl Iterator<Item = String> {
132 self.filter_map(|event| match event {
133 FfmpegEvent::ParsedVersion(x) => Some(x.raw_log_message),
134 FfmpegEvent::ParsedConfiguration(x) => Some(x.raw_log_message),
135 FfmpegEvent::ParsedStreamMapping(x) => Some(x),
136 FfmpegEvent::ParsedOutput(x) => Some(x.raw_log_message),
137 FfmpegEvent::ParsedInputStream(x) => Some(x.raw_log_message),
138 FfmpegEvent::ParsedOutputStream(x) => Some(x.raw_log_message),
139 FfmpegEvent::Log(_, x) => Some(x),
140 FfmpegEvent::LogEOF => None,
141 FfmpegEvent::Error(_) => None,
142 FfmpegEvent::Progress(x) => Some(x.raw_log_message),
143 FfmpegEvent::OutputFrame(_) => None,
144 FfmpegEvent::OutputChunk(_) => None,
145 FfmpegEvent::Done => None,
146 FfmpegEvent::ParsedInput(input) => Some(input.raw_log_message),
147 FfmpegEvent::ParsedDuration(duration) => Some(duration.raw_log_message),
148 })
149 }
150}
151
152impl Iterator for FfmpegIterator {
153 type Item = FfmpegEvent;
154
155 fn next(&mut self) -> Option<Self::Item> {
156 let item = self.rx.recv().ok();
157
158 if let Some(FfmpegEvent::LogEOF) = item {
159 self.tx.take(); }
161
162 if !self.metadata.is_completed() {
163 match self.metadata.handle_event(&item) {
164 Err(e) => return Some(FfmpegEvent::Error(e.to_string())),
165 Ok(()) if self.metadata.is_completed() => {
168 if let Err(e) = self.start_stdout() {
169 return Some(FfmpegEvent::Error(e.to_string()));
170 }
172 }
173
174 _ => {}
175 }
176 }
177
178 item
179 }
180}
181
182pub fn spawn_stdout_thread(
184 stdout: ChildStdout,
185 tx: SyncSender<FfmpegEvent>,
186 output_streams: Vec<Stream>,
187 outputs: Vec<FfmpegOutput>,
188) -> JoinHandle<()> {
189 std::thread::spawn(move || {
190 let stdout_streams = output_streams.iter().filter(|stream| {
192 outputs
193 .get(stream.parent_index as usize)
194 .map(|o| o.is_stdout())
195 .unwrap_or(false)
196 });
197
198 if stdout_streams.clone().count() == 0 {
200 tx.send(FfmpegEvent::Error("No streams found".to_owned()))
201 .ok();
202 return;
203 }
204
205 let mut chunked_mode = false;
207
208 let stdout_video_streams = stdout_streams.clone().filter(|stream| stream.is_video());
210 if stdout_video_streams.clone().count() == 0 {
211 chunked_mode = true;
212 }
213
214 let frame_buffer_sizes: Vec<usize> = stdout_video_streams
217 .clone()
218 .map(|video_stream| {
219 if video_stream.format != "rawvideo" {
222 chunked_mode = true;
223 return 0;
224 }
225
226 let Some(video_data) = video_stream.video_data() else {
228 chunked_mode = true;
229 return 0;
230 };
231
232 let Some(bytes_per_frame) = get_bytes_per_frame(video_data) else {
236 chunked_mode = true;
237 return 0;
238 };
239
240 bytes_per_frame as usize
241 })
242 .collect();
243
244 let output_framerates: Vec<f32> = stdout_video_streams
249 .clone()
250 .filter(|s| s.format == "rawvideo")
251 .map(|video_stream| {
252 if let Some(video_data) = video_stream.video_data() {
253 video_data.fps
254 } else {
255 -1.0
256 }
257 })
258 .collect();
259 let any_mismatched_framerates = output_framerates
260 .iter()
261 .any(|&fps| fps != output_framerates[0] || fps == -1.0);
262 if any_mismatched_framerates {
263 tx.send(FfmpegEvent::Error(
266 "Multiple output streams with different framerates are not supported when outputting to stdout. Falling back to chunked mode.".to_owned()
267 )).ok();
268 chunked_mode = true;
269 }
270
271 let mut reader = BufReader::new(stdout);
272 if chunked_mode {
273 let mut chunk_buffer = vec![0u8; 65_536];
276 loop {
277 match reader.read(chunk_buffer.as_mut_slice()) {
278 Ok(0) => break,
279 Ok(bytes_read) => {
280 let mut data = vec![0; bytes_read];
281 data.clone_from_slice(&chunk_buffer[..bytes_read]);
282 tx.send(FfmpegEvent::OutputChunk(data)).ok()
283 }
284 Err(e) => match e.kind() {
285 ErrorKind::UnexpectedEof => break,
286 e => tx.send(FfmpegEvent::Error(e.to_string())).ok(),
287 },
288 };
289 }
290 } else {
291 let mut frame_buffers = frame_buffer_sizes
293 .iter()
294 .map(|&size| vec![0u8; size])
295 .collect::<Vec<Vec<u8>>>();
296
297 if frame_buffers.is_empty() {
300 tx.send(FfmpegEvent::Error("No frame buffers found".to_owned()))
301 .ok();
302 return;
303 }
304
305 let num_frame_buffers = frame_buffers.len();
307 let mut frame_buffer_index = (0..frame_buffers.len()).cycle();
308 let mut frame_num = 0;
309 loop {
310 let i = frame_buffer_index.next().unwrap();
311 let video_stream = &output_streams[i];
312 let video_data = video_stream.video_data().unwrap();
313 let buffer = &mut frame_buffers[i];
314 let output_frame_num = frame_num / num_frame_buffers;
315 let timestamp = output_frame_num as f32 / video_data.fps;
316 frame_num += 1;
317
318 match reader.read_exact(buffer.as_mut_slice()) {
319 Ok(_) => tx
320 .send(FfmpegEvent::OutputFrame(OutputVideoFrame {
321 width: video_data.width,
322 height: video_data.height,
323 pix_fmt: video_data.pix_fmt.clone(),
324 output_index: i as u32,
325 data: buffer.clone(),
326 frame_num: output_frame_num as u32,
327 timestamp,
328 }))
329 .ok(),
330 Err(e) => match e.kind() {
331 ErrorKind::UnexpectedEof => break,
332 e => tx.send(FfmpegEvent::Error(e.to_string())).ok(),
333 },
334 };
335 }
336 }
337
338 tx.send(FfmpegEvent::Done).ok();
339 })
340}
341
342pub fn spawn_stderr_thread(stderr: ChildStderr, tx: SyncSender<FfmpegEvent>) -> JoinHandle<()> {
346 std::thread::spawn(move || {
347 let reader = BufReader::new(stderr);
348 let mut parser = FfmpegLogParser::new(reader);
349 loop {
350 match parser.parse_next_event() {
351 Ok(FfmpegEvent::LogEOF) => {
352 tx.send(FfmpegEvent::LogEOF).ok();
353 break;
354 }
355 Ok(event) => tx.send(event).ok(),
356 Err(e) => {
357 eprintln!("Error parsing ffmpeg output: {}", e);
358 break;
359 }
360 };
361 }
362 })
363}