sockets/
sockets.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use anyhow::{bail, Result};
use ffmpeg_sidecar::command::FfmpegCommand;
use ffmpeg_sidecar::event::{FfmpegEvent, LogLevel};
use std::io::Read;
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::{channel, Receiver};
use std::thread;
use std::time::Duration;

/// Streams FFmpeg-generated test video, audio, and subtitles to a local TCP
/// listener, printing the events FFmpeg emits along the way.
///
/// A background thread runs `listen_for_connections` on `TCP_PORT`; FFmpeg is
/// then started with three outputs that all target that same port.
///
/// # Errors
///
/// Returns an error if FFmpeg cannot be spawned, if its event iterator cannot
/// be created, or if the listener thread finished with an error.
///
/// # Panics
///
/// Panics if the listener thread itself panicked.
fn main() -> Result<()> {
  // Set up a TCP listener
  const TCP_PORT: u32 = 3000;
  let (exit_sender, exit_receiver) = channel::<()>();
  let listener_thread = thread::spawn(|| listen_for_connections(TCP_PORT, exit_receiver));

  // Wait for the listener to start
  thread::sleep(Duration::from_millis(1000));

  // Prepare an FFmpeg command with separate outputs for video, audio, and subtitles.
  FfmpegCommand::new()
    // Global flags
    .hide_banner()
    .overwrite() // <- overwrite required on windows
    // Generate test video
    .format("lavfi")
    .input("testsrc=size=1920x1080:rate=60:duration=10")
    // Generate test audio
    .format("lavfi")
    .input("sine=frequency=1000:duration=10")
    // Generate test subtitles
    .format("srt")
    .input(
      "data:text/plain;base64,MQ0KMDA6MDA6MDAsMDAwIC0tPiAwMDowMDoxMCw1MDANCkhlbGxvIFdvcmxkIQ==",
    )
    // Video output
    .map("0:v")
    .format("rawvideo")
    .pix_fmt("rgb24")
    .output(format!("tcp://127.0.0.1:{TCP_PORT}"))
    // Audio output
    .map("1:a")
    .format("s16le")
    .output(format!("tcp://127.0.0.1:{TCP_PORT}"))
    // Subtitles output
    .map("2:s")
    .format("srt")
    .output(format!("tcp://127.0.0.1:{TCP_PORT}"))
    .print_command()
    .spawn()?
    .iter()?
    .for_each(|event| match event {
      // Verify output size from FFmpeg logs (video/audio KiB)
      FfmpegEvent::Log(LogLevel::Info, msg) if msg.starts_with("[out#") => {
        println!("{msg}");
      }

      // Log any unexpected errors
      FfmpegEvent::Log(LogLevel::Warning | LogLevel::Error | LogLevel::Fatal, msg) => {
        eprintln!("{msg}");
      }

      // Every other event is dumped in debug form; replace this arm with
      // `_ => {}` to silence them.
      e => {
        println!("{:?}", e);
      }
    });

  // A failed send means the listener thread has already exited on its own
  // (for example because the port could not be bound). Ignore that here so the
  // join below reports the listener's actual error rather than a generic
  // "sending on a closed channel" error.
  let _ = exit_sender.send(());
  listener_thread.join().expect("listener thread panicked")?;
  Ok(())
}

/// Accepts TCP connections on `127.0.0.1:{tcp_port}` until told to stop.
///
/// Each accepted connection is handed to `handle_connection` on its own
/// thread. The loop ends when a message arrives on `exit_receiver` or when the
/// sending side of that channel has been dropped; all handler threads are then
/// joined before returning.
///
/// # Errors
///
/// Returns an error if the listener cannot be bound or switched to
/// non-blocking mode, if `accept` fails with anything other than `WouldBlock`,
/// or if a joined connection handler returned an error.
///
/// # Panics
///
/// Panics if a connection handler thread panicked.
fn listen_for_connections(tcp_port: u32, exit_receiver: Receiver<()>) -> Result<()> {
  let listener = TcpListener::bind(format!("127.0.0.1:{tcp_port}"))?;
  // Non-blocking accept lets the loop poll the exit channel between attempts.
  listener.set_nonblocking(true)?;
  println!("Server listening on port {tcp_port}");

  let mut handler_threads = Vec::new();
  loop {
    match exit_receiver.try_recv() {
      // Stop on an explicit signal, and also when the sender is gone: in that
      // case no signal can ever arrive and the loop would otherwise spin forever.
      Ok(()) | Err(std::sync::mpsc::TryRecvError::Disconnected) => break,
      Err(std::sync::mpsc::TryRecvError::Empty) => {}
    }
    match listener.accept() {
      Ok((stream, _)) => {
        handler_threads.push(thread::spawn(move || handle_connection(stream)));
      }
      // No pending connection right now; back off briefly before polling again.
      Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
        thread::sleep(Duration::from_millis(10));
      }
      Err(e) => {
        bail!(e);
      }
    }
  }

  // Let every in-flight connection finish and surface the first handler error.
  for handler in handler_threads {
    handler.join().unwrap()?;
  }

  println!("Listener thread exiting");
  Ok(())
}

/// Drains `stream` until the peer closes it, then prints how much was read.
///
/// The data itself is discarded; only the byte count is kept. Totals below
/// 1024 bytes are printed in bytes, larger totals in whole KiB (rounded down).
///
/// # Errors
///
/// Returns an error if reading fails with anything other than `WouldBlock`
/// or `Interrupted`.
fn handle_connection(mut stream: TcpStream) -> Result<()> {
  let mut buffer = [0; 1024];
  let mut total_bytes_read = 0;
  loop {
    match stream.read(&mut buffer) {
      // A zero-length read signals that the peer closed the connection.
      Ok(0) => {
        break;
      }
      Ok(bytes_read) => {
        total_bytes_read += bytes_read;
      }
      // The socket may be in non-blocking mode; wait briefly for more data.
      Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
        thread::sleep(Duration::from_millis(10));
      }
      // An interrupted read is not fatal; simply retry.
      Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
      Err(e) => {
        bail!(e);
      }
    }
  }
  let bytes_str = if total_bytes_read < 1024 {
    format!("{total_bytes_read}B")
  } else {
    format!("{}KiB", total_bytes_read / 1024)
  };
  println!("Read {bytes_str} from client");
  Ok(())
}