webrtc_media/io/sample_builder/
mod.rs

1#[cfg(test)]
2mod sample_builder_test;
3#[cfg(test)]
4mod sample_sequence_location_test;
5
6pub mod sample_sequence_location;
7
8use std::time::{Duration, SystemTime};
9
10use bytes::Bytes;
11use rtp::packet::Packet;
12use rtp::packetizer::Depacketizer;
13
14use self::sample_sequence_location::{Comparison, SampleSequenceLocation};
15use crate::Sample;
16
/// SampleBuilder buffers packets until media frames are complete.
///
/// Pushed RTP packets are stored in a ring indexed by their 16-bit sequence number; samples
/// built from them are queued in a second ring until they are popped.
pub struct SampleBuilder<T: Depacketizer> {
    /// how many packets to wait until we get a valid Sample; once more than this many
    /// sequence numbers are buffered the oldest packets are force-consumed or dropped
    max_late: u16,
    /// max timestamp between old and new timestamps before dropping packets
    /// (`0` disables the timestamp-based check)
    max_late_timestamp: u32,
    /// buffered packets, indexed by RTP sequence number
    buffer: Vec<Option<Packet>>,
    /// built samples waiting to be popped, indexed by the positions tracked in `prepared`
    prepared_samples: Vec<Option<Sample>>,
    /// RTP timestamp of the most recently built sample; used to recognise padding packets
    last_sample_timestamp: Option<u32>,

    /// Interface that allows us to take RTP packets to samples
    depacketizer: T,

    /// sample_rate allows us to compute the duration of a [`Sample`]
    /// (timestamp difference divided by this rate gives seconds)
    sample_rate: u32,

    /// filled contains the head/tail of the packets inserted into the buffer
    filled: SampleSequenceLocation,

    /// active contains the active head/tail of the timestamp being actively processed
    active: SampleSequenceLocation,

    /// prepared contains the samples that have been processed to date
    prepared: SampleSequenceLocation,

    /// number of packets forced to be dropped since the last sample was built
    dropped_packets: u16,

