1use anyhow::{bail, Result};
2use ffmpeg_sidecar::command::FfmpegCommand;
3use ffmpeg_sidecar::event::{FfmpegEvent, LogLevel};
4use std::io::Read;
5use std::net::{TcpListener, TcpStream};
6use std::sync::mpsc::{channel, Receiver};
7use std::thread;
8use std::time::Duration;
9
10fn main() -> Result<()> {
11 const TCP_PORT: u32 = 3000;
13 let (exit_sender, exit_receiver) = channel::<()>();
14 let listener_thread = thread::spawn(|| listen_for_connections(TCP_PORT, exit_receiver));
15
16 thread::sleep(Duration::from_millis(1000));
18
19 FfmpegCommand::new()
21 .hide_banner()
23 .overwrite() .format("lavfi")
26 .input("testsrc=size=1920x1080:rate=60:duration=10")
27 .format("lavfi")
29 .input("sine=frequency=1000:duration=10")
30 .format("srt")
32 .input(
33 "data:text/plain;base64,MQ0KMDA6MDA6MDAsMDAwIC0tPiAwMDowMDoxMCw1MDANCkhlbGxvIFdvcmxkIQ==",
34 )
35 .map("0:v")
37 .format("rawvideo")
38 .pix_fmt("rgb24")
39 .output(format!("tcp://127.0.0.1:{TCP_PORT}"))
40 .map("1:a")
42 .format("s16le")
43 .output(format!("tcp://127.0.0.1:{TCP_PORT}"))
44 .map("2:s")
46 .format("srt")
47 .output(format!("tcp://127.0.0.1:{TCP_PORT}"))
48 .print_command()
49 .spawn()?
50 .iter()?
51 .for_each(|event| match event {
52 FfmpegEvent::Log(LogLevel::Info, msg) if msg.starts_with("[out#") => {
54 println!("{msg}");
55 }
56
57 FfmpegEvent::Log(LogLevel::Warning | LogLevel::Error | LogLevel::Fatal, msg) => {
59 eprintln!("{msg}");
60 }
61
62 e => {
64 println!("{:?}", e);
65 }
66 });
67 exit_sender.send(())?;
68 listener_thread.join().unwrap()?;
69 Ok(())
70}
71
72fn listen_for_connections(tcp_port: u32, exit_receiver: Receiver<()>) -> Result<()> {
73 let listener = TcpListener::bind(format!("127.0.0.1:{tcp_port}"))?;
74 listener.set_nonblocking(true)?;
75 println!("Server listening on port {tcp_port}");
76
77 let mut handler_threads = Vec::new();
78 loop {
79 if exit_receiver.try_recv().is_ok() {
80 break;
81 }
82 match listener.accept() {
83 Ok((stream, _)) => {
84 handler_threads.push(thread::spawn(move || handle_connection(stream)));
85 }
86 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
87 thread::sleep(Duration::from_millis(10));
88 }
89 Err(e) => {
90 bail!(e);
91 }
92 }
93 }
94
95 for handler in handler_threads {
96 handler.join().unwrap()?;
97 }
98
99 println!("Listener thread exiting");
100 Ok(())
101}
102
103fn handle_connection(mut stream: TcpStream) -> Result<()> {
104 let mut buffer = [0; 1024];
105 let mut total_bytes_read = 0;
106 loop {
107 match stream.read(&mut buffer) {
108 Ok(bytes_read) if bytes_read > 0 => {
109 total_bytes_read += bytes_read;
110 }
111 Ok(0) => {
112 break;
113 }
114 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
115 thread::sleep(Duration::from_millis(10));
116 }
117 Err(e) => {
118 bail!(e);
119 }
120 _ => {}
121 }
122 }
123 let bytes_str = if total_bytes_read < 1024 {
124 format!("{total_bytes_read}B")
125 } else {
126 format!("{}KiB", total_bytes_read / 1024)
127 };
128 println!("Read {bytes_str} from client");
129 Ok(())
130}