1use std::fmt;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4use std::time::SystemTime;
5
6use bytes::{Buf, BufMut, Bytes, BytesMut};
7use portable_atomic::AtomicBool;
8
9use super::chunk_header::*;
10use super::chunk_type::*;
11use super::*;
12
/// Chunk-flag bit mirrored by `ChunkPayloadData::ending_fragment`.
pub(crate) const PAYLOAD_DATA_ENDING_FRAGMENT_BITMASK: u8 = 0b0001;
/// Chunk-flag bit mirrored by `ChunkPayloadData::beginning_fragment`.
pub(crate) const PAYLOAD_DATA_BEGINNING_FRAGMENT_BITMASK: u8 = 0b0010;
/// Chunk-flag bit mirrored by `ChunkPayloadData::unordered`.
pub(crate) const PAYLOAD_DATA_UNORDERED_BITMASK: u8 = 0b0100;
/// Chunk-flag bit mirrored by `ChunkPayloadData::immediate_sack`.
pub(crate) const PAYLOAD_DATA_IMMEDIATE_SACK: u8 = 0b1000;
/// Size in bytes of the fixed fields that precede the user data inside the
/// chunk value: TSN (4) + stream identifier (2) + stream sequence number (2)
/// + payload protocol identifier (4).
pub(crate) const PAYLOAD_DATA_HEADER_SIZE: usize = 4 + 2 + 2 + 4;
18
/// Payload protocol identifier stored in the last 32-bit fixed field of a
/// DATA chunk.
///
/// The explicit discriminants are the values written to the wire by
/// `self as u32`; any other incoming value is mapped to [`Unknown`] by the
/// `From<u32>` conversion.
///
/// [`Unknown`]: PayloadProtocolIdentifier::Unknown
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum PayloadProtocolIdentifier {
    /// Displayed as "WebRTC DCEP".
    Dcep = 50,
    /// Displayed as "WebRTC String".
    String = 51,
    /// Displayed as "WebRTC Binary".
    Binary = 53,
    /// Displayed as "WebRTC String (Empty)".
    StringEmpty = 56,
    /// Displayed as "WebRTC Binary (Empty)".
    BinaryEmpty = 57,
    /// Fallback for every identifier not listed above; also the `Default`.
    ///
    /// The discriminant is spelled out (it is the value the compiler would
    /// assign implicitly: previous variant + 1) because it is what an
    /// `as u32` cast yields. NOTE(review): a chunk parsed with an
    /// unrecognised identifier is therefore re-serialised with 58, not with
    /// the value originally received.
    #[default]
    Unknown = 58,
}
33
34impl fmt::Display for PayloadProtocolIdentifier {
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 let s = match *self {
37 PayloadProtocolIdentifier::Dcep => "WebRTC DCEP",
38 PayloadProtocolIdentifier::String => "WebRTC String",
39 PayloadProtocolIdentifier::Binary => "WebRTC Binary",
40 PayloadProtocolIdentifier::StringEmpty => "WebRTC String (Empty)",
41 PayloadProtocolIdentifier::BinaryEmpty => "WebRTC Binary (Empty)",
42 _ => "Unknown Payload Protocol Identifier",
43 };
44 write!(f, "{s}")
45 }
46}
47
48impl From<u32> for PayloadProtocolIdentifier {
49 fn from(v: u32) -> PayloadProtocolIdentifier {
50 match v {
51 50 => PayloadProtocolIdentifier::Dcep,
52 51 => PayloadProtocolIdentifier::String,
53 53 => PayloadProtocolIdentifier::Binary,
54 56 => PayloadProtocolIdentifier::StringEmpty,
55 57 => PayloadProtocolIdentifier::BinaryEmpty,
56 _ => PayloadProtocolIdentifier::Unknown,
57 }
58 }
59}
60
/// In-memory form of a payload DATA chunk, plus bookkeeping that is never
/// written to the wire.
///
/// `marshal_to` serialises only the four flag fields (via the chunk header),
/// `tsn`, `stream_identifier`, `stream_sequence_number`, `payload_type` and
/// `user_data`. The remaining fields are reset to `false` / `0` / "now" by
/// both `Default` and `unmarshal`.
///
/// Because `abandoned` and `all_inflight` are `Arc`s, the derived `Clone`
/// produces a chunk that shares those two flags with the original.
#[derive(Debug, Clone)]
pub struct ChunkPayloadData {
    /// Mirrors the `PAYLOAD_DATA_UNORDERED_BITMASK` header flag.
    pub(crate) unordered: bool,
    /// Mirrors the `PAYLOAD_DATA_BEGINNING_FRAGMENT_BITMASK` header flag.
    pub(crate) beginning_fragment: bool,
    /// Mirrors the `PAYLOAD_DATA_ENDING_FRAGMENT_BITMASK` header flag.
    pub(crate) ending_fragment: bool,
    /// Mirrors the `PAYLOAD_DATA_IMMEDIATE_SACK` header flag.
    pub(crate) immediate_sack: bool,