    /// number of padding packets detected and dropped. This number will be a subset of
    /// `dropped_packets`
    padding_packets: u16,
}
49
50impl<T: Depacketizer> SampleBuilder<T> {
51    /// Constructs a new SampleBuilder.
52    /// `max_late` is how long to wait until we can construct a completed [`Sample`].
53    /// `max_late` is measured in RTP packet sequence numbers.
54    /// A large max_late will result in less packet loss but higher latency.
55    /// The depacketizer extracts media samples from RTP packets.
56    /// Several depacketizers are available in package [github.com/pion/rtp/codecs](https://github.com/webrtc-rs/rtp/tree/main/src/codecs).
57    pub fn new(max_late: u16, depacketizer: T, sample_rate: u32) -> Self {
58        Self {
59            max_late,
60            max_late_timestamp: 0,
61            buffer: vec![None; u16::MAX as usize + 1],
62            prepared_samples: (0..=u16::MAX as usize).map(|_| None).collect(),
63            last_sample_timestamp: None,
64            depacketizer,
65            sample_rate,
66            filled: SampleSequenceLocation::new(),
67            active: SampleSequenceLocation::new(),
68            prepared: SampleSequenceLocation::new(),
69            dropped_packets: 0,
70            padding_packets: 0,
71        }
72    }
73
74    pub fn with_max_time_delay(mut self, max_late_duration: Duration) -> Self {
75        self.max_late_timestamp =
76            (self.sample_rate as u128 * max_late_duration.as_millis() / 1000) as u32;
77        self
78    }
79
80    fn too_old(&self, location: &SampleSequenceLocation) -> bool {
81        if self.max_late_timestamp == 0 {
82            return false;
83        }
84
85        let mut found_head: Option<u32> = None;
86        let mut found_tail: Option<u32> = None;
87
88        let mut i = location.head;
89        while i != location.tail {
90            if let Some(ref packet) = self.buffer[i as usize] {
91                found_head = Some(packet.header.timestamp);
92                break;
93            }
94            i = i.wrapping_add(1);
95        }
96
97        if found_head.is_none() {
98            return false;
99        }
100
101        let mut i = location.tail.wrapping_sub(1);
102        while i != location.head {
103            if let Some(ref packet) = self.buffer[i as usize] {
104                found_tail = Some(packet.header.timestamp);
105                break;
106            }
107            i = i.wrapping_sub(1);
108        }
109
110        if found_tail.is_none() {
111            return false;
112        }
113
114        found_tail.unwrap() - found_head.unwrap() > self.max_late_timestamp
115    }
116
117    /// Returns the timestamp associated with a given sample location
118    fn fetch_timestamp(&self, location: &SampleSequenceLocation) -> Option<u32> {
119        if location.empty() {
120            None
121        } else {
122            Some(
123                (self.buffer[location.head as usize])
124                    .as_ref()?
125                    .header
126                    .timestamp,
127            )
128        }
129    }
130
131    fn release_packet(&mut self, i: u16) {
132        self.buffer[i as usize] = None;
133    }
134
135    /// Clears all buffers that have already been consumed by
136    /// popping.
137    fn purge_consumed_buffers(&mut self) {
138        let active = self.active;
139        self.purge_consumed_location(&active, false);
140    }
141
142    /// Clears all buffers that have already been consumed
143    /// during a sample building method.
144    fn purge_consumed_location(&mut self, consume: &SampleSequenceLocation, force_consume: bool) {
145        if !self.filled.has_data() {
146            return;
147        }
148        match consume.compare(self.filled.head) {
149            Comparison::Inside if force_consume => {
150                self.release_packet(self.filled.head);
151                self.filled.head = self.filled.head.wrapping_add(1);
152            }
153            Comparison::Before => {
154                self.release_packet(self.filled.head);
155                self.filled.head = self.filled.head.wrapping_add(1);
156            }
157            _ => {}
158        }
159    }
160
161    /// Flushes all buffers that are already consumed or those buffers
162    /// that are too late to consume.
163    fn purge_buffers(&mut self) {
164        self.purge_consumed_buffers();
165
166        while (self.too_old(&self.filled) || (self.filled.count() > self.max_late))
167            && self.filled.has_data()
168        {
169            if self.active.empty() {
170                // refill the active based on the filled packets
171                self.active = self.filled;
172            }
173
174            if self.active.has_data() && (self.active.head == self.filled.head) {
175                // attempt to force the active packet to be consumed even though
176                // outstanding data may be pending arrival
177                let err = match self.build_sample(true) {
178                    Ok(_) => continue,
179                    Err(e) => e,
180                };
181
182                if !matches!(err, BuildError::InvalidPartition(_)) {
183                    // In the InvalidPartition case `build_sample` will have already adjusted `dropped_packets`.
184                    self.dropped_packets += 1;
185                }
186
187                // could not build the sample so drop it
188                self.active.head = self.active.head.wrapping_add(1);
189            }
190
191            self.release_packet(self.filled.head);
192            self.filled.head = self.filled.head.wrapping_add(1);
193        }
194    }
195
196    /// Adds an RTP Packet to self's buffer.
197    ///
198    /// Push does not copy the input. If you wish to reuse
199    /// this memory make sure to copy before calling push
200    pub fn push(&mut self, p: Packet) {
201        let sequence_number = p.header.sequence_number;
202        self.buffer[sequence_number as usize] = Some(p);
203        match self.filled.compare(sequence_number) {
204            Comparison::Void => {
205                self.filled.head = sequence_number;
206                self.filled.tail = sequence_number.wrapping_add(1);
207            }
208            Comparison::Before => {
209                self.filled.head = sequence_number;
210            }
211            Comparison::After => {
212                self.filled.tail = sequence_number.wrapping_add(1);
213            }
214            _ => {}
215        }
216        self.purge_buffers();
217    }
218
219    /// Creates a sample from a valid collection of RTP Packets by
220    /// walking forwards building a sample if everything looks good clear and
221    /// update buffer+values
222    fn build_sample(
223        &mut self,
224        purging_buffers: bool,
225    ) -> Result<SampleSequenceLocation, BuildError> {
226        if self.active.empty() {
227            self.active = self.filled;
228        }
229
230        if self.active.empty() {
231            return Err(BuildError::NoActiveSegment);
232        }
233
234        if self.filled.compare(self.active.tail) == Comparison::Inside {
235            self.active.tail = self.filled.tail;
236        }
237
238        let mut consume = SampleSequenceLocation::new();
239
240        let mut i = self.active.head;
241        // `self.active` isn't modified in the loop, fetch the timestamp once and cache it.
242        let head_timestamp = self.fetch_timestamp(&self.active);
243        while let Some(ref packet) = self.buffer[i as usize] {
244            if self.active.compare(i) == Comparison::After {
245                break;
246            }
247            let is_same_timestamp = head_timestamp.map(|t| packet.header.timestamp == t);
248            let is_different_timestamp = is_same_timestamp.map(std::ops::Not::not);
249            let is_partition_tail = self
250                .depacketizer
251                .is_partition_tail(packet.header.marker, &packet.payload);
252
253            // If the timestamp is not the same it might be because the next packet is both a start
254            // and end of the next partition in which case a sample should be generated now. This
255            // can happen when padding packets are used .e.g:
256            //
257            // p1(t=1), p2(t=1), p3(t=1), p4(t=2, marker=true, start=true)
258            //
259            // In thic case the generated sample should be p1 through p3, but excluding p4 which is
260            // its own sample.
261            if is_partition_tail && is_same_timestamp.unwrap_or(true) {
262                consume.head = self.active.head;
263                consume.tail = i.wrapping_add(1);
264                break;
265            }
266
267            if is_different_timestamp.unwrap_or(false) {
268                consume.head = self.active.head;
269                consume.tail = i;
270                break;
271            }
272            i = i.wrapping_add(1);
273        }
274
275        if consume.empty() {
276            return Err(BuildError::NothingToConsume);
277        }
278
279        if !purging_buffers && self.buffer[consume.tail as usize].is_none() {
280            // wait for the next packet after this set of packets to arrive
281            // to ensure at least one post sample timestamp is known
282            // (unless we have to release right now)
283            return Err(BuildError::PendingTimestampPacket);
284        }
285
286        let sample_timestamp = self.fetch_timestamp(&self.active).unwrap_or(0);
287        let mut after_timestamp = sample_timestamp;
288
289        // scan for any packet after the current and use that time stamp as the diff point
290        for i in consume.tail..self.active.tail {
291            if let Some(ref packet) = self.buffer[i as usize] {
292                after_timestamp = packet.header.timestamp;
293                break;
294            }
295        }
296
297        // prior to decoding all the packets, check if this packet
298        // would end being disposed anyway
299        let head_payload = self.buffer[consume.head as usize]
300            .as_ref()
301            .map(|p| &p.payload)
302            .ok_or(BuildError::GapInSegment)?;
303        if !self.depacketizer.is_partition_head(head_payload) {
304            // libWebRTC will sometimes send several empty padding packets to smooth out send
305            // rate. These packets don't carry any media payloads.
306            let is_padding = consume.range(&self.buffer).all(|p| {
307                p.map(|p| {
308                    self.last_sample_timestamp == Some(p.header.timestamp) && p.payload.is_empty()
309                })
310                .unwrap_or(false)
311            });
312
313            self.dropped_packets += consume.count();
314            if is_padding {
315                self.padding_packets += consume.count();
316            }
317            self.purge_consumed_location(&consume, true);
318            self.purge_consumed_buffers();
319
320            self.active.head = consume.tail;
321            return Err(BuildError::InvalidPartition(consume));
322        }
323
324        // the head set of packets is now fully consumed
325        self.active.head = consume.tail;
326
327        // merge all the buffers into a sample
328        let mut data: Vec<u8> = Vec::new();
329        let mut i = consume.head;
330        while i != consume.tail {
331            let payload = self.buffer[i as usize]
332                .as_ref()
333                .map(|p| &p.payload)
334                .ok_or(BuildError::GapInSegment)?;
335
336            let p = self
337                .depacketizer
338                .depacketize(payload)
339                .map_err(|_| BuildError::DepacketizerFailed)?;
340
341            data.extend_from_slice(&p);
342            i = i.wrapping_add(1);
343        }
344        let samples = after_timestamp - sample_timestamp;
345
346        let sample = Sample {
347            data: Bytes::copy_from_slice(&data),
348            timestamp: SystemTime::now(),
349            duration: Duration::from_secs_f64((samples as f64) / (self.sample_rate as f64)),
350            packet_timestamp: sample_timestamp,
351            prev_dropped_packets: self.dropped_packets,
352            prev_padding_packets: self.padding_packets,
353        };
354
355        self.dropped_packets = 0;
356        self.padding_packets = 0;
357        self.last_sample_timestamp = Some(sample_timestamp);
358
359        self.prepared_samples[self.prepared.tail as usize] = Some(sample);
360        self.prepared.tail = self.prepared.tail.wrapping_add(1);
361
362        self.purge_consumed_location(&consume, true);
363        self.purge_consumed_buffers();
364
365        Ok(consume)
366    }
367
368    /// Compiles pushed RTP packets into media samples and then
369    /// returns the next valid sample (or None if no sample is compiled).
370    pub fn pop(&mut self) -> Option<Sample> {
371        let _ = self.build_sample(false);
372
373        if self.prepared.empty() {
374            return None;
375        }
376        let result = self.prepared_samples[self.prepared.head as usize].take();
377        self.prepared.head = self.prepared.head.wrapping_add(1);
378        result
379    }
380
381    /// Compiles pushed RTP packets into media samples and then
382    /// returns the next valid sample with its associated RTP timestamp (or `None` if
383    /// no sample is compiled).
384    pub fn pop_with_timestamp(&mut self) -> Option<(Sample, u32)> {
385        if let Some(sample) = self.pop() {
386            let timestamp = sample.packet_timestamp;
387            Some((sample, timestamp))
388        } else {
389            None
390        }
391    }
392}
393
/// Computes the distance between two sequence numbers.
///
/// The distance is the shorter way around the 16-bit sequence number circle, so it is
/// symmetric in its arguments and never exceeds `0x8000`.
pub(crate) fn seqnum_distance(x: u16, y: u16) -> u16 {
    let forward = x.wrapping_sub(y);
    let backward = y.wrapping_sub(x);
    forward.min(backward)
}
411
/// Reasons why `SampleBuilder::build_sample` could not produce a sample.
#[derive(Debug)]
enum BuildError {
    /// There's no active segment of RTP packets to consider yet.
    NoActiveSegment,

    /// No sample partition could be found in the active segment.
    NothingToConsume,

    /// A segment to consume was identified, but a subsequent packet is needed to determine the
    /// duration of the sample.
    PendingTimestampPacket,

    /// The active segment's head was not aligned with a sample partition head. Some packets were
    /// dropped. Carries the location of the packets that were skipped.
    InvalidPartition(SampleSequenceLocation),

    /// There was a gap in the active segment because of one or more missing RTP packets.
    GapInSegment,

    /// We failed to depacketize an RTP packet.
    DepacketizerFailed,
}