use anyhow::{bail, Result};
use ffmpeg_sidecar::command::FfmpegCommand;
use ffmpeg_sidecar::event::{FfmpegEvent, LogLevel};
use std::io::Read;
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::{channel, Receiver};
use std::thread;
use std::time::Duration;
/// Runs FFmpeg with three generated inputs and sends each mapped stream to a
/// TCP listener hosted by a background thread of this same process.
///
/// The three outputs all target `tcp://127.0.0.1:{TCP_PORT}`:
/// * `0:v` as `rawvideo` with pixel format `rgb24`
/// * `1:a` as `s16le`
/// * `2:s` as `srt`
///
/// # Errors
///
/// Returns an error if FFmpeg cannot be spawned or iterated, if the exit
/// signal cannot be delivered, or if the listener thread returned an error.
///
/// # Panics
///
/// Panics if the listener thread itself panicked.
fn main() -> Result<()> {
    const TCP_PORT: u32 = 3000;

    // The channel is the shutdown signal for the listener thread.
    let (exit_sender, exit_receiver) = channel::<()>();
    let listener_thread = thread::spawn(move || listen_for_connections(TCP_PORT, exit_receiver));

    // Give the listener thread a moment to start before FFmpeg connects.
    thread::sleep(Duration::from_millis(1000));

    // All three outputs connect to the same address; build the URL once.
    let output_url = format!("tcp://127.0.0.1:{TCP_PORT}");

    {
        // Scoped so the child handle is released as soon as its events are drained.
        let mut ffmpeg = FfmpegCommand::new()
            .hide_banner()
            .overwrite()
            // Input 0: generated test video.
            .format("lavfi")
            .input("testsrc=size=1920x1080:rate=60:duration=10")
            // Input 1: generated sine-wave audio.
            .format("lavfi")
            .input("sine=frequency=1000:duration=10")
            // Input 2: subtitles supplied inline as a base64 data URI.
            .format("srt")
            .input(
                "data:text/plain;base64,MQ0KMDA6MDA6MDAsMDAwIC0tPiAwMDowMDoxMCw1MDANCkhlbGxvIFdvcmxkIQ==",
            )
            // Output 0: video.
            .map("0:v")
            .format("rawvideo")
            .pix_fmt("rgb24")
            .output(output_url.as_str())
            // Output 1: audio.
            .map("1:a")
            .format("s16le")
            .output(output_url.as_str())
            // Output 2: subtitles.
            .map("2:s")
            .format("srt")
            .output(output_url.as_str())
            .print_command()
            .spawn()?;

        for event in ffmpeg.iter()? {
            match event {
                // Per-output summary lines go to stdout.
                FfmpegEvent::Log(LogLevel::Info, msg) if msg.starts_with("[out#") => {
                    println!("{msg}");
                }
                // Problems reported by FFmpeg go to stderr.
                FfmpegEvent::Log(LogLevel::Warning | LogLevel::Error | LogLevel::Fatal, msg) => {
                    eprintln!("{msg}");
                }
                // Everything else is dumped in debug form.
                other => {
                    println!("{:?}", other);
                }
            }
        }
    }

    // FFmpeg's event stream has ended: stop the listener and surface its result.
    exit_sender.send(())?;
    listener_thread.join().unwrap()?;
    Ok(())
}
/// Accepts TCP connections on `127.0.0.1:{tcp_port}` until told to stop.
///
/// The listener is switched to non-blocking mode so the loop can poll
/// `exit_receiver` between accept attempts. Every accepted connection is
/// handed to `handle_connection` on its own thread, and all of those threads
/// are joined before this function returns.
///
/// The loop ends when a message arrives on `exit_receiver`, or when the
/// sending half of the channel has been dropped (no stop message could ever
/// arrive after that, so continuing to poll would never terminate).
///
/// # Errors
///
/// Returns an error if binding or configuring the listener fails, if
/// `accept` fails with anything other than `WouldBlock`, or if any
/// connection handler returned an error.
///
/// # Panics
///
/// Panics if a connection handler thread panicked.
fn listen_for_connections(tcp_port: u32, exit_receiver: Receiver<()>) -> Result<()> {
    let listener = TcpListener::bind(format!("127.0.0.1:{tcp_port}"))?;
    listener.set_nonblocking(true)?;
    println!("Server listening on port {tcp_port}");

    let mut handler_threads = Vec::new();
    loop {
        // Stop on an explicit signal, and also when the sender is gone.
        match exit_receiver.try_recv() {
            Ok(()) | Err(std::sync::mpsc::TryRecvError::Disconnected) => break,
            Err(std::sync::mpsc::TryRecvError::Empty) => {}
        }

        match listener.accept() {
            Ok((stream, _)) => {
                handler_threads.push(thread::spawn(move || handle_connection(stream)));
            }
            // Nothing is waiting to be accepted; back off briefly, then poll again.
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                thread::sleep(Duration::from_millis(10));
            }
            Err(e) => {
                bail!(e);
            }
        }
    }

    // Wait for every in-flight connection to finish and propagate the first failure.
    for handler in handler_threads {
        handler.join().unwrap()?;
    }

    println!("Listener thread exiting");
    Ok(())
}
/// Reads `stream` until the peer closes it, then prints the total received.
///
/// The payload is discarded; only the byte count is tracked. The total is
/// printed as `{n}B` when it is below 1024 bytes, otherwise as whole
/// kibibytes (`{n}KiB`, truncating division).
///
/// # Errors
///
/// Returns the underlying I/O error if a read fails with anything other than
/// `WouldBlock` or `Interrupted`; both of those are retried.
fn handle_connection(mut stream: TcpStream) -> Result<()> {
    let mut buffer = [0u8; 1024];
    let mut total_bytes_read = 0usize;

    loop {
        match stream.read(&mut buffer) {
            // A zero-length read means the peer closed the connection.
            Ok(0) => break,
            Ok(bytes_read) => {
                total_bytes_read += bytes_read;
            }
            // No data available yet on a non-blocking socket; wait and retry.
            // NOTE(review): presumably the accepted socket can inherit
            // non-blocking mode from the listener on some platforms — confirm.
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                thread::sleep(Duration::from_millis(10));
            }
            // An interrupted read is non-fatal; retry immediately.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => {
                bail!(e);
            }
        }
    }

    let bytes_str = if total_bytes_read < 1024 {
        format!("{total_bytes_read}B")
    } else {
        format!("{}KiB", total_bytes_read / 1024)
    };
    println!("Read {bytes_str} from client");
    Ok(())
}