webrtc_media/io/ogg_writer/
mod.rs1#[cfg(test)]
2mod ogg_writer_test;
3
4use std::io::{BufWriter, Seek, Write};
5
6use byteorder::{LittleEndian, WriteBytesExt};
7use bytes::Bytes;
8use rtp::packetizer::Depacketizer;
9
10use crate::error::Result;
11use crate::io::ogg_reader::*;
12use crate::io::Writer;
13
14pub struct OggWriter<W: Write + Seek> {
16 writer: W,
17 sample_rate: u32,
18 channel_count: u8,
19 serial: u32,
20 page_index: u32,
21 checksum_table: [u32; 256],
22 previous_granule_position: u64,
23 previous_timestamp: u32,
24 last_payload_size: usize,
25 last_payload: Bytes,
26}
27
28impl<W: Write + Seek> OggWriter<W> {
29 pub fn new(writer: W, sample_rate: u32, channel_count: u8) -> Result<Self> {
31 let mut w = OggWriter {
32 writer,
33 sample_rate,
34 channel_count,
35 serial: rand::random::<u32>(),
36 page_index: 0,
37 checksum_table: generate_checksum_table(),
38
39 previous_timestamp: 1,
42 previous_granule_position: 1,
43 last_payload_size: 0,
44 last_payload: Bytes::new(),
45 };
46
47 w.write_headers()?;
48
49 Ok(w)
50 }
51
52 fn write_headers(&mut self) -> Result<()> {
76 let mut ogg_id_header = Vec::with_capacity(19);
78 {
79 let mut header_writer = BufWriter::new(&mut ogg_id_header);
80 header_writer.write_all(ID_PAGE_SIGNATURE)?; header_writer.write_u8(1)?; header_writer.write_u8(self.channel_count)?; header_writer.write_u16::<LittleEndian>(DEFAULT_PRE_SKIP)?; header_writer.write_u32::<LittleEndian>(self.sample_rate)?; header_writer.write_u16::<LittleEndian>(0)?; header_writer.write_u8(0)?; }
88
89 self.write_page(
92 &Bytes::from(ogg_id_header),
93 PAGE_HEADER_TYPE_BEGINNING_OF_STREAM,
94 0,
95 self.page_index,
96 )?;
97 self.page_index += 1;
98
99 let mut ogg_comment_header = Vec::with_capacity(25);
101 {
102 let mut header_writer = BufWriter::new(&mut ogg_comment_header);
103 header_writer.write_all(COMMENT_PAGE_SIGNATURE)?; header_writer.write_u32::<LittleEndian>(10)?; header_writer.write_all(b"WebRTC.rs")?; header_writer.write_u32::<LittleEndian>(0)?; }
108
109 self.write_page(
111 &Bytes::from(ogg_comment_header),
112 PAGE_HEADER_TYPE_CONTINUATION_OF_STREAM,
113 0,
114 self.page_index,
115 )?;
116 self.page_index += 1;
117
118 Ok(())
119 }
120
121 fn write_page(
122 &mut self,
123 payload: &Bytes,
124 header_type: u8,
125 granule_pos: u64,
126 page_index: u32,
127 ) -> Result<()> {
128 self.last_payload_size = payload.len();
129 self.last_payload = payload.clone();
130 let n_segments = self.last_payload_size.div_ceil(255);
131
132 let mut page =
133 Vec::with_capacity(PAGE_HEADER_SIZE + 1 + self.last_payload_size + n_segments);
134 {
135 let mut header_writer = BufWriter::new(&mut page);
136 header_writer.write_all(PAGE_HEADER_SIGNATURE)?; header_writer.write_u8(0)?; header_writer.write_u8(header_type)?; header_writer.write_u64::<LittleEndian>(granule_pos)?; header_writer.write_u32::<LittleEndian>(self.serial)?; header_writer.write_u32::<LittleEndian>(page_index)?; header_writer.write_u32::<LittleEndian>(0)?; header_writer.write_u8(n_segments as u8)?; for _ in 0..n_segments - 1 {
148 header_writer.write_u8(255)?;
149 }
150 header_writer.write_u8((self.last_payload_size - (n_segments * 255 - 255)) as u8)?;
152
153 header_writer.write_all(payload)?; }
155
156 let mut checksum = 0u32;
157 for v in &page {
158 checksum =
159 (checksum << 8) ^ self.checksum_table[(((checksum >> 24) as u8) ^ (*v)) as usize];
160 }
161 page[22..26].copy_from_slice(&checksum.to_le_bytes()); self.writer.write_all(&page)?;
164
165 Ok(())
166 }
167}
168
169impl<W: Write + Seek> Writer for OggWriter<W> {
170 fn write_rtp(&mut self, packet: &rtp::packet::Packet) -> Result<()> {
172 if packet.payload.is_empty() {
173 return Ok(());
174 }
175
176 let mut opus_packet = rtp::codecs::opus::OpusPacket;
177 let payload = opus_packet.depacketize(&packet.payload)?;
178
179 if self.previous_timestamp != 1 {
181 let increment = packet.header.timestamp - self.previous_timestamp;
182 self.previous_granule_position += increment as u64;
183 }
184 self.previous_timestamp = packet.header.timestamp;
185
186 self.write_page(
187 &payload,
188 PAGE_HEADER_TYPE_CONTINUATION_OF_STREAM,
189 self.previous_granule_position,
190 self.page_index,
191 )?;
192 self.page_index += 1;
193
194 Ok(())
195 }
196
197 fn close(&mut self) -> Result<()> {
199 let payload = self.last_payload.clone();
200 self.write_page(
201 &payload,
202 PAGE_HEADER_TYPE_END_OF_STREAM,
203 self.previous_granule_position,
204 self.page_index - 1,
205 )?;
206
207 self.writer.flush()?;
208 Ok(())
209 }
210}