webrtc_sctp/chunk/
chunk_payload_data.rs

1use std::fmt;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4use std::time::SystemTime;
5
6use bytes::{Buf, BufMut, Bytes, BytesMut};
7use portable_atomic::AtomicBool;
8
9use super::chunk_header::*;
10use super::chunk_type::*;
11use super::*;
12
13pub(crate) const PAYLOAD_DATA_ENDING_FRAGMENT_BITMASK: u8 = 1;
14pub(crate) const PAYLOAD_DATA_BEGINNING_FRAGMENT_BITMASK: u8 = 2;
15pub(crate) const PAYLOAD_DATA_UNORDERED_BITMASK: u8 = 4;
16pub(crate) const PAYLOAD_DATA_IMMEDIATE_SACK: u8 = 8;
17pub(crate) const PAYLOAD_DATA_HEADER_SIZE: usize = 12;
18
19/// PayloadProtocolIdentifier is an enum for DataChannel payload types
20/// PayloadProtocolIdentifier enums
21/// https://www.iana.org/assignments/sctp-parameters/sctp-parameters.xhtml#sctp-parameters-25
22#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
23#[repr(C)]
24pub enum PayloadProtocolIdentifier {
25    Dcep = 50,
26    String = 51,
27    Binary = 53,
28    StringEmpty = 56,
29    BinaryEmpty = 57,
30    #[default]
31    Unknown,
32}
33
34impl fmt::Display for PayloadProtocolIdentifier {
35    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36        let s = match *self {
37            PayloadProtocolIdentifier::Dcep => "WebRTC DCEP",
38            PayloadProtocolIdentifier::String => "WebRTC String",
39            PayloadProtocolIdentifier::Binary => "WebRTC Binary",
40            PayloadProtocolIdentifier::StringEmpty => "WebRTC String (Empty)",
41            PayloadProtocolIdentifier::BinaryEmpty => "WebRTC Binary (Empty)",
42            _ => "Unknown Payload Protocol Identifier",
43        };
44        write!(f, "{s}")
45    }
46}
47
48impl From<u32> for PayloadProtocolIdentifier {
49    fn from(v: u32) -> PayloadProtocolIdentifier {
50        match v {
51            50 => PayloadProtocolIdentifier::Dcep,
52            51 => PayloadProtocolIdentifier::String,
53            53 => PayloadProtocolIdentifier::Binary,
54            56 => PayloadProtocolIdentifier::StringEmpty,
55            57 => PayloadProtocolIdentifier::BinaryEmpty,
56            _ => PayloadProtocolIdentifier::Unknown,
57        }
58    }
59}
60
61///chunkPayloadData represents an SCTP Chunk of type DATA
62///
63/// 0                   1                   2                   3
64/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
65///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
66///|   Type = 0    | Reserved|U|B|E|    Length                     |
67///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
68///|                              TSN                              |
69///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
70///|      Stream Identifier S      |   Stream Sequence Number n    |
71///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
72///|                  Payload Protocol Identifier                  |
73///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
74///|                                                               |
75///|                 User Data (seq n of Stream S)                 |
76///|                                                               |
77///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
78///
79///
80///An unfragmented user message shall have both the B and E bits set to
81///'1'.  Setting both B and E bits to '0' indicates a middle fragment of
82///a multi-fragment user message, as summarized in the following table:
83///   B E                  Description
84///============================================================
85///|  1 0 | First piece of a fragmented user message          |
86///+----------------------------------------------------------+
87///|  0 0 | Middle piece of a fragmented user message         |
88///+----------------------------------------------------------+
89///|  0 1 | Last piece of a fragmented user message           |
90///+----------------------------------------------------------+
91///|  1 1 | Unfragmented message                              |
92///============================================================
93///|             Table 1: Fragment Description Flags          |
94///============================================================
95#[derive(Debug, Clone)]
96pub struct ChunkPayloadData {
97    pub(crate) unordered: bool,
98    pub(crate) beginning_fragment: bool,
99    pub(crate) ending_fragment: bool,
100    pub(crate) immediate_sack: bool,
101
102    pub(crate) tsn: u32,
103    pub(crate) stream_identifier: u16,
104    pub(crate) stream_sequence_number: u16,
105    pub(crate) payload_type: PayloadProtocolIdentifier,
106    pub(crate) user_data: Bytes,
107
108    /// Whether this data chunk was acknowledged (received by peer)
109    pub(crate) acked: bool,
110    pub(crate) miss_indicator: u32,
111
112    /// Partial-reliability parameters used only by sender
113    pub(crate) since: SystemTime,
114    /// number of transmission made for this chunk
115    pub(crate) nsent: u32,
116
117    /// valid only with the first fragment
118    pub(crate) abandoned: Arc<AtomicBool>,
119    /// valid only with the first fragment
120    pub(crate) all_inflight: Arc<AtomicBool>,
121
122    /// Retransmission flag set when T1-RTX timeout occurred and this
123    /// chunk is still in the inflight queue
124    pub(crate) retransmit: bool,
125}
126
127impl Default for ChunkPayloadData {
128    fn default() -> Self {
129        ChunkPayloadData {
130            unordered: false,
131            beginning_fragment: false,
132            ending_fragment: false,
133            immediate_sack: false,
134            tsn: 0,
135            stream_identifier: 0,
136            stream_sequence_number: 0,
137            payload_type: PayloadProtocolIdentifier::default(),
138            user_data: Bytes::new(),
139            acked: false,
140            miss_indicator: 0,
141            since: SystemTime::now(),
142            nsent: 0,
143            abandoned: Arc::new(AtomicBool::new(false)),
144            all_inflight: Arc::new(AtomicBool::new(false)),
145            retransmit: false,
146        }
147    }
148}
149
150/// makes chunkPayloadData printable
151impl fmt::Display for ChunkPayloadData {
152    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153        write!(f, "{}\n{}", self.header(), self.tsn)
154    }
155}
156
157impl Chunk for ChunkPayloadData {
158    fn header(&self) -> ChunkHeader {
159        let mut flags: u8 = 0;
160        if self.ending_fragment {
161            flags = 1;
162        }
163        if self.beginning_fragment {
164            flags |= 1 << 1;
165        }
166        if self.unordered {
167            flags |= 1 << 2;
168        }
169        if self.immediate_sack {
170            flags |= 1 << 3;
171        }
172
173        ChunkHeader {
174            typ: CT_PAYLOAD_DATA,
175            flags,
176            value_length: self.value_length() as u16,
177        }
178    }
179
180    fn unmarshal(raw: &Bytes) -> Result<Self> {
181        let header = ChunkHeader::unmarshal(raw)?;
182
183        if header.typ != CT_PAYLOAD_DATA {
184            return Err(Error::ErrChunkTypeNotPayloadData);
185        }
186
187        let immediate_sack = (header.flags & PAYLOAD_DATA_IMMEDIATE_SACK) != 0;
188        let unordered = (header.flags & PAYLOAD_DATA_UNORDERED_BITMASK) != 0;
189        let beginning_fragment = (header.flags & PAYLOAD_DATA_BEGINNING_FRAGMENT_BITMASK) != 0;
190        let ending_fragment = (header.flags & PAYLOAD_DATA_ENDING_FRAGMENT_BITMASK) != 0;
191
192        // validity of value_length is checked in ChunkHeader::unmarshal
193        if header.value_length() < PAYLOAD_DATA_HEADER_SIZE {
194            return Err(Error::ErrChunkPayloadSmall);
195        }
196
197        let reader = &mut raw.slice(CHUNK_HEADER_SIZE..CHUNK_HEADER_SIZE + header.value_length());
198
199        let tsn = reader.get_u32();
200        let stream_identifier = reader.get_u16();
201        let stream_sequence_number = reader.get_u16();
202        let payload_type: PayloadProtocolIdentifier = reader.get_u32().into();
203        let user_data = raw.slice(
204            CHUNK_HEADER_SIZE + PAYLOAD_DATA_HEADER_SIZE..CHUNK_HEADER_SIZE + header.value_length(),
205        );
206
207        Ok(ChunkPayloadData {
208            unordered,
209            beginning_fragment,
210            ending_fragment,
211            immediate_sack,
212
213            tsn,
214            stream_identifier,
215            stream_sequence_number,
216            payload_type,
217            user_data,
218            acked: false,
219            miss_indicator: 0,
220            since: SystemTime::now(),
221            nsent: 0,
222            abandoned: Arc::new(AtomicBool::new(false)),
223            all_inflight: Arc::new(AtomicBool::new(false)),
224            retransmit: false,
225        })
226    }
227
228    fn marshal_to(&self, writer: &mut BytesMut) -> Result<usize> {
229        self.header().marshal_to(writer)?;
230
231        writer.put_u32(self.tsn);
232        writer.put_u16(self.stream_identifier);
233        writer.put_u16(self.stream_sequence_number);
234        writer.put_u32(self.payload_type as u32);
235        writer.extend_from_slice(&self.user_data);
236
237        Ok(writer.len())
238    }
239
240    fn check(&self) -> Result<()> {
241        Ok(())
242    }
243
244    fn value_length(&self) -> usize {
245        PAYLOAD_DATA_HEADER_SIZE + self.user_data.len()
246    }
247
248    fn as_any(&self) -> &(dyn Any + Send + Sync) {
249        self
250    }
251}
252
253impl ChunkPayloadData {
254    pub(crate) fn abandoned(&self) -> bool {
255        let (abandoned, all_inflight) = (
256            self.abandoned.load(Ordering::SeqCst),
257            self.all_inflight.load(Ordering::SeqCst),
258        );
259
260        abandoned && all_inflight
261    }
262
263    pub(crate) fn set_abandoned(&self, abandoned: bool) {
264        self.abandoned.store(abandoned, Ordering::SeqCst);
265    }
266
267    pub(crate) fn set_all_inflight(&mut self) {
268        if self.ending_fragment {
269            self.all_inflight.store(true, Ordering::SeqCst);
270        }
271    }
272}