webrtc_media/io/ogg_writer/
mod.rs

1#[cfg(test)]
2mod ogg_writer_test;
3
4use std::io::{BufWriter, Seek, Write};
5
6use byteorder::{LittleEndian, WriteBytesExt};
7use bytes::Bytes;
8use rtp::packetizer::Depacketizer;
9
10use crate::error::Result;
11use crate::io::ogg_reader::*;
12use crate::io::Writer;
13
14/// OggWriter is used to take RTP packets and write them to an OGG on disk
15pub struct OggWriter<W: Write + Seek> {
16    writer: W,
17    sample_rate: u32,
18    channel_count: u8,
19    serial: u32,
20    page_index: u32,
21    checksum_table: [u32; 256],
22    previous_granule_position: u64,
23    previous_timestamp: u32,
24    last_payload_size: usize,
25    last_payload: Bytes,
26}
27
28impl<W: Write + Seek> OggWriter<W> {
29    /// new initialize a new OGG Opus writer with an io.Writer output
30    pub fn new(writer: W, sample_rate: u32, channel_count: u8) -> Result<Self> {
31        let mut w = OggWriter {
32            writer,
33            sample_rate,
34            channel_count,
35            serial: rand::random::<u32>(),
36            page_index: 0,
37            checksum_table: generate_checksum_table(),
38
39            // Timestamp and Granule MUST start from 1
40            // Only headers can have 0 values
41            previous_timestamp: 1,
42            previous_granule_position: 1,
43            last_payload_size: 0,
44            last_payload: Bytes::new(),
45        };
46
47        w.write_headers()?;
48
49        Ok(w)
50    }
51
52    /*
53        ref: https://tools.ietf.org/html/rfc7845.html
54        https://git.xiph.org/?p=opus-tools.git;a=blob;f=src/opus_header.c#l219
55
56           Page 0         Pages 1 ... n        Pages (n+1) ...
57        +------------+ +---+ +---+ ... +---+ +-----------+ +---------+ +--
58        |            | |   | |   |     |   | |           | |         | |
59        |+----------+| |+-----------------+| |+-------------------+ +-----
60        |||ID Header|| ||  Comment Header || ||Audio Data Packet 1| | ...
61        |+----------+| |+-----------------+| |+-------------------+ +-----
62        |            | |   | |   |     |   | |           | |         | |
63        +------------+ +---+ +---+ ... +---+ +-----------+ +---------+ +--
64        ^      ^                           ^
65        |      |                           |
66        |      |                           Mandatory Page Break
67        |      |
68        |      ID header is contained on a single page
69        |
70        'Beginning Of Stream'
71
72       Figure 1: Example Packet Organization for a Logical Ogg Opus Stream
73    */
74
75    fn write_headers(&mut self) -> Result<()> {
76        // ID Header
77        let mut ogg_id_header = Vec::with_capacity(19);
78        {
79            let mut header_writer = BufWriter::new(&mut ogg_id_header);
80            header_writer.write_all(ID_PAGE_SIGNATURE)?; // Magic Signature 'OpusHead'
81            header_writer.write_u8(1)?; // Version //8
82            header_writer.write_u8(self.channel_count)?; // Channel count //9
83            header_writer.write_u16::<LittleEndian>(DEFAULT_PRE_SKIP)?; // pre-skip //10-11
84            header_writer.write_u32::<LittleEndian>(self.sample_rate)?; // original sample rate, any valid sample e.g 48000, //12-15
85            header_writer.write_u16::<LittleEndian>(0)?; // output gain // 16-17
86            header_writer.write_u8(0)?; // channel map 0 = one stream: mono or stereo, //18
87        }
88
89        // Reference: https://tools.ietf.org/html/rfc7845.html#page-6
90        // RFC specifies that the ID Header page should have a granule position of 0 and a Header Type set to 2 (StartOfStream)
91        self.write_page(
92            &Bytes::from(ogg_id_header),
93            PAGE_HEADER_TYPE_BEGINNING_OF_STREAM,
94            0,
95            self.page_index,
96        )?;
97        self.page_index += 1;
98
99        // Comment Header
100        let mut ogg_comment_header = Vec::with_capacity(25);
101        {
102            let mut header_writer = BufWriter::new(&mut ogg_comment_header);
103            header_writer.write_all(COMMENT_PAGE_SIGNATURE)?; // Magic Signature 'OpusTags' //0-7
104            header_writer.write_u32::<LittleEndian>(10)?; // Vendor Length //8-11
105            header_writer.write_all(b"WebRTC.rs")?; // Vendor name 'WebRTC.rs' //12-20
106            header_writer.write_u32::<LittleEndian>(0)?; // User Comment List Length //21-24
107        }
108
109        // RFC specifies that the page where the CommentHeader completes should have a granule position of 0
110        self.write_page(
111            &Bytes::from(ogg_comment_header),
112            PAGE_HEADER_TYPE_CONTINUATION_OF_STREAM,
113            0,
114            self.page_index,
115        )?;
116        self.page_index += 1;
117
118        Ok(())
119    }
120
121    fn write_page(
122        &mut self,
123        payload: &Bytes,
124        header_type: u8,
125        granule_pos: u64,
126        page_index: u32,
127    ) -> Result<()> {
128        self.last_payload_size = payload.len();
129        self.last_payload = payload.clone();
130        let n_segments = self.last_payload_size.div_ceil(255);
131
132        let mut page =
133            Vec::with_capacity(PAGE_HEADER_SIZE + 1 + self.last_payload_size + n_segments);
134        {
135            let mut header_writer = BufWriter::new(&mut page);
136            header_writer.write_all(PAGE_HEADER_SIGNATURE)?; // page headers starts with 'OggS'//0-3
137            header_writer.write_u8(0)?; // Version//4
138            header_writer.write_u8(header_type)?; // 1 = continuation, 2 = beginning of stream, 4 = end of stream//5
139            header_writer.write_u64::<LittleEndian>(granule_pos)?; // granule position //6-13
140            header_writer.write_u32::<LittleEndian>(self.serial)?; // Bitstream serial number//14-17
141            header_writer.write_u32::<LittleEndian>(page_index)?; // Page sequence number//18-21
142            header_writer.write_u32::<LittleEndian>(0)?; //Checksum reserve //22-25
143            header_writer.write_u8(n_segments as u8)?; // Number of segments in page //26
144
145            // Filling the segment table with the lacing values.
146            // First (n_segments - 1) values will always be 255.
147            for _ in 0..n_segments - 1 {
148                header_writer.write_u8(255)?;
149            }
150            // The last value will be the remainder.
151            header_writer.write_u8((self.last_payload_size - (n_segments * 255 - 255)) as u8)?;
152
153            header_writer.write_all(payload)?; // inserting at 28th since Segment Table(1) + header length(27)
154        }
155
156        let mut checksum = 0u32;
157        for v in &page {
158            checksum =
159                (checksum << 8) ^ self.checksum_table[(((checksum >> 24) as u8) ^ (*v)) as usize];
160        }
161        page[22..26].copy_from_slice(&checksum.to_le_bytes()); // Checksum - generating for page data and inserting at 22th position into 32 bits
162
163        self.writer.write_all(&page)?;
164
165        Ok(())
166    }
167}
168
169impl<W: Write + Seek> Writer for OggWriter<W> {
170    /// write_rtp adds a new packet and writes the appropriate headers for it
171    fn write_rtp(&mut self, packet: &rtp::packet::Packet) -> Result<()> {
172        if packet.payload.is_empty() {
173            return Ok(());
174        }
175
176        let mut opus_packet = rtp::codecs::opus::OpusPacket;
177        let payload = opus_packet.depacketize(&packet.payload)?;
178
179        // Should be equivalent to sample_rate * duration
180        if self.previous_timestamp != 1 {
181            let increment = packet.header.timestamp - self.previous_timestamp;
182            self.previous_granule_position += increment as u64;
183        }
184        self.previous_timestamp = packet.header.timestamp;
185
186        self.write_page(
187            &payload,
188            PAGE_HEADER_TYPE_CONTINUATION_OF_STREAM,
189            self.previous_granule_position,
190            self.page_index,
191        )?;
192        self.page_index += 1;
193
194        Ok(())
195    }
196
197    /// close stops the recording
198    fn close(&mut self) -> Result<()> {
199        let payload = self.last_payload.clone();
200        self.write_page(
201            &payload,
202            PAGE_HEADER_TYPE_END_OF_STREAM,
203            self.previous_granule_position,
204            self.page_index - 1,
205        )?;
206
207        self.writer.flush()?;
208        Ok(())
209    }
210}