    /// First 32-bit field of the chunk value.
    pub(crate) tsn: u32,
    /// 16-bit field following `tsn` in the chunk value.
    pub(crate) stream_identifier: u16,
    /// 16-bit field following `stream_identifier` in the chunk value.
    pub(crate) stream_sequence_number: u16,
    /// Last fixed field of the chunk value, decoded through `From<u32>`.
    pub(crate) payload_type: PayloadProtocolIdentifier,
    /// Everything in the chunk value after the 12 fixed header bytes.
    pub(crate) user_data: Bytes,

    // The fields below are local state only; nothing in this file reads
    // them except where noted, so their exact semantics are defined by the
    // code that owns the chunk.
    // NOTE(review): presumably acknowledgement / retransmission tracking
    // maintained by the association — confirm against callers.
    pub(crate) acked: bool,
    pub(crate) miss_indicator: u32,

    /// Set to `SystemTime::now()` when the chunk is created or parsed.
    pub(crate) since: SystemTime,
    // NOTE(review): presumably a count of transmissions — confirm against callers.
    pub(crate) nsent: u32,

    /// Shared flag written by `set_abandoned`; see `abandoned()` for how it
    /// is combined with `all_inflight`.
    pub(crate) abandoned: Arc<AtomicBool>,
    /// Shared flag set by `set_all_inflight`, which only acts on chunks
    /// whose `ending_fragment` is true.
    pub(crate) all_inflight: Arc<AtomicBool>,

    // NOTE(review): not read or written in this file beyond initialisation
    // to `false`; presumably marks the chunk for retransmission — confirm.
    pub(crate) retransmit: bool,
}
126
127impl Default for ChunkPayloadData {
128 fn default() -> Self {
129 ChunkPayloadData {
130 unordered: false,
131 beginning_fragment: false,
132 ending_fragment: false,
133 immediate_sack: false,
134 tsn: 0,
135 stream_identifier: 0,
136 stream_sequence_number: 0,
137 payload_type: PayloadProtocolIdentifier::default(),
138 user_data: Bytes::new(),
139 acked: false,
140 miss_indicator: 0,
141 since: SystemTime::now(),
142 nsent: 0,
143 abandoned: Arc::new(AtomicBool::new(false)),
144 all_inflight: Arc::new(AtomicBool::new(false)),
145 retransmit: false,
146 }
147 }
148}
149
150impl fmt::Display for ChunkPayloadData {
152 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153 write!(f, "{}\n{}", self.header(), self.tsn)
154 }
155}
156
157impl Chunk for ChunkPayloadData {
158 fn header(&self) -> ChunkHeader {
159 let mut flags: u8 = 0;
160 if self.ending_fragment {
161 flags = 1;
162 }
163 if self.beginning_fragment {
164 flags |= 1 << 1;
165 }
166 if self.unordered {
167 flags |= 1 << 2;
168 }
169 if self.immediate_sack {
170 flags |= 1 << 3;
171 }
172
173 ChunkHeader {
174 typ: CT_PAYLOAD_DATA,
175 flags,
176 value_length: self.value_length() as u16,
177 }
178 }
179
180 fn unmarshal(raw: &Bytes) -> Result<Self> {
181 let header = ChunkHeader::unmarshal(raw)?;
182
183 if header.typ != CT_PAYLOAD_DATA {
184 return Err(Error::ErrChunkTypeNotPayloadData);
185 }
186
187 let immediate_sack = (header.flags & PAYLOAD_DATA_IMMEDIATE_SACK) != 0;
188 let unordered = (header.flags & PAYLOAD_DATA_UNORDERED_BITMASK) != 0;
189 let beginning_fragment = (header.flags & PAYLOAD_DATA_BEGINNING_FRAGMENT_BITMASK) != 0;
190 let ending_fragment = (header.flags & PAYLOAD_DATA_ENDING_FRAGMENT_BITMASK) != 0;
191
192 if header.value_length() < PAYLOAD_DATA_HEADER_SIZE {
194 return Err(Error::ErrChunkPayloadSmall);
195 }
196
197 let reader = &mut raw.slice(CHUNK_HEADER_SIZE..CHUNK_HEADER_SIZE + header.value_length());
198
199 let tsn = reader.get_u32();
200 let stream_identifier = reader.get_u16();
201 let stream_sequence_number = reader.get_u16();
202 let payload_type: PayloadProtocolIdentifier = reader.get_u32().into();
203 let user_data = raw.slice(
204 CHUNK_HEADER_SIZE + PAYLOAD_DATA_HEADER_SIZE..CHUNK_HEADER_SIZE + header.value_length(),
205 );
206
207 Ok(ChunkPayloadData {
208 unordered,
209 beginning_fragment,
210 ending_fragment,
211 immediate_sack,
212
213 tsn,
214 stream_identifier,
215 stream_sequence_number,
216 payload_type,
217 user_data,
218 acked: false,
219 miss_indicator: 0,
220 since: SystemTime::now(),
221 nsent: 0,
222 abandoned: Arc::new(AtomicBool::new(false)),
223 all_inflight: Arc::new(AtomicBool::new(false)),
224 retransmit: false,
225 })
226 }
227
228 fn marshal_to(&self, writer: &mut BytesMut) -> Result<usize> {
229 self.header().marshal_to(writer)?;
230
231 writer.put_u32(self.tsn);
232 writer.put_u16(self.stream_identifier);
233 writer.put_u16(self.stream_sequence_number);
234 writer.put_u32(self.payload_type as u32);
235 writer.extend_from_slice(&self.user_data);
236
237 Ok(writer.len())
238 }
239
240 fn check(&self) -> Result<()> {
241 Ok(())
242 }
243
244 fn value_length(&self) -> usize {
245 PAYLOAD_DATA_HEADER_SIZE + self.user_data.len()
246 }
247
248 fn as_any(&self) -> &(dyn Any + Send + Sync) {
249 self
250 }
251}
252
253impl ChunkPayloadData {
254 pub(crate) fn abandoned(&self) -> bool {
255 let (abandoned, all_inflight) = (
256 self.abandoned.load(Ordering::SeqCst),
257 self.all_inflight.load(Ordering::SeqCst),
258 );
259
260 abandoned && all_inflight
261 }
262
263 pub(crate) fn set_abandoned(&self, abandoned: bool) {
264 self.abandoned.store(abandoned, Ordering::SeqCst);
265 }
266
267 pub(crate) fn set_all_inflight(&mut self) {
268 if self.ending_fragment {
269 self.all_inflight.store(true, Ordering::SeqCst);
270 }
271 }
272}