webrtc_media/io/sample_builder/
mod.rs1#[cfg(test)]
2mod sample_builder_test;
3#[cfg(test)]
4mod sample_sequence_location_test;
5
6pub mod sample_sequence_location;
7
8use std::time::{Duration, SystemTime};
9
10use bytes::Bytes;
11use rtp::packet::Packet;
12use rtp::packetizer::Depacketizer;
13
14use self::sample_sequence_location::{Comparison, SampleSequenceLocation};
15use crate::Sample;
16
17pub struct SampleBuilder<T: Depacketizer> {
19 max_late: u16,
21 max_late_timestamp: u32,
23 buffer: Vec<Option<Packet>>,
24 prepared_samples: Vec<Option<Sample>>,
25 last_sample_timestamp: Option<u32>,
26
27 depacketizer: T,
29
30 sample_rate: u32,
32
33 filled: SampleSequenceLocation,
35
36 active: SampleSequenceLocation,
38
39 prepared: SampleSequenceLocation,
41
42 dropped_packets: u16,
44
45 padding_packets: u16,
48}
49
50impl<T: Depacketizer> SampleBuilder<T> {
51 pub fn new(max_late: u16, depacketizer: T, sample_rate: u32) -> Self {
58 Self {
59 max_late,
60 max_late_timestamp: 0,
61 buffer: vec![None; u16::MAX as usize + 1],
62 prepared_samples: (0..=u16::MAX as usize).map(|_| None).collect(),
63 last_sample_timestamp: None,
64 depacketizer,
65 sample_rate,
66 filled: SampleSequenceLocation::new(),
67 active: SampleSequenceLocation::new(),
68 prepared: SampleSequenceLocation::new(),
69 dropped_packets: 0,
70 padding_packets: 0,
71 }
72 }
73
74 pub fn with_max_time_delay(mut self, max_late_duration: Duration) -> Self {
75 self.max_late_timestamp =
76 (self.sample_rate as u128 * max_late_duration.as_millis() / 1000) as u32;
77 self
78 }
79
80 fn too_old(&self, location: &SampleSequenceLocation) -> bool {
81 if self.max_late_timestamp == 0 {
82 return false;
83 }
84
85 let mut found_head: Option<u32> = None;
86 let mut found_tail: Option<u32> = None;
87
88 let mut i = location.head;
89 while i != location.tail {
90 if let Some(ref packet) = self.buffer[i as usize] {
91 found_head = Some(packet.header.timestamp);
92 break;
93 }
94 i = i.wrapping_add(1);
95 }
96
97 if found_head.is_none() {
98 return false;
99 }
100
101 let mut i = location.tail.wrapping_sub(1);
102 while i != location.head {
103 if let Some(ref packet) = self.buffer[i as usize] {
104 found_tail = Some(packet.header.timestamp);
105 break;
106 }
107 i = i.wrapping_sub(1);
108 }
109
110 if found_tail.is_none() {
111 return false;
112 }
113
114 found_tail.unwrap() - found_head.unwrap() > self.max_late_timestamp
115 }
116
117 fn fetch_timestamp(&self, location: &SampleSequenceLocation) -> Option<u32> {
119 if location.empty() {
120 None
121 } else {
122 Some(
123 (self.buffer[location.head as usize])
124 .as_ref()?
125 .header
126 .timestamp,
127 )
128 }
129 }
130
131 fn release_packet(&mut self, i: u16) {
132 self.buffer[i as usize] = None;
133 }
134
135 fn purge_consumed_buffers(&mut self) {
138 let active = self.active;
139 self.purge_consumed_location(&active, false);
140 }
141
142 fn purge_consumed_location(&mut self, consume: &SampleSequenceLocation, force_consume: bool) {
145 if !self.filled.has_data() {
146 return;
147 }
148 match consume.compare(self.filled.head) {
149 Comparison::Inside if force_consume => {
150 self.release_packet(self.filled.head);
151 self.filled.head = self.filled.head.wrapping_add(1);
152 }
153 Comparison::Before => {
154 self.release_packet(self.filled.head);
155 self.filled.head = self.filled.head.wrapping_add(1);
156 }
157 _ => {}
158 }
159 }
160
161 fn purge_buffers(&mut self) {
164 self.purge_consumed_buffers();
165
166 while (self.too_old(&self.filled) || (self.filled.count() > self.max_late))
167 && self.filled.has_data()
168 {
169 if self.active.empty() {
170 self.active = self.filled;
172 }
173
174 if self.active.has_data() && (self.active.head == self.filled.head) {
175 let err = match self.build_sample(true) {
178 Ok(_) => continue,
179 Err(e) => e,
180 };
181
182 if !matches!(err, BuildError::InvalidPartition(_)) {
183 self.dropped_packets += 1;
185 }
186
187 self.active.head = self.active.head.wrapping_add(1);
189 }
190
191 self.release_packet(self.filled.head);
192 self.filled.head = self.filled.head.wrapping_add(1);
193 }
194 }
195
196 pub fn push(&mut self, p: Packet) {
201 let sequence_number = p.header.sequence_number;
202 self.buffer[sequence_number as usize] = Some(p);
203 match self.filled.compare(sequence_number) {
204 Comparison::Void => {
205 self.filled.head = sequence_number;
206 self.filled.tail = sequence_number.wrapping_add(1);
207 }
208 Comparison::Before => {
209 self.filled.head = sequence_number;
210 }
211 Comparison::After => {
212 self.filled.tail = sequence_number.wrapping_add(1);
213 }
214 _ => {}
215 }
216 self.purge_buffers();
217 }
218
219 fn build_sample(
223 &mut self,
224 purging_buffers: bool,
225 ) -> Result<SampleSequenceLocation, BuildError> {
226 if self.active.empty() {
227 self.active = self.filled;
228 }
229
230 if self.active.empty() {
231 return Err(BuildError::NoActiveSegment);
232 }
233
234 if self.filled.compare(self.active.tail) == Comparison::Inside {
235 self.active.tail = self.filled.tail;
236 }
237
238 let mut consume = SampleSequenceLocation::new();
239
240 let mut i = self.active.head;
241 let head_timestamp = self.fetch_timestamp(&self.active);
243 while let Some(ref packet) = self.buffer[i as usize] {
244 if self.active.compare(i) == Comparison::After {
245 break;
246 }
247 let is_same_timestamp = head_timestamp.map(|t| packet.header.timestamp == t);
248 let is_different_timestamp = is_same_timestamp.map(std::ops::Not::not);
249 let is_partition_tail = self
250 .depacketizer
251 .is_partition_tail(packet.header.marker, &packet.payload);
252
253 if is_partition_tail && is_same_timestamp.unwrap_or(true) {
262 consume.head = self.active.head;
263 consume.tail = i.wrapping_add(1);
264 break;
265 }
266
267 if is_different_timestamp.unwrap_or(false) {
268 consume.head = self.active.head;
269 consume.tail = i;
270 break;
271 }
272 i = i.wrapping_add(1);
273 }
274
275 if consume.empty() {
276 return Err(BuildError::NothingToConsume);
277 }
278
279 if !purging_buffers && self.buffer[consume.tail as usize].is_none() {
280 return Err(BuildError::PendingTimestampPacket);
284 }
285
286 let sample_timestamp = self.fetch_timestamp(&self.active).unwrap_or(0);
287 let mut after_timestamp = sample_timestamp;
288
289 for i in consume.tail..self.active.tail {
291 if let Some(ref packet) = self.buffer[i as usize] {
292 after_timestamp = packet.header.timestamp;
293 break;
294 }
295 }
296
297 let head_payload = self.buffer[consume.head as usize]
300 .as_ref()
301 .map(|p| &p.payload)
302 .ok_or(BuildError::GapInSegment)?;
303 if !self.depacketizer.is_partition_head(head_payload) {
304 let is_padding = consume.range(&self.buffer).all(|p| {
307 p.map(|p| {
308 self.last_sample_timestamp == Some(p.header.timestamp) && p.payload.is_empty()
309 })
310 .unwrap_or(false)
311 });
312
313 self.dropped_packets += consume.count();
314 if is_padding {
315 self.padding_packets += consume.count();
316 }
317 self.purge_consumed_location(&consume, true);
318 self.purge_consumed_buffers();
319
320 self.active.head = consume.tail;
321 return Err(BuildError::InvalidPartition(consume));
322 }
323
324 self.active.head = consume.tail;
326
327 let mut data: Vec<u8> = Vec::new();
329 let mut i = consume.head;
330 while i != consume.tail {
331 let payload = self.buffer[i as usize]
332 .as_ref()
333 .map(|p| &p.payload)
334 .ok_or(BuildError::GapInSegment)?;
335
336 let p = self
337 .depacketizer
338 .depacketize(payload)
339 .map_err(|_| BuildError::DepacketizerFailed)?;
340
341 data.extend_from_slice(&p);
342 i = i.wrapping_add(1);
343 }
344 let samples = after_timestamp - sample_timestamp;
345
346 let sample = Sample {
347 data: Bytes::copy_from_slice(&data),
348 timestamp: SystemTime::now(),
349 duration: Duration::from_secs_f64((samples as f64) / (self.sample_rate as f64)),
350 packet_timestamp: sample_timestamp,
351 prev_dropped_packets: self.dropped_packets,
352 prev_padding_packets: self.padding_packets,
353 };
354
355 self.dropped_packets = 0;
356 self.padding_packets = 0;
357 self.last_sample_timestamp = Some(sample_timestamp);
358
359 self.prepared_samples[self.prepared.tail as usize] = Some(sample);
360 self.prepared.tail = self.prepared.tail.wrapping_add(1);
361
362 self.purge_consumed_location(&consume, true);
363 self.purge_consumed_buffers();
364
365 Ok(consume)
366 }
367
368 pub fn pop(&mut self) -> Option<Sample> {
371 let _ = self.build_sample(false);
372
373 if self.prepared.empty() {
374 return None;
375 }
376 let result = self.prepared_samples[self.prepared.head as usize].take();
377 self.prepared.head = self.prepared.head.wrapping_add(1);
378 result
379 }
380
381 pub fn pop_with_timestamp(&mut self) -> Option<(Sample, u32)> {
385 if let Some(sample) = self.pop() {
386 let timestamp = sample.packet_timestamp;
387 Some((sample, timestamp))
388 } else {
389 None
390 }
391 }
392}
393
/// Returns the shortest distance between two 16-bit sequence numbers,
/// taking wrap-around into account.
///
/// The result is symmetric in `x` and `y` and never exceeds `0x8000`.
pub(crate) fn seqnum_distance(x: u16, y: u16) -> u16 {
    // The two wrapping differences measure the gap in each direction around
    // the 16-bit circle; the shorter one is the distance.
    let one_way = x.wrapping_sub(y);
    let other_way = y.wrapping_sub(x);
    one_way.min(other_way)
}
411
412#[derive(Debug)]
413enum BuildError {
414 NoActiveSegment,
416
417 NothingToConsume,
419
420 PendingTimestampPacket,
423
424 InvalidPartition(SampleSequenceLocation),
427
428 GapInSegment,
430
431 DepacketizerFailed,
433}