sockets/
sockets.rs

1use anyhow::{bail, Result};
2use ffmpeg_sidecar::command::FfmpegCommand;
3use ffmpeg_sidecar::event::{FfmpegEvent, LogLevel};
4use std::io::Read;
5use std::net::{TcpListener, TcpStream};
6use std::sync::mpsc::{channel, Receiver};
7use std::thread;
8use std::time::Duration;
9
10fn main() -> Result<()> {
11  // Set up a TCP listener
12  const TCP_PORT: u32 = 3000;
13  let (exit_sender, exit_receiver) = channel::<()>();
14  let listener_thread = thread::spawn(|| listen_for_connections(TCP_PORT, exit_receiver));
15
16  // Wait for the listener to start
17  thread::sleep(Duration::from_millis(1000));
18
19  // Prepare an FFmpeg command with separate outputs for video, audio, and subtitles.
20  FfmpegCommand::new()
21    // Global flags
22    .hide_banner()
23    .overwrite() // <- overwrite required on windows
24    // Generate test video
25    .format("lavfi")
26    .input("testsrc=size=1920x1080:rate=60:duration=10")
27    // Generate test audio
28    .format("lavfi")
29    .input("sine=frequency=1000:duration=10")
30    // Generate test subtitles
31    .format("srt")
32    .input(
33      "data:text/plain;base64,MQ0KMDA6MDA6MDAsMDAwIC0tPiAwMDowMDoxMCw1MDANCkhlbGxvIFdvcmxkIQ==",
34    )
35    // Video output
36    .map("0:v")
37    .format("rawvideo")
38    .pix_fmt("rgb24")
39    .output(format!("tcp://127.0.0.1:{TCP_PORT}"))
40    // Audio output
41    .map("1:a")
42    .format("s16le")
43    .output(format!("tcp://127.0.0.1:{TCP_PORT}"))
44    // Subtitles output
45    .map("2:s")
46    .format("srt")
47    .output(format!("tcp://127.0.0.1:{TCP_PORT}"))
48    .print_command()
49    .spawn()?
50    .iter()?
51    .for_each(|event| match event {
52      // Verify output size from FFmpeg logs (video/audio KiB)
53      FfmpegEvent::Log(LogLevel::Info, msg) if msg.starts_with("[out#") => {
54        println!("{msg}");
55      }
56
57      // Log any unexpected errors
58      FfmpegEvent::Log(LogLevel::Warning | LogLevel::Error | LogLevel::Fatal, msg) => {
59        eprintln!("{msg}");
60      }
61
62      // _ => {}
63      e => {
64        println!("{:?}", e);
65      }
66    });
67  exit_sender.send(())?;
68  listener_thread.join().unwrap()?;
69  Ok(())
70}
71
72fn listen_for_connections(tcp_port: u32, exit_receiver: Receiver<()>) -> Result<()> {
73  let listener = TcpListener::bind(format!("127.0.0.1:{tcp_port}"))?;
74  listener.set_nonblocking(true)?;
75  println!("Server listening on port {tcp_port}");
76
77  let mut handler_threads = Vec::new();
78  loop {
79    if exit_receiver.try_recv().is_ok() {
80      break;
81    }
82    match listener.accept() {
83      Ok((stream, _)) => {
84        handler_threads.push(thread::spawn(move || handle_connection(stream)));
85      }
86      Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
87        thread::sleep(Duration::from_millis(10));
88      }
89      Err(e) => {
90        bail!(e);
91      }
92    }
93  }
94
95  for handler in handler_threads {
96    handler.join().unwrap()?;
97  }
98
99  println!("Listener thread exiting");
100  Ok(())
101}
102
103fn handle_connection(mut stream: TcpStream) -> Result<()> {
104  let mut buffer = [0; 1024];
105  let mut total_bytes_read = 0;
106  loop {
107    match stream.read(&mut buffer) {
108      Ok(bytes_read) if bytes_read > 0 => {
109        total_bytes_read += bytes_read;
110      }
111      Ok(0) => {
112        break;
113      }
114      Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
115        thread::sleep(Duration::from_millis(10));
116      }
117      Err(e) => {
118        bail!(e);
119      }
120      _ => {}
121    }
122  }
123  let bytes_str = if total_bytes_read < 1024 {
124    format!("{total_bytes_read}B")
125  } else {
126    format!("{}KiB", total_bytes_read / 1024)
127  };
128  println!("Read {bytes_str} from client");
129  Ok(())
130}