sctp_proto/association/
mod.rs

1use crate::association::{
2    state::{AckMode, AckState, AssociationState},
3    stats::AssociationStats,
4};
5use crate::chunk::{
6    chunk_abort::ChunkAbort, chunk_cookie_ack::ChunkCookieAck, chunk_cookie_echo::ChunkCookieEcho,
7    chunk_error::ChunkError, chunk_forward_tsn::ChunkForwardTsn,
8    chunk_forward_tsn::ChunkForwardTsnStream, chunk_heartbeat::ChunkHeartbeat,
9    chunk_heartbeat_ack::ChunkHeartbeatAck, chunk_init::ChunkInit, chunk_init::ChunkInitAck,
10    chunk_payload_data::ChunkPayloadData, chunk_payload_data::PayloadProtocolIdentifier,
11    chunk_reconfig::ChunkReconfig, chunk_selective_ack::ChunkSelectiveAck,
12    chunk_shutdown::ChunkShutdown, chunk_shutdown_ack::ChunkShutdownAck,
13    chunk_shutdown_complete::ChunkShutdownComplete, chunk_type::CT_FORWARD_TSN, Chunk,
14    ErrorCauseUnrecognizedChunkType, USER_INITIATED_ABORT,
15};
16use crate::config::{ServerConfig, TransportConfig, COMMON_HEADER_SIZE, DATA_CHUNK_HEADER_SIZE};
17use crate::error::{Error, Result};
18use crate::packet::{CommonHeader, Packet};
19use crate::param::{
20    param_heartbeat_info::ParamHeartbeatInfo,
21    param_outgoing_reset_request::ParamOutgoingResetRequest,
22    param_reconfig_response::{ParamReconfigResponse, ReconfigResult},
23    param_state_cookie::ParamStateCookie,
24    param_supported_extensions::ParamSupportedExtensions,
25    Param,
26};
27use crate::queue::{payload_queue::PayloadQueue, pending_queue::PendingQueue};
28use crate::shared::{AssociationEventInner, AssociationId, EndpointEvent, EndpointEventInner};
29use crate::util::{sna16lt, sna32gt, sna32gte, sna32lt, sna32lte};
30use crate::{AssociationEvent, Payload, Side, Transmit};
31use stream::{ReliabilityType, Stream, StreamEvent, StreamId, StreamState};
32use timer::{RtoManager, Timer, TimerTable, ACK_INTERVAL};
33
34use crate::association::stream::RecvSendState;
35use bytes::Bytes;
36use fxhash::FxHashMap;
37use log::{debug, error, trace, warn};
38use rand::random;
39use std::collections::{HashMap, VecDeque};
40use std::net::{IpAddr, SocketAddr};
41use std::str::FromStr;
42use std::sync::Arc;
43use std::time::{Duration, Instant};
44use thiserror::Error;
45
46pub(crate) mod state;
47pub(crate) mod stats;
48pub(crate) mod stream;
49mod timer;
50
51#[cfg(test)]
52mod association_test;
53
54/// Reasons why an association might be lost
55#[derive(Debug, Error, Eq, Clone, PartialEq)]
56pub enum AssociationError {
57    /// Handshake failed
58    #[error("{0}")]
59    HandshakeFailed(#[from] Error),
60    /// The peer violated the QUIC specification as understood by this implementation
61    #[error("transport error")]
62    TransportError,
63    /// The peer's QUIC stack aborted the association automatically
64    #[error("aborted by peer")]
65    AssociationClosed,
66    /// The peer closed the association
67    #[error("closed by peer")]
68    ApplicationClosed,
69    /// The peer is unable to continue processing this association, usually due to having restarted
70    #[error("reset by peer")]
71    Reset,
72    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
73    ///
74    /// If neither side is sending keep-alives, an association will time out after a long enough idle
75    /// period even if the peer is still reachable
76    #[error("timed out")]
77    TimedOut,
78    /// The local application closed the association
79    #[error("closed")]
80    LocallyClosed,
81}
82
83/// Events of interest to the application
84#[derive(Debug)]
85pub enum Event {
86    /// The association was successfully established
87    Connected,
88    /// The association was lost
89    ///
90    /// Emitted if the peer closes the association or an error is encountered.
91    AssociationLost {
92        /// Reason that the association was closed
93        reason: AssociationError,
94    },
95    /// Stream events
96    Stream(StreamEvent),
97    /// One or more application datagrams have been received
98    DatagramReceived,
99}
100
101///Association represents an SCTP association
102//13.2.  Parameters Necessary per Association (i.e., the TCB)
103//Peer : Tag value to be sent in every packet and is received
104//Verification: in the INIT or INIT ACK chunk.
105//Tag :
106//
107//My : Tag expected in every inbound packet and sent in the
108//Verification: INIT or INIT ACK chunk.
109//
110//Tag :
111//State : A state variable indicating what state the association
112// : is in, i.e., COOKIE-WAIT, COOKIE-ECHOED, ESTABLISHED,
113// : SHUTDOWN-PENDING, SHUTDOWN-SENT, SHUTDOWN-RECEIVED,
114// : SHUTDOWN-ACK-SENT.
115//
116// No Closed state is illustrated since if a
117// association is Closed its TCB SHOULD be removed.
118#[derive(Debug)]
119pub struct Association {
120    side: Side,
121    state: AssociationState,
122    handshake_completed: bool,
123    max_message_size: u32,
124    inflight_queue_length: usize,
125    will_send_shutdown: bool,
126    bytes_received: usize,
127    bytes_sent: usize,
128
129    peer_verification_tag: u32,
130    my_verification_tag: u32,
131    my_next_tsn: u32,
132    peer_last_tsn: u32,
133    // for RTT measurement
134    min_tsn2measure_rtt: u32,
135    will_send_forward_tsn: bool,
136    will_retransmit_fast: bool,
137    will_retransmit_reconfig: bool,
138
139    will_send_shutdown_ack: bool,
140    will_send_shutdown_complete: bool,
141
142    // Reconfig
143    my_next_rsn: u32,
144    reconfigs: FxHashMap<u32, ChunkReconfig>,
145    reconfig_requests: FxHashMap<u32, ParamOutgoingResetRequest>,
146
147    // Non-RFC internal data
148    remote_addr: SocketAddr,
149    local_ip: Option<IpAddr>,
150    source_port: u16,
151    destination_port: u16,
152    my_max_num_inbound_streams: u16,
153    my_max_num_outbound_streams: u16,
154    my_cookie: Option<ParamStateCookie>,
155
156    payload_queue: PayloadQueue,
157    inflight_queue: PayloadQueue,
158    pending_queue: PendingQueue,
159    control_queue: VecDeque<Packet>,
160    stream_queue: VecDeque<u16>,
161
162    pub(crate) mtu: u32,
163    // max DATA chunk payload size
164    max_payload_size: u32,
165    cumulative_tsn_ack_point: u32,
166    advanced_peer_tsn_ack_point: u32,
167    use_forward_tsn: bool,
168
169    pub(crate) rto_mgr: RtoManager,
170    timers: TimerTable,
171
172    // Congestion control parameters
173    max_receive_buffer_size: u32,
174    // my congestion window size
175    pub(crate) cwnd: u32,
176    // calculated peer's receiver windows size
177    rwnd: u32,
178    // slow start threshold
179    pub(crate) ssthresh: u32,
180    partial_bytes_acked: u32,
181    pub(crate) in_fast_recovery: bool,
182    fast_recover_exit_point: u32,
183
184    // Chunks stored for retransmission
185    stored_init: Option<ChunkInit>,
186    stored_cookie_echo: Option<ChunkCookieEcho>,
187    pub(crate) streams: FxHashMap<StreamId, StreamState>,
188
189    events: VecDeque<Event>,
190    endpoint_events: VecDeque<EndpointEventInner>,
191    error: Option<AssociationError>,
192
193    // per inbound packet context
194    delayed_ack_triggered: bool,
195    immediate_ack_triggered: bool,
196
197    pub(crate) stats: AssociationStats,
198    ack_state: AckState,
199
200    // for testing
201    pub(crate) ack_mode: AckMode,
202}
203
204impl Default for Association {
205    fn default() -> Self {
206        Association {
207            side: Side::default(),
208            state: AssociationState::default(),
209            handshake_completed: false,
210            max_message_size: 0,
211            inflight_queue_length: 0,
212            will_send_shutdown: false,
213            bytes_received: 0,
214            bytes_sent: 0,
215
216            peer_verification_tag: 0,
217            my_verification_tag: 0,
218            my_next_tsn: 0,
219            peer_last_tsn: 0,
220            // for RTT measurement
221            min_tsn2measure_rtt: 0,
222            will_send_forward_tsn: false,
223            will_retransmit_fast: false,
224            will_retransmit_reconfig: false,
225
226            will_send_shutdown_ack: false,
227            will_send_shutdown_complete: false,
228
229            // Reconfig
230            my_next_rsn: 0,
231            reconfigs: FxHashMap::default(),
232            reconfig_requests: FxHashMap::default(),
233
234            // Non-RFC internal data
235            remote_addr: SocketAddr::from_str("0.0.0.0:0").unwrap(),
236            local_ip: None,
237            source_port: 0,
238            destination_port: 0,
239            my_max_num_inbound_streams: 0,
240            my_max_num_outbound_streams: 0,
241            my_cookie: None,
242
243            payload_queue: PayloadQueue::default(),
244            inflight_queue: PayloadQueue::default(),
245            pending_queue: PendingQueue::default(),
246            control_queue: VecDeque::default(),
247            stream_queue: VecDeque::default(),
248
249            mtu: 0,
250            // max DATA chunk payload size
251            max_payload_size: 0,
252            cumulative_tsn_ack_point: 0,
253            advanced_peer_tsn_ack_point: 0,
254            use_forward_tsn: false,
255
256            rto_mgr: RtoManager::default(),
257            timers: TimerTable::default(),
258
259            // Congestion control parameters
260            max_receive_buffer_size: 0,
261            // my congestion window size
262            cwnd: 0,
263            // calculated peer's receiver windows size
264            rwnd: 0,
265            // slow start threshold
266            ssthresh: 0,
267            partial_bytes_acked: 0,
268            in_fast_recovery: false,
269            fast_recover_exit_point: 0,
270
271            // Chunks stored for retransmission
272            stored_init: None,
273            stored_cookie_echo: None,
274            streams: FxHashMap::default(),
275
276            events: VecDeque::default(),
277            endpoint_events: VecDeque::default(),
278            error: None,
279
280            // per inbound packet context
281            delayed_ack_triggered: false,
282            immediate_ack_triggered: false,
283
284            stats: AssociationStats::default(),
285            ack_state: AckState::default(),
286
287            // for testing
288            ack_mode: AckMode::default(),
289        }
290    }
291}
292
293impl Association {
294    pub(crate) fn new(
295        server_config: Option<Arc<ServerConfig>>,
296        config: Arc<TransportConfig>,
297        max_payload_size: u32,
298        local_aid: AssociationId,
299        remote_addr: SocketAddr,
300        local_ip: Option<IpAddr>,
301        now: Instant,
302    ) -> Self {
303        let side = if server_config.is_some() {
304            Side::Server
305        } else {
306            Side::Client
307        };
308
309        // It's a bit strange, but we're going backwards from the calculation in
310        // config.rs to get max_payload_size from INITIAL_MTU.
311        let mtu = max_payload_size + COMMON_HEADER_SIZE + DATA_CHUNK_HEADER_SIZE;
312
313        // RFC 4690 Sec 7.2.1
314        // The initial cwnd before DATA transmission or after a sufficiently
315        // long idle period MUST be set to min(4*MTU, max (2*MTU, 4380bytes)).
316        let cwnd = (2 * mtu).clamp(4380, 4 * mtu);
317        let mut tsn = random::<u32>();
318        if tsn == 0 {
319            tsn += 1;
320        }
321
322        let mut this = Association {
323            side,
324            handshake_completed: false,
325            max_receive_buffer_size: config.max_receive_buffer_size(),
326            max_message_size: config.max_message_size(),
327            my_max_num_outbound_streams: config.max_num_outbound_streams(),
328            my_max_num_inbound_streams: config.max_num_inbound_streams(),
329            max_payload_size,
330
331            rto_mgr: RtoManager::new(),
332            timers: TimerTable::new(),
333
334            mtu,
335            cwnd,
336            remote_addr,
337            local_ip,
338
339            my_verification_tag: local_aid,
340            my_next_tsn: tsn,
341            my_next_rsn: tsn,
342            min_tsn2measure_rtt: tsn,
343            cumulative_tsn_ack_point: tsn - 1,
344            advanced_peer_tsn_ack_point: tsn - 1,
345            error: None,
346
347            ..Default::default()
348        };
349
350        if side.is_client() {
351            let mut init = ChunkInit {
352                initial_tsn: this.my_next_tsn,
353                num_outbound_streams: this.my_max_num_outbound_streams,
354                num_inbound_streams: this.my_max_num_inbound_streams,
355                initiate_tag: this.my_verification_tag,
356                advertised_receiver_window_credit: this.max_receive_buffer_size,
357                ..Default::default()
358            };
359            init.set_supported_extensions();
360
361            this.set_state(AssociationState::CookieWait);
362            this.stored_init = Some(init);
363            let _ = this.send_init();
364            this.timers
365                .start(Timer::T1Init, now, this.rto_mgr.get_rto());
366        }
367
368        this
369    }
370
371    /// Returns application-facing event
372    ///
373    /// Associations should be polled for events after:
374    /// - a call was made to `handle_event`
375    /// - a call was made to `handle_timeout`
376    #[must_use]
377    pub fn poll(&mut self) -> Option<Event> {
378        if let Some(x) = self.events.pop_front() {
379            return Some(x);
380        }
381
382        /*TODO: if let Some(event) = self.streams.poll() {
383            return Some(Event::Stream(event));
384        }*/
385
386        if let Some(err) = self.error.take() {
387            return Some(Event::AssociationLost { reason: err });
388        }
389
390        None
391    }
392
393    /// Return endpoint-facing event
394    #[must_use]
395    pub fn poll_endpoint_event(&mut self) -> Option<EndpointEvent> {
396        self.endpoint_events.pop_front().map(EndpointEvent)
397    }
398
399    /// Returns the next time at which `handle_timeout` should be called
400    ///
401    /// The value returned may change after:
402    /// - the application performed some I/O on the association
403    /// - a call was made to `handle_transmit`
404    /// - a call to `poll_transmit` returned `Some`
405    /// - a call was made to `handle_timeout`
406    #[must_use]
407    pub fn poll_timeout(&mut self) -> Option<Instant> {
408        self.timers.next_timeout()
409    }
410
411    /// Returns packets to transmit
412    ///
413    /// Associations should be polled for transmit after:
414    /// - the application performed some I/O on the Association
415    /// - a call was made to `handle_event`
416    /// - a call was made to `handle_timeout`
417    #[must_use]
418    pub fn poll_transmit(&mut self, now: Instant) -> Option<Transmit> {
419        let (contents, _) = self.gather_outbound(now);
420        if contents.is_empty() {
421            None
422        } else {
423            trace!(
424                "[{}] sending {} bytes (total {} datagrams)",
425                self.side,
426                contents.iter().fold(0, |l, c| l + c.len()),
427                contents.len()
428            );
429            Some(Transmit {
430                now,
431                remote: self.remote_addr,
432                payload: Payload::RawEncode(contents),
433                ecn: None,
434                local_ip: self.local_ip,
435            })
436        }
437    }
438
439    /// Process timer expirations
440    ///
441    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
442    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
443    /// methods.
444    ///
445    /// It is most efficient to call this immediately after the system clock reaches the latest
446    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
447    /// no-op and therefore are safe.
448    pub fn handle_timeout(&mut self, now: Instant) {
449        for &timer in &Timer::VALUES {
450            let (expired, failure, n_rtos) = self.timers.is_expired(timer, now);
451            if !expired {
452                continue;
453            }
454            self.timers.set(timer, None);
455            //trace!("{:?} timeout", timer);
456
457            if timer == Timer::Ack {
458                self.on_ack_timeout();
459            } else if failure {
460                self.on_retransmission_failure(timer);
461            } else {
462                self.on_retransmission_timeout(timer, n_rtos);
463                self.timers.start(timer, now, self.rto_mgr.get_rto());
464            }
465        }
466    }
467
468    /// Process `AssociationEvent`s generated by the associated `Endpoint`
469    ///
470    /// Will execute protocol logic upon receipt of an association event, in turn preparing signals
471    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
472    /// extracted through the relevant methods.
473    pub fn handle_event(&mut self, event: AssociationEvent) {
474        match event.0 {
475            AssociationEventInner::Datagram(transmit) => {
476                // If this packet could initiate a migration and we're a client or a server that
477                // forbids migration, drop the datagram. This could be relaxed to heuristically
478                // permit NAT-rebinding-like migration.
479                /*TODO:if remote != self.remote && self.server_config.as_ref().map_or(true, |x| !x.migration)
480                {
481                    trace!("discarding packet from unrecognized peer {}", remote);
482                    return;
483                }*/
484
485                if let Payload::PartialDecode(partial_decode) = transmit.payload {
486                    trace!(
487                        "[{}] receiving {} bytes",
488                        self.side,
489                        COMMON_HEADER_SIZE as usize + partial_decode.remaining.len()
490                    );
491
492                    let pkt = match partial_decode.finish() {
493                        Ok(p) => p,
494                        Err(err) => {
495                            warn!("[{}] unable to parse SCTP packet {}", self.side, err);
496                            return;
497                        }
498                    };
499
500                    if let Err(err) = self.handle_inbound(pkt, transmit.now) {
501                        error!("handle_inbound got err: {}", err);
502                        let _ = self.close();
503                    }
504                } else {
505                    trace!("discarding invalid partial_decode");
506                }
507            } //TODO:
508        }
509    }
510
511    /// Returns Association statistics
512    pub fn stats(&self) -> AssociationStats {
513        self.stats
514    }
515
516    /// Whether the Association is in the process of being established
517    ///
518    /// If this returns `false`, the Association may be either established or closed, signaled by the
519    /// emission of a `Connected` or `AssociationLost` message respectively.
520    pub fn is_handshaking(&self) -> bool {
521        !self.handshake_completed
522    }
523
524    /// Whether the Association is closed
525    ///
526    /// Closed Associations cannot transport any further data. An association becomes closed when
527    /// either peer application intentionally closes it, or when either transport layer detects an
528    /// error such as a time-out or certificate validation failure.
529    ///
530    /// A `AssociationLost` event is emitted with details when the association becomes closed.
531    pub fn is_closed(&self) -> bool {
532        self.state == AssociationState::Closed
533    }
534
535    /// Whether there is no longer any need to keep the association around
536    ///
537    /// Closed associations become drained after a brief timeout to absorb any remaining in-flight
538    /// packets from the peer. All drained associations have been closed.
539    pub fn is_drained(&self) -> bool {
540        self.state.is_drained()
541    }
542
543    /// Look up whether we're the client or server of this Association
544    pub fn side(&self) -> Side {
545        self.side
546    }
547
548    /// The latest socket address for this Association's peer
549    pub fn remote_addr(&self) -> SocketAddr {
550        self.remote_addr
551    }
552
553    /// Current best estimate of this Association's latency (round-trip-time)
554    pub fn rtt(&self) -> Duration {
555        Duration::from_millis(self.rto_mgr.get_rto())
556    }
557
558    /// The local IP address which was used when the peer established
559    /// the association
560    ///
561    /// This can be different from the address the endpoint is bound to, in case
562    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
563    ///
564    /// This will return `None` for clients.
565    ///
566    /// Retrieving the local IP address is currently supported on the following
567    /// platforms:
568    /// - Linux
569    ///
570    /// On all non-supported platforms the local IP address will not be available,
571    /// and the method will return `None`.
572    pub fn local_ip(&self) -> Option<IpAddr> {
573        self.local_ip
574    }
575
576    /// Shutdown initiates the shutdown sequence. The method blocks until the
577    /// shutdown sequence is completed and the association is closed, or until the
578    /// passed context is done, in which case the context's error is returned.
579    pub fn shutdown(&mut self) -> Result<()> {
580        debug!("[{}] closing association..", self.side);
581
582        let state = self.state();
583        if state != AssociationState::Established {
584            return Err(Error::ErrShutdownNonEstablished);
585        }
586
587        // Attempt a graceful shutdown.
588        self.set_state(AssociationState::ShutdownPending);
589
590        if self.inflight_queue_length == 0 {
591            // No more outstanding, send shutdown.
592            self.will_send_shutdown = true;
593            self.awake_write_loop();
594            self.set_state(AssociationState::ShutdownSent);
595        }
596
597        self.endpoint_events.push_back(EndpointEventInner::Drained);
598
599        Ok(())
600    }
601
602    /// Close ends the SCTP Association and cleans up any state
603    pub fn close(&mut self) -> Result<()> {
604        if self.state() != AssociationState::Closed {
605            self.set_state(AssociationState::Closed);
606
607            debug!("[{}] closing association..", self.side);
608
609            self.close_all_timers();
610
611            for si in self.streams.keys().cloned().collect::<Vec<u16>>() {
612                self.unregister_stream(si);
613            }
614
615            debug!("[{}] association closed", self.side);
616            debug!(
617                "[{}] stats nDATAs (in) : {}",
618                self.side,
619                self.stats.get_num_datas()
620            );
621            debug!(
622                "[{}] stats nSACKs (in) : {}",
623                self.side,
624                self.stats.get_num_sacks()
625            );
626            debug!(
627                "[{}] stats nT3Timeouts : {}",
628                self.side,
629                self.stats.get_num_t3timeouts()
630            );
631            debug!(
632                "[{}] stats nAckTimeouts: {}",
633                self.side,
634                self.stats.get_num_ack_timeouts()
635            );
636            debug!(
637                "[{}] stats nFastRetrans: {}",
638                self.side,
639                self.stats.get_num_fast_retrans()
640            );
641        }
642
643        Ok(())
644    }
645
646    /// open_stream opens a stream
647    pub fn open_stream(
648        &mut self,
649        stream_identifier: StreamId,
650        default_payload_type: PayloadProtocolIdentifier,
651    ) -> Result<Stream<'_>> {
652        if self.streams.contains_key(&stream_identifier) {
653            return Err(Error::ErrStreamAlreadyExist);
654        }
655
656        if let Some(s) = self.create_stream(stream_identifier, false, default_payload_type) {
657            Ok(s)
658        } else {
659            Err(Error::ErrStreamCreateFailed)
660        }
661    }
662
663    /// accept_stream accepts a stream
664    pub fn accept_stream(&mut self) -> Option<Stream<'_>> {
665        self.stream_queue
666            .pop_front()
667            .map(move |stream_identifier| Stream {
668                stream_identifier,
669                association: self,
670            })
671    }
672
673    /// stream returns a stream
674    pub fn stream(&mut self, stream_identifier: StreamId) -> Result<Stream<'_>> {
675        if !self.streams.contains_key(&stream_identifier) {
676            Err(Error::ErrStreamNotExisted)
677        } else {
678            Ok(Stream {
679                stream_identifier,
680                association: self,
681            })
682        }
683    }
684
685    /// bytes_sent returns the number of bytes sent
686    pub(crate) fn bytes_sent(&self) -> usize {
687        self.bytes_sent
688    }
689
690    /// bytes_received returns the number of bytes received
691    pub(crate) fn bytes_received(&self) -> usize {
692        self.bytes_received
693    }
694
695    /// max_message_size returns the maximum message size you can send.
696    pub(crate) fn max_message_size(&self) -> u32 {
697        self.max_message_size
698    }
699
700    /// set_max_message_size sets the maximum message size you can send.
701    pub(crate) fn set_max_message_size(&mut self, max_message_size: u32) {
702        self.max_message_size = max_message_size;
703    }
704
705    /// unregister_stream un-registers a stream from the association
706    /// The caller should hold the association write lock.
707    fn unregister_stream(&mut self, stream_identifier: StreamId) {
708        if let Some(mut s) = self.streams.remove(&stream_identifier) {
709            debug!("[{}] unregister_stream {}", self.side, stream_identifier);
710            s.state = RecvSendState::Closed;
711        }
712    }
713
714    /// set_state atomically sets the state of the Association.
715    fn set_state(&mut self, new_state: AssociationState) {
716        if new_state != self.state {
717            debug!(
718                "[{}] state change: '{}' => '{}'",
719                self.side, self.state, new_state,
720            );
721        }
722        self.state = new_state;
723    }
724
725    /// state atomically returns the state of the Association.
726    pub(crate) fn state(&self) -> AssociationState {
727        self.state
728    }
729
730    /// caller must hold self.lock
731    fn send_init(&mut self) -> Result<()> {
732        if let Some(stored_init) = &self.stored_init {
733            debug!("[{}] sending INIT", self.side);
734
735            self.source_port = 5000; // Spec??
736            self.destination_port = 5000; // Spec??
737
738            let outbound = Packet {
739                common_header: CommonHeader {
740                    source_port: self.source_port,
741                    destination_port: self.destination_port,
742                    verification_tag: self.peer_verification_tag,
743                },
744                chunks: vec![Box::new(stored_init.clone())],
745            };
746
747            self.control_queue.push_back(outbound);
748            self.awake_write_loop();
749
750            Ok(())
751        } else {
752            Err(Error::ErrInitNotStoredToSend)
753        }
754    }
755
756    /// caller must hold self.lock
757    fn send_cookie_echo(&mut self) -> Result<()> {
758        if let Some(stored_cookie_echo) = &self.stored_cookie_echo {
759            debug!("[{}] sending COOKIE-ECHO", self.side);
760
761            let outbound = Packet {
762                common_header: CommonHeader {
763                    source_port: self.source_port,
764                    destination_port: self.destination_port,
765                    verification_tag: self.peer_verification_tag,
766                },
767                chunks: vec![Box::new(stored_cookie_echo.clone())],
768            };
769
770            self.control_queue.push_back(outbound);
771            self.awake_write_loop();
772
773            Ok(())
774        } else {
775            Err(Error::ErrCookieEchoNotStoredToSend)
776        }
777    }
778
779    /// handle_inbound parses incoming raw packets
780    fn handle_inbound(&mut self, p: Packet, now: Instant) -> Result<()> {
781        if let Err(err) = p.check_packet() {
782            warn!("[{}] failed validating packet {}", self.side, err);
783            return Ok(());
784        }
785
786        self.handle_chunk_start();
787
788        for c in &p.chunks {
789            self.handle_chunk(&p, c, now)?;
790        }
791
792        self.handle_chunk_end(now);
793
794        Ok(())
795    }
796
797    fn handle_chunk_start(&mut self) {
798        self.delayed_ack_triggered = false;
799        self.immediate_ack_triggered = false;
800    }
801
802    fn handle_chunk_end(&mut self, now: Instant) {
803        if self.immediate_ack_triggered {
804            self.ack_state = AckState::Immediate;
805            self.timers.stop(Timer::Ack);
806            self.awake_write_loop();
807        } else if self.delayed_ack_triggered {
808            // Will send delayed ack in the next ack timeout
809            self.ack_state = AckState::Delay;
810            self.timers.start(Timer::Ack, now, ACK_INTERVAL);
811        }
812    }
813
814    #[allow(clippy::borrowed_box)]
815    fn handle_chunk(
816        &mut self,
817        p: &Packet,
818        chunk: &Box<dyn Chunk + Send + Sync>,
819        now: Instant,
820    ) -> Result<()> {
821        chunk.check()?;
822        let chunk_any = chunk.as_any();
823        let packets = if let Some(c) = chunk_any.downcast_ref::<ChunkInit>() {
824            if c.is_ack {
825                self.handle_init_ack(p, c, now)?
826            } else {
827                self.handle_init(p, c)?
828            }
829        } else if let Some(c) = chunk_any.downcast_ref::<ChunkAbort>() {
830            let mut err_str = String::new();
831            for e in &c.error_causes {
832                if matches!(e.code, USER_INITIATED_ABORT) {
833                    debug!("User initiated abort received");
834                    let _ = self.close();
835                    return Ok(());
836                }
837                err_str += &format!("({})", e);
838            }
839            return Err(Error::ErrAbortChunk(err_str));
840        } else if let Some(c) = chunk_any.downcast_ref::<ChunkError>() {
841            let mut err_str = String::new();
842            for e in &c.error_causes {
843                err_str += &format!("({})", e);
844            }
845            return Err(Error::ErrAbortChunk(err_str));
846        } else if let Some(c) = chunk_any.downcast_ref::<ChunkHeartbeat>() {
847            self.handle_heartbeat(c)?
848        } else if let Some(c) = chunk_any.downcast_ref::<ChunkCookieEcho>() {
849            self.handle_cookie_echo(c)?
850        } else if chunk_any.downcast_ref::<ChunkCookieAck>().is_some() {
851            self.handle_cookie_ack()?
852        } else if let Some(c) = chunk_any.downcast_ref::<ChunkPayloadData>() {
853            self.handle_data(c)?
854        } else if let Some(c) = chunk_any.downcast_ref::<ChunkSelectiveAck>() {
855            self.handle_sack(c, now)?
856        } else if let Some(c) = chunk_any.downcast_ref::<ChunkReconfig>() {
857            self.handle_reconfig(c)?
858        } else if let Some(c) = chunk_any.downcast_ref::<ChunkForwardTsn>() {
859            self.handle_forward_tsn(c)?
860        } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdown>() {
861            self.handle_shutdown(c)?
862        } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownAck>() {
863            self.handle_shutdown_ack(c)?
864        } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownComplete>() {
865            self.handle_shutdown_complete(c)?
866        } else {
867            return Err(Error::ErrChunkTypeUnhandled);
868        };
869
870        if !packets.is_empty() {
871            let mut buf: VecDeque<_> = packets.into_iter().collect();
872            self.control_queue.append(&mut buf);
873            self.awake_write_loop();
874        }
875
876        Ok(())
877    }
878
879    fn handle_init(&mut self, p: &Packet, i: &ChunkInit) -> Result<Vec<Packet>> {
880        let state = self.state();
881        debug!("[{}] chunkInit received in state '{}'", self.side, state);
882
883        // https://tools.ietf.org/html/rfc4960#section-5.2.1
884        // Upon receipt of an INIT in the COOKIE-WAIT state, an endpoint MUST
885        // respond with an INIT ACK using the same parameters it sent in its
886        // original INIT chunk (including its Initiate Tag, unchanged).  When
887        // responding, the endpoint MUST send the INIT ACK back to the same
888        // address that the original INIT (sent by this endpoint) was sent.
889
890        if state != AssociationState::Closed
891            && state != AssociationState::CookieWait
892            && state != AssociationState::CookieEchoed
893        {
894            // 5.2.2.  Unexpected INIT in States Other than CLOSED, COOKIE-ECHOED,
895            //        COOKIE-WAIT, and SHUTDOWN-ACK-SENT
896            return Err(Error::ErrHandleInitState);
897        }
898
899        // Should we be setting any of these permanently until we've ACKed further?
900        self.my_max_num_inbound_streams =
901            std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
902        self.my_max_num_outbound_streams =
903            std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
904        self.peer_verification_tag = i.initiate_tag;
905        self.source_port = p.common_header.destination_port;
906        self.destination_port = p.common_header.source_port;
907
908        // 13.2 This is the last TSN received in sequence.  This value
909        // is set initially by taking the peer's initial TSN,
910        // received in the INIT or INIT ACK chunk, and
911        // subtracting one from it.
912        self.peer_last_tsn = if i.initial_tsn == 0 {
913            u32::MAX
914        } else {
915            i.initial_tsn - 1
916        };
917
918        for param in &i.params {
919            if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
920                for t in &v.chunk_types {
921                    if *t == CT_FORWARD_TSN {
922                        debug!("[{}] use ForwardTSN (on init)", self.side);
923                        self.use_forward_tsn = true;
924                    }
925                }
926            }
927        }
928        if !self.use_forward_tsn {
929            warn!("[{}] not using ForwardTSN (on init)", self.side);
930        }
931
932        let mut outbound = Packet {
933            common_header: CommonHeader {
934                verification_tag: self.peer_verification_tag,
935                source_port: self.source_port,
936                destination_port: self.destination_port,
937            },
938            chunks: vec![],
939        };
940
941        let mut init_ack = ChunkInit {
942            is_ack: true,
943            initial_tsn: self.my_next_tsn,
944            num_outbound_streams: self.my_max_num_outbound_streams,
945            num_inbound_streams: self.my_max_num_inbound_streams,
946            initiate_tag: self.my_verification_tag,
947            advertised_receiver_window_credit: self.max_receive_buffer_size,
948            ..Default::default()
949        };
950
951        if self.my_cookie.is_none() {
952            self.my_cookie = Some(ParamStateCookie::new());
953        }
954
955        if let Some(my_cookie) = &self.my_cookie {
956            init_ack.params = vec![Box::new(my_cookie.clone())];
957        }
958
959        init_ack.set_supported_extensions();
960
961        outbound.chunks = vec![Box::new(init_ack)];
962
963        Ok(vec![outbound])
964    }
965
966    fn handle_init_ack(
967        &mut self,
968        p: &Packet,
969        i: &ChunkInitAck,
970        now: Instant,
971    ) -> Result<Vec<Packet>> {
972        let state = self.state();
973        debug!("[{}] chunkInitAck received in state '{}'", self.side, state);
974        if state != AssociationState::CookieWait {
975            // RFC 4960
976            // 5.2.3.  Unexpected INIT ACK
977            //   If an INIT ACK is received by an endpoint in any state other than the
978            //   COOKIE-WAIT state, the endpoint should discard the INIT ACK chunk.
979            //   An unexpected INIT ACK usually indicates the processing of an old or
980            //   duplicated INIT chunk.
981            return Ok(vec![]);
982        }
983
984        self.my_max_num_inbound_streams =
985            std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
986        self.my_max_num_outbound_streams =
987            std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
988        self.peer_verification_tag = i.initiate_tag;
989        self.peer_last_tsn = if i.initial_tsn == 0 {
990            u32::MAX
991        } else {
992            i.initial_tsn - 1
993        };
994        if self.source_port != p.common_header.destination_port
995            || self.destination_port != p.common_header.source_port
996        {
997            warn!("[{}] handle_init_ack: port mismatch", self.side);
998            return Ok(vec![]);
999        }
1000
1001        self.rwnd = i.advertised_receiver_window_credit;
1002        debug!("[{}] initial rwnd={}", self.side, self.rwnd);
1003
1004        // RFC 4690 Sec 7.2.1
1005        //  o  The initial value of ssthresh MAY be arbitrarily high (for
1006        //     example, implementations MAY use the size of the receiver
1007        //     advertised window).
1008        self.ssthresh = self.rwnd;
1009        trace!(
1010            "[{}] updated cwnd={} ssthresh={} inflight={} (INI)",
1011            self.side,
1012            self.cwnd,
1013            self.ssthresh,
1014            self.inflight_queue.get_num_bytes()
1015        );
1016
1017        self.timers.stop(Timer::T1Init);
1018        self.stored_init = None;
1019
1020        let mut cookie_param = None;
1021        for param in &i.params {
1022            if let Some(v) = param.as_any().downcast_ref::<ParamStateCookie>() {
1023                cookie_param = Some(v);
1024            } else if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
1025                for t in &v.chunk_types {
1026                    if *t == CT_FORWARD_TSN {
1027                        debug!("[{}] use ForwardTSN (on initAck)", self.side);
1028                        self.use_forward_tsn = true;
1029                    }
1030                }
1031            }
1032        }
1033        if !self.use_forward_tsn {
1034            warn!("[{}] not using ForwardTSN (on initAck)", self.side);
1035        }
1036
1037        if let Some(v) = cookie_param {
1038            self.stored_cookie_echo = Some(ChunkCookieEcho {
1039                cookie: v.cookie.clone(),
1040            });
1041
1042            self.send_cookie_echo()?;
1043
1044            self.timers
1045                .start(Timer::T1Cookie, now, self.rto_mgr.get_rto());
1046
1047            self.set_state(AssociationState::CookieEchoed);
1048
1049            Ok(vec![])
1050        } else {
1051            Err(Error::ErrInitAckNoCookie)
1052        }
1053    }
1054
1055    fn handle_heartbeat(&self, c: &ChunkHeartbeat) -> Result<Vec<Packet>> {
1056        trace!("[{}] chunkHeartbeat", self.side);
1057        if let Some(p) = c.params.first() {
1058            if let Some(hbi) = p.as_any().downcast_ref::<ParamHeartbeatInfo>() {
1059                return Ok(vec![Packet {
1060                    common_header: CommonHeader {
1061                        verification_tag: self.peer_verification_tag,
1062                        source_port: self.source_port,
1063                        destination_port: self.destination_port,
1064                    },
1065                    chunks: vec![Box::new(ChunkHeartbeatAck {
1066                        params: vec![Box::new(ParamHeartbeatInfo {
1067                            heartbeat_information: hbi.heartbeat_information.clone(),
1068                        })],
1069                    })],
1070                }]);
1071            } else {
1072                warn!(
1073                    "[{}] failed to handle Heartbeat, no ParamHeartbeatInfo",
1074                    self.side,
1075                );
1076            }
1077        }
1078
1079        Ok(vec![])
1080    }
1081
1082    fn handle_cookie_echo(&mut self, c: &ChunkCookieEcho) -> Result<Vec<Packet>> {
1083        let state = self.state();
1084        debug!("[{}] COOKIE-ECHO received in state '{}'", self.side, state);
1085
1086        if let Some(my_cookie) = &self.my_cookie {
1087            match state {
1088                AssociationState::Established => {
1089                    if my_cookie.cookie != c.cookie {
1090                        return Ok(vec![]);
1091                    }
1092                }
1093                AssociationState::Closed
1094                | AssociationState::CookieWait
1095                | AssociationState::CookieEchoed => {
1096                    if my_cookie.cookie != c.cookie {
1097                        return Ok(vec![]);
1098                    }
1099
1100                    self.timers.stop(Timer::T1Init);
1101                    self.stored_init = None;
1102
1103                    self.timers.stop(Timer::T1Cookie);
1104                    self.stored_cookie_echo = None;
1105
1106                    self.events.push_back(Event::Connected);
1107                    self.set_state(AssociationState::Established);
1108                    self.handshake_completed = true;
1109                }
1110                _ => return Ok(vec![]),
1111            };
1112        } else {
1113            debug!("[{}] COOKIE-ECHO received before initialization", self.side);
1114            return Ok(vec![]);
1115        }
1116
1117        Ok(vec![Packet {
1118            common_header: CommonHeader {
1119                verification_tag: self.peer_verification_tag,
1120                source_port: self.source_port,
1121                destination_port: self.destination_port,
1122            },
1123            chunks: vec![Box::new(ChunkCookieAck {})],
1124        }])
1125    }
1126
1127    fn handle_cookie_ack(&mut self) -> Result<Vec<Packet>> {
1128        let state = self.state();
1129        debug!("[{}] COOKIE-ACK received in state '{}'", self.side, state);
1130        if state != AssociationState::CookieEchoed {
1131            // RFC 4960
1132            // 5.2.5.  Handle Duplicate COOKIE-ACK.
1133            //   At any state other than COOKIE-ECHOED, an endpoint should silently
1134            //   discard a received COOKIE ACK chunk.
1135            return Ok(vec![]);
1136        }
1137
1138        self.timers.stop(Timer::T1Cookie);
1139        self.stored_cookie_echo = None;
1140
1141        self.events.push_back(Event::Connected);
1142        self.set_state(AssociationState::Established);
1143        self.handshake_completed = true;
1144
1145        Ok(vec![])
1146    }
1147
1148    fn handle_data(&mut self, d: &ChunkPayloadData) -> Result<Vec<Packet>> {
1149        trace!(
1150            "[{}] DATA: tsn={} immediateSack={} len={}",
1151            self.side,
1152            d.tsn,
1153            d.immediate_sack,
1154            d.user_data.len()
1155        );
1156        self.stats.inc_datas();
1157
1158        let can_push = self.payload_queue.can_push(d, self.peer_last_tsn);
1159        let mut stream_handle_data = false;
1160        if can_push {
1161            if self.get_or_create_stream(d.stream_identifier).is_some() {
1162                if self.get_my_receiver_window_credit() > 0 {
1163                    // Pass the new chunk to stream level as soon as it arrives
1164                    self.payload_queue.push(d.clone(), self.peer_last_tsn);
1165                    stream_handle_data = true;
1166                } else {
1167                    // Receive buffer is full
1168                    if let Some(last_tsn) = self.payload_queue.get_last_tsn_received() {
1169                        if sna32lt(d.tsn, *last_tsn) {
1170                            debug!("[{}] receive buffer full, but accepted as this is a missing chunk with tsn={} ssn={}", self.side, d.tsn, d.stream_sequence_number);
1171                            self.payload_queue.push(d.clone(), self.peer_last_tsn);
1172                            stream_handle_data = true; //s.handle_data(d.clone());
1173                        }
1174                    } else {
1175                        debug!(
1176                            "[{}] receive buffer full. dropping DATA with tsn={} ssn={}",
1177                            self.side, d.tsn, d.stream_sequence_number
1178                        );
1179                    }
1180                }
1181            } else {
1182                // silently discard the data. (sender will retry on T3-rtx timeout)
1183                // see pion/sctp#30
1184                debug!("[{}] discard {}", self.side, d.stream_sequence_number);
1185                return Ok(vec![]);
1186            }
1187        }
1188
1189        let immediate_sack = d.immediate_sack;
1190
1191        if stream_handle_data {
1192            if let Some(s) = self.streams.get_mut(&d.stream_identifier) {
1193                self.events.push_back(Event::DatagramReceived);
1194                s.handle_data(d);
1195                if s.reassembly_queue.is_readable() {
1196                    self.events.push_back(Event::Stream(StreamEvent::Readable {
1197                        id: d.stream_identifier,
1198                    }))
1199                }
1200            }
1201        }
1202
1203        self.handle_peer_last_tsn_and_acknowledgement(immediate_sack)
1204    }
1205
1206    fn handle_sack(&mut self, d: &ChunkSelectiveAck, now: Instant) -> Result<Vec<Packet>> {
1207        trace!(
1208            "[{}] {}, SACK: cumTSN={} a_rwnd={}",
1209            self.side,
1210            self.cumulative_tsn_ack_point,
1211            d.cumulative_tsn_ack,
1212            d.advertised_receiver_window_credit
1213        );
1214        let state = self.state();
1215        if state != AssociationState::Established
1216            && state != AssociationState::ShutdownPending
1217            && state != AssociationState::ShutdownReceived
1218        {
1219            return Ok(vec![]);
1220        }
1221
1222        self.stats.inc_sacks();
1223
1224        if sna32gt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1225            // RFC 4960 sec 6.2.1.  Processing a Received SACK
1226            // D)
1227            //   i) If Cumulative TSN Ack is less than the Cumulative TSN Ack
1228            //      Point, then drop the SACK.  Since Cumulative TSN Ack is
1229            //      monotonically increasing, a SACK whose Cumulative TSN Ack is
1230            //      less than the Cumulative TSN Ack Point indicates an out-of-
1231            //      order SACK.
1232
1233            debug!(
1234                "[{}] SACK Cumulative ACK {} is older than ACK point {}",
1235                self.side, d.cumulative_tsn_ack, self.cumulative_tsn_ack_point
1236            );
1237
1238            return Ok(vec![]);
1239        }
1240
1241        // Process selective ack
1242        let (bytes_acked_per_stream, htna) = self.process_selective_ack(d, now)?;
1243
1244        let mut total_bytes_acked = 0;
1245        for n_bytes_acked in bytes_acked_per_stream.values() {
1246            total_bytes_acked += *n_bytes_acked;
1247        }
1248
1249        let mut cum_tsn_ack_point_advanced = false;
1250        if sna32lt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1251            trace!(
1252                "[{}] SACK: cumTSN advanced: {} -> {}",
1253                self.side,
1254                self.cumulative_tsn_ack_point,
1255                d.cumulative_tsn_ack
1256            );
1257
1258            self.cumulative_tsn_ack_point = d.cumulative_tsn_ack;
1259            cum_tsn_ack_point_advanced = true;
1260            self.on_cumulative_tsn_ack_point_advanced(total_bytes_acked, now);
1261        }
1262
1263        for (si, n_bytes_acked) in &bytes_acked_per_stream {
1264            if let Some(s) = self.streams.get_mut(si) {
1265                if s.on_buffer_released(*n_bytes_acked) {
1266                    self.events
1267                        .push_back(Event::Stream(StreamEvent::BufferedAmountLow { id: *si }))
1268                }
1269            }
1270        }
1271
1272        // New rwnd value
1273        // RFC 4960 sec 6.2.1.  Processing a Received SACK
1274        // D)
1275        //   ii) Set rwnd equal to the newly received a_rwnd minus the number
1276        //       of bytes still outstanding after processing the Cumulative
1277        //       TSN Ack and the Gap Ack Blocks.
1278
1279        // bytes acked were already subtracted by markAsAcked() method
1280        let bytes_outstanding = self.inflight_queue.get_num_bytes() as u32;
1281        if bytes_outstanding >= d.advertised_receiver_window_credit {
1282            self.rwnd = 0;
1283        } else {
1284            self.rwnd = d.advertised_receiver_window_credit - bytes_outstanding;
1285        }
1286
1287        self.process_fast_retransmission(d.cumulative_tsn_ack, htna, cum_tsn_ack_point_advanced)?;
1288
1289        if self.use_forward_tsn {
1290            // RFC 3758 Sec 3.5 C1
1291            if sna32lt(
1292                self.advanced_peer_tsn_ack_point,
1293                self.cumulative_tsn_ack_point,
1294            ) {
1295                self.advanced_peer_tsn_ack_point = self.cumulative_tsn_ack_point
1296            }
1297
1298            // RFC 3758 Sec 3.5 C2
1299            let mut i = self.advanced_peer_tsn_ack_point + 1;
1300            while let Some(c) = self.inflight_queue.get(i) {
1301                if !c.abandoned() {
1302                    break;
1303                }
1304                self.advanced_peer_tsn_ack_point = i;
1305                i += 1;
1306            }
1307
1308            // RFC 3758 Sec 3.5 C3
1309            if sna32gt(
1310                self.advanced_peer_tsn_ack_point,
1311                self.cumulative_tsn_ack_point,
1312            ) {
1313                self.will_send_forward_tsn = true;
1314                debug!(
1315                    "[{}] handleSack {}: sna32GT({}, {})",
1316                    self.side,
1317                    self.will_send_forward_tsn,
1318                    self.advanced_peer_tsn_ack_point,
1319                    self.cumulative_tsn_ack_point
1320                );
1321            }
1322            self.awake_write_loop();
1323        }
1324
1325        self.postprocess_sack(state, cum_tsn_ack_point_advanced, now);
1326
1327        Ok(vec![])
1328    }
1329
1330    fn handle_reconfig(&mut self, c: &ChunkReconfig) -> Result<Vec<Packet>> {
1331        trace!("[{}] handle_reconfig", self.side);
1332
1333        let mut pp = vec![];
1334
1335        if let Some(param_a) = &c.param_a {
1336            self.handle_reconfig_param(param_a, &mut pp)?;
1337        }
1338
1339        if let Some(param_b) = &c.param_b {
1340            self.handle_reconfig_param(param_b, &mut pp)?;
1341        }
1342
1343        Ok(pp)
1344    }
1345
1346    fn handle_forward_tsn(&mut self, c: &ChunkForwardTsn) -> Result<Vec<Packet>> {
1347        trace!("[{}] FwdTSN: {}", self.side, c.to_string());
1348
1349        if !self.use_forward_tsn {
1350            warn!("[{}] received FwdTSN but not enabled", self.side);
1351            // Return an error chunk
1352            let cerr = ChunkError {
1353                error_causes: vec![ErrorCauseUnrecognizedChunkType::default()],
1354            };
1355
1356            let outbound = Packet {
1357                common_header: CommonHeader {
1358                    verification_tag: self.peer_verification_tag,
1359                    source_port: self.source_port,
1360                    destination_port: self.destination_port,
1361                },
1362                chunks: vec![Box::new(cerr)],
1363            };
1364            return Ok(vec![outbound]);
1365        }
1366
1367        // From RFC 3758 Sec 3.6:
1368        //   Note, if the "New Cumulative TSN" value carried in the arrived
1369        //   FORWARD TSN chunk is found to be behind or at the current cumulative
1370        //   TSN point, the data receiver MUST treat this FORWARD TSN as out-of-
1371        //   date and MUST NOT update its Cumulative TSN.  The receiver SHOULD
1372        //   send a SACK to its peer (the sender of the FORWARD TSN) since such a
1373        //   duplicate may indicate the previous SACK was lost in the network.
1374
1375        trace!(
1376            "[{}] should send ack? newCumTSN={} peer_last_tsn={}",
1377            self.side,
1378            c.new_cumulative_tsn,
1379            self.peer_last_tsn
1380        );
1381        if sna32lte(c.new_cumulative_tsn, self.peer_last_tsn) {
1382            trace!("[{}] sending ack on Forward TSN", self.side);
1383            self.ack_state = AckState::Immediate;
1384            self.timers.stop(Timer::Ack);
1385            self.awake_write_loop();
1386            return Ok(vec![]);
1387        }
1388
1389        // From RFC 3758 Sec 3.6:
1390        //   the receiver MUST perform the same TSN handling, including duplicate
1391        //   detection, gap detection, SACK generation, cumulative TSN
1392        //   advancement, etc. as defined in RFC 2960 [2]---with the following
1393        //   exceptions and additions.
1394
1395        //   When a FORWARD TSN chunk arrives, the data receiver MUST first update
1396        //   its cumulative TSN point to the value carried in the FORWARD TSN
1397        //   chunk,
1398
1399        // Advance peer_last_tsn
1400        while sna32lt(self.peer_last_tsn, c.new_cumulative_tsn) {
1401            self.payload_queue.pop(self.peer_last_tsn + 1); // may not exist
1402            self.peer_last_tsn += 1;
1403        }
1404
1405        // Report new peer_last_tsn value and abandoned largest SSN value to
1406        // corresponding streams so that the abandoned chunks can be removed
1407        // from the reassemblyQueue.
1408        for forwarded in &c.streams {
1409            if let Some(s) = self.streams.get_mut(&forwarded.identifier) {
1410                s.handle_forward_tsn_for_ordered(forwarded.sequence);
1411            }
1412        }
1413
1414        // TSN may be forewared for unordered chunks. ForwardTSN chunk does not
1415        // report which stream identifier it skipped for unordered chunks.
1416        // Therefore, we need to broadcast this event to all existing streams for
1417        // unordered chunks.
1418        // See https://github.com/pion/sctp/issues/106
1419        for s in self.streams.values_mut() {
1420            s.handle_forward_tsn_for_unordered(c.new_cumulative_tsn);
1421        }
1422
1423        self.handle_peer_last_tsn_and_acknowledgement(false)
1424    }
1425
1426    fn handle_shutdown(&mut self, _: &ChunkShutdown) -> Result<Vec<Packet>> {
1427        let state = self.state();
1428
1429        if state == AssociationState::Established {
1430            if !self.inflight_queue.is_empty() {
1431                self.set_state(AssociationState::ShutdownReceived);
1432            } else {
1433                // No more outstanding, send shutdown ack.
1434                self.will_send_shutdown_ack = true;
1435                self.set_state(AssociationState::ShutdownAckSent);
1436
1437                self.awake_write_loop();
1438            }
1439        } else if state == AssociationState::ShutdownSent {
1440            // self.cumulative_tsn_ack_point = c.cumulative_tsn_ack
1441
1442            self.will_send_shutdown_ack = true;
1443            self.set_state(AssociationState::ShutdownAckSent);
1444
1445            self.awake_write_loop();
1446        }
1447
1448        Ok(vec![])
1449    }
1450
1451    fn handle_shutdown_ack(&mut self, _: &ChunkShutdownAck) -> Result<Vec<Packet>> {
1452        let state = self.state();
1453        if state == AssociationState::ShutdownSent || state == AssociationState::ShutdownAckSent {
1454            self.timers.stop(Timer::T2Shutdown);
1455            self.will_send_shutdown_complete = true;
1456
1457            self.awake_write_loop();
1458        }
1459
1460        Ok(vec![])
1461    }
1462
1463    fn handle_shutdown_complete(&mut self, _: &ChunkShutdownComplete) -> Result<Vec<Packet>> {
1464        let state = self.state();
1465        if state == AssociationState::ShutdownAckSent {
1466            self.timers.stop(Timer::T2Shutdown);
1467            self.close()?;
1468        }
1469
1470        Ok(vec![])
1471    }
1472
1473    /// A common routine for handle_data and handle_forward_tsn routines
1474    fn handle_peer_last_tsn_and_acknowledgement(
1475        &mut self,
1476        sack_immediately: bool,
1477    ) -> Result<Vec<Packet>> {
1478        let mut reply = vec![];
1479
1480        // Try to advance peer_last_tsn
1481
1482        // From RFC 3758 Sec 3.6:
1483        //   .. and then MUST further advance its cumulative TSN point locally
1484        //   if possible
1485        // Meaning, if peer_last_tsn+1 points to a chunk that is received,
1486        // advance peer_last_tsn until peer_last_tsn+1 points to unreceived chunk.
1487        //debug!("[{}] peer_last_tsn = {}", self.side, self.peer_last_tsn);
1488        while self.payload_queue.pop(self.peer_last_tsn + 1).is_some() {
1489            self.peer_last_tsn += 1;
1490            //debug!("[{}] peer_last_tsn = {}", self.side, self.peer_last_tsn);
1491
1492            let rst_reqs: Vec<ParamOutgoingResetRequest> =
1493                self.reconfig_requests.values().cloned().collect();
1494            for rst_req in rst_reqs {
1495                self.reset_streams_if_any(&rst_req, false, &mut reply)?;
1496            }
1497        }
1498
1499        let has_packet_loss = !self.payload_queue.is_empty();
1500        if has_packet_loss {
1501            trace!(
1502                "[{}] packetloss: {}",
1503                self.side,
1504                self.payload_queue
1505                    .get_gap_ack_blocks_string(self.peer_last_tsn)
1506            );
1507        }
1508
1509        if (self.ack_state != AckState::Immediate
1510            && !sack_immediately
1511            && !has_packet_loss
1512            && self.ack_mode == AckMode::Normal)
1513            || self.ack_mode == AckMode::AlwaysDelay
1514        {
1515            if self.ack_state == AckState::Idle {
1516                self.delayed_ack_triggered = true;
1517            } else {
1518                self.immediate_ack_triggered = true;
1519            }
1520        } else {
1521            self.immediate_ack_triggered = true;
1522        }
1523
1524        Ok(reply)
1525    }
1526
1527    #[allow(clippy::borrowed_box)]
1528    fn handle_reconfig_param(
1529        &mut self,
1530        raw: &Box<dyn Param + Send + Sync>,
1531        reply: &mut Vec<Packet>,
1532    ) -> Result<()> {
1533        if let Some(p) = raw.as_any().downcast_ref::<ParamOutgoingResetRequest>() {
1534            self.reconfig_requests
1535                .insert(p.reconfig_request_sequence_number, p.clone());
1536            self.reset_streams_if_any(p, true, reply)?;
1537            Ok(())
1538        } else if let Some(p) = raw.as_any().downcast_ref::<ParamReconfigResponse>() {
1539            self.reconfigs.remove(&p.reconfig_response_sequence_number);
1540            if self.reconfigs.is_empty() {
1541                self.timers.stop(Timer::Reconfig);
1542            }
1543            Ok(())
1544        } else {
1545            Err(Error::ErrParameterType)
1546        }
1547    }
1548
1549    fn process_selective_ack(
1550        &mut self,
1551        d: &ChunkSelectiveAck,
1552        now: Instant,
1553    ) -> Result<(HashMap<u16, i64>, u32)> {
1554        let mut bytes_acked_per_stream = HashMap::new();
1555
1556        // New ack point, so pop all ACKed packets from inflight_queue
1557        // We add 1 because the "currentAckPoint" has already been popped from the inflight queue
1558        // For the first SACK we take care of this by setting the ackpoint to cumAck - 1
1559        let mut i = self.cumulative_tsn_ack_point + 1;
1560        //log::debug!("[{}] i={} d={}", self.name, i, d.cumulative_tsn_ack);
1561        while sna32lte(i, d.cumulative_tsn_ack) {
1562            if let Some(c) = self.inflight_queue.pop(i) {
1563                if !c.acked {
1564                    // RFC 4096 sec 6.3.2.  Retransmission Timer Rules
1565                    //   R3)  Whenever a SACK is received that acknowledges the DATA chunk
1566                    //        with the earliest outstanding TSN for that address, restart the
1567                    //        T3-rtx timer for that address with its current RTO (if there is
1568                    //        still outstanding data on that address).
1569                    if i == self.cumulative_tsn_ack_point + 1 {
1570                        // T3 timer needs to be reset. Stop it for now.
1571                        self.timers.stop(Timer::T3RTX);
1572                    }
1573
1574                    let n_bytes_acked = c.user_data.len() as i64;
1575
1576                    // Sum the number of bytes acknowledged per stream
1577                    if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1578                        *amount += n_bytes_acked;
1579                    } else {
1580                        bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1581                    }
1582
1583                    // RFC 4960 sec 6.3.1.  RTO Calculation
1584                    //   C4)  When data is in flight and when allowed by rule C5 below, a new
1585                    //        RTT measurement MUST be made each round trip.  Furthermore, new
1586                    //        RTT measurements SHOULD be made no more than once per round trip
1587                    //        for a given destination transport address.
1588                    //   C5)  Karn's algorithm: RTT measurements MUST NOT be made using
1589                    //        packets that were retransmitted (and thus for which it is
1590                    //        ambiguous whether the reply was for the first instance of the
1591                    //        chunk or for a later instance)
1592                    if c.nsent == 1 && sna32gte(c.tsn, self.min_tsn2measure_rtt) {
1593                        self.min_tsn2measure_rtt = self.my_next_tsn;
1594                        if let Some(since) = &c.since {
1595                            let rtt = now.duration_since(*since);
1596                            let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1597                            trace!(
1598                                "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1599                                self.side,
1600                                rtt.as_millis(),
1601                                srtt,
1602                                self.rto_mgr.get_rto()
1603                            );
1604                        } else {
1605                            error!("[{}] invalid c.since", self.side);
1606                        }
1607                    }
1608                }
1609
1610                if self.in_fast_recovery && c.tsn == self.fast_recover_exit_point {
1611                    debug!("[{}] exit fast-recovery", self.side);
1612                    self.in_fast_recovery = false;
1613                }
1614            } else {
1615                return Err(Error::ErrInflightQueueTsnPop);
1616            }
1617
1618            i += 1;
1619        }
1620
1621        let mut htna = d.cumulative_tsn_ack;
1622
1623        // Mark selectively acknowledged chunks as "acked"
1624        for g in &d.gap_ack_blocks {
1625            for i in g.start..=g.end {
1626                let tsn = d.cumulative_tsn_ack + i as u32;
1627
1628                let (is_existed, is_acked) = if let Some(c) = self.inflight_queue.get(tsn) {
1629                    (true, c.acked)
1630                } else {
1631                    (false, false)
1632                };
1633                let n_bytes_acked = if is_existed && !is_acked {
1634                    self.inflight_queue.mark_as_acked(tsn) as i64
1635                } else {
1636                    0
1637                };
1638
1639                if let Some(c) = self.inflight_queue.get(tsn) {
1640                    if !is_acked {
1641                        // Sum the number of bytes acknowledged per stream
1642                        if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1643                            *amount += n_bytes_acked;
1644                        } else {
1645                            bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1646                        }
1647
1648                        trace!("[{}] tsn={} has been sacked", self.side, c.tsn);
1649
1650                        if c.nsent == 1 {
1651                            self.min_tsn2measure_rtt = self.my_next_tsn;
1652                            if let Some(since) = &c.since {
1653                                let rtt = now.duration_since(*since);
1654                                let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1655                                trace!(
1656                                    "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1657                                    self.side,
1658                                    rtt.as_millis(),
1659                                    srtt,
1660                                    self.rto_mgr.get_rto()
1661                                );
1662                            } else {
1663                                error!("[{}] invalid c.since", self.side);
1664                            }
1665                        }
1666
1667                        if sna32lt(htna, tsn) {
1668                            htna = tsn;
1669                        }
1670                    }
1671                } else {
1672                    return Err(Error::ErrTsnRequestNotExist);
1673                }
1674            }
1675        }
1676
1677        Ok((bytes_acked_per_stream, htna))
1678    }
1679
1680    fn on_cumulative_tsn_ack_point_advanced(&mut self, total_bytes_acked: i64, now: Instant) {
1681        // RFC 4096, sec 6.3.2.  Retransmission Timer Rules
1682        //   R2)  Whenever all outstanding data sent to an address have been
1683        //        acknowledged, turn off the T3-rtx timer of that address.
1684        if self.inflight_queue.is_empty() {
1685            trace!(
1686                "[{}] SACK: no more packet in-flight (pending={})",
1687                self.side,
1688                self.pending_queue.len()
1689            );
1690            self.timers.stop(Timer::T3RTX);
1691        } else {
1692            trace!("[{}] T3-rtx timer start (pt2)", self.side);
1693            self.timers
1694                .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1695        }
1696
1697        // Update congestion control parameters
1698        if self.cwnd <= self.ssthresh {
1699            // RFC 4096, sec 7.2.1.  Slow-Start
1700            //   o  When cwnd is less than or equal to ssthresh, an SCTP endpoint MUST
1701            //		use the slow-start algorithm to increase cwnd only if the current
1702            //      congestion window is being fully utilized, an incoming SACK
1703            //      advances the Cumulative TSN Ack Point, and the data sender is not
1704            //      in Fast Recovery.  Only when these three conditions are met can
1705            //      the cwnd be increased; otherwise, the cwnd MUST not be increased.
1706            //		If these conditions are met, then cwnd MUST be increased by, at
1707            //      most, the lesser of 1) the total size of the previously
1708            //      outstanding DATA chunk(s) acknowledged, and 2) the destination's
1709            //      path MTU.
1710            if !self.in_fast_recovery && !self.pending_queue.is_empty() {
1711                self.cwnd += std::cmp::min(total_bytes_acked as u32, self.cwnd); // TCP way
1712                                                                                 // self.cwnd += min32(uint32(total_bytes_acked), self.mtu) // SCTP way (slow)
1713                trace!(
1714                    "[{}] updated cwnd={} ssthresh={} acked={} (SS)",
1715                    self.side,
1716                    self.cwnd,
1717                    self.ssthresh,
1718                    total_bytes_acked
1719                );
1720            } else {
1721                trace!(
1722                    "[{}] cwnd did not grow: cwnd={} ssthresh={} acked={} FR={} pending={}",
1723                    self.side,
1724                    self.cwnd,
1725                    self.ssthresh,
1726                    total_bytes_acked,
1727                    self.in_fast_recovery,
1728                    self.pending_queue.len()
1729                );
1730            }
1731        } else {
1732            // RFC 4096, sec 7.2.2.  Congestion Avoidance
1733            //   o  Whenever cwnd is greater than ssthresh, upon each SACK arrival
1734            //      that advances the Cumulative TSN Ack Point, increase
1735            //      partial_bytes_acked by the total number of bytes of all new chunks
1736            //      acknowledged in that SACK including chunks acknowledged by the new
1737            //      Cumulative TSN Ack and by Gap Ack Blocks.
1738            self.partial_bytes_acked += total_bytes_acked as u32;
1739
1740            //   o  When partial_bytes_acked is equal to or greater than cwnd and
1741            //      before the arrival of the SACK the sender had cwnd or more bytes
1742            //      of data outstanding (i.e., before arrival of the SACK, flight size
1743            //      was greater than or equal to cwnd), increase cwnd by MTU, and
1744            //      reset partial_bytes_acked to (partial_bytes_acked - cwnd).
1745            if self.partial_bytes_acked >= self.cwnd && !self.pending_queue.is_empty() {
1746                self.partial_bytes_acked -= self.cwnd;
1747                self.cwnd += self.mtu;
1748                trace!(
1749                    "[{}] updated cwnd={} ssthresh={} acked={} (CA)",
1750                    self.side,
1751                    self.cwnd,
1752                    self.ssthresh,
1753                    total_bytes_acked
1754                );
1755            }
1756        }
1757    }
1758
1759    fn process_fast_retransmission(
1760        &mut self,
1761        cum_tsn_ack_point: u32,
1762        htna: u32,
1763        cum_tsn_ack_point_advanced: bool,
1764    ) -> Result<()> {
1765        // HTNA algorithm - RFC 4960 Sec 7.2.4
1766        // Increment missIndicator of each chunks that the SACK reported missing
1767        // when either of the following is met:
1768        // a)  Not in fast-recovery
1769        //     miss indications are incremented only for missing TSNs prior to the
1770        //     highest TSN newly acknowledged in the SACK.
1771        // b)  In fast-recovery AND the Cumulative TSN Ack Point advanced
1772        //     the miss indications are incremented for all TSNs reported missing
1773        //     in the SACK.
1774        if !self.in_fast_recovery || cum_tsn_ack_point_advanced {
1775            let max_tsn = if !self.in_fast_recovery {
1776                // a) increment only for missing TSNs prior to the HTNA
1777                htna
1778            } else {
1779                // b) increment for all TSNs reported missing
1780                cum_tsn_ack_point + (self.inflight_queue.len() as u32) + 1
1781            };
1782
1783            let mut tsn = cum_tsn_ack_point + 1;
1784            while sna32lt(tsn, max_tsn) {
1785                if let Some(c) = self.inflight_queue.get_mut(tsn) {
1786                    if !c.acked && !c.abandoned() && c.miss_indicator < 3 {
1787                        c.miss_indicator += 1;
1788                        if c.miss_indicator == 3 && !self.in_fast_recovery {
1789                            // 2)  If not in Fast Recovery, adjust the ssthresh and cwnd of the
1790                            //     destination address(es) to which the missing DATA chunks were
1791                            //     last sent, according to the formula described in Section 7.2.3.
1792                            self.in_fast_recovery = true;
1793                            self.fast_recover_exit_point = htna;
1794                            self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
1795                            self.cwnd = self.ssthresh;
1796                            self.partial_bytes_acked = 0;
1797                            self.will_retransmit_fast = true;
1798
1799                            trace!(
1800                                "[{}] updated cwnd={} ssthresh={} inflight={} (FR)",
1801                                self.side,
1802                                self.cwnd,
1803                                self.ssthresh,
1804                                self.inflight_queue.get_num_bytes()
1805                            );
1806                        }
1807                    }
1808                } else {
1809                    return Err(Error::ErrTsnRequestNotExist);
1810                }
1811
1812                tsn += 1;
1813            }
1814        }
1815
1816        if self.in_fast_recovery && cum_tsn_ack_point_advanced {
1817            self.will_retransmit_fast = true;
1818        }
1819
1820        Ok(())
1821    }
1822
1823    /// The caller must hold the lock. This method was only added because the
1824    /// linter was complaining about the "cognitive complexity" of handle_sack.
1825    fn postprocess_sack(
1826        &mut self,
1827        state: AssociationState,
1828        mut should_awake_write_loop: bool,
1829        now: Instant,
1830    ) {
1831        if !self.inflight_queue.is_empty() {
1832            // Start timer. (noop if already started)
1833            trace!("[{}] T3-rtx timer start (pt3)", self.side);
1834            self.timers
1835                .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1836        } else if state == AssociationState::ShutdownPending {
1837            // No more outstanding, send shutdown.
1838            should_awake_write_loop = true;
1839            self.will_send_shutdown = true;
1840            self.set_state(AssociationState::ShutdownSent);
1841        } else if state == AssociationState::ShutdownReceived {
1842            // No more outstanding, send shutdown ack.
1843            should_awake_write_loop = true;
1844            self.will_send_shutdown_ack = true;
1845            self.set_state(AssociationState::ShutdownAckSent);
1846        }
1847
1848        if should_awake_write_loop {
1849            self.awake_write_loop();
1850        }
1851    }
1852
1853    fn reset_streams_if_any(
1854        &mut self,
1855        p: &ParamOutgoingResetRequest,
1856        respond: bool,
1857        reply: &mut Vec<Packet>,
1858    ) -> Result<()> {
1859        let mut result = ReconfigResult::SuccessPerformed;
1860        let mut sis_to_reset = vec![];
1861
1862        if sna32lte(p.sender_last_tsn, self.peer_last_tsn) {
1863            debug!(
1864                "[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}",
1865                self.side, p.sender_last_tsn, self.peer_last_tsn
1866            );
1867            for id in &p.stream_identifiers {
1868                if self.streams.contains_key(id) {
1869                    if respond {
1870                        sis_to_reset.push(*id);
1871                    }
1872                    self.unregister_stream(*id);
1873                }
1874            }
1875            self.reconfig_requests
1876                .remove(&p.reconfig_request_sequence_number);
1877        } else {
1878            debug!(
1879                "[{}] resetStream(): senderLastTSN={} > peer_last_tsn={}",
1880                self.side, p.sender_last_tsn, self.peer_last_tsn
1881            );
1882            result = ReconfigResult::InProgress;
1883        }
1884
1885        // Answer incoming reset requests with the same reset request, but with
1886        // reconfig_response_sequence_number.
1887        if !sis_to_reset.is_empty() {
1888            let rsn = self.generate_next_rsn();
1889            let tsn = self.my_next_tsn - 1;
1890
1891            let c = ChunkReconfig {
1892                param_a: Some(Box::new(ParamOutgoingResetRequest {
1893                    reconfig_request_sequence_number: rsn,
1894                    reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1895                    sender_last_tsn: tsn,
1896                    stream_identifiers: sis_to_reset,
1897                })),
1898                ..Default::default()
1899            };
1900
1901            self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission
1902
1903            let p = self.create_packet(vec![Box::new(c)]);
1904            reply.push(p);
1905        }
1906
1907        let packet = self.create_packet(vec![Box::new(ChunkReconfig {
1908            param_a: Some(Box::new(ParamReconfigResponse {
1909                reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1910                result,
1911            })),
1912            param_b: None,
1913        })]);
1914
1915        debug!("[{}] RESET RESPONSE: {}", self.side, packet);
1916
1917        reply.push(packet);
1918
1919        Ok(())
1920    }
1921
1922    /// create_packet wraps chunks in a packet.
1923    /// The caller should hold the read lock.
1924    pub(crate) fn create_packet(&self, chunks: Vec<Box<dyn Chunk + Send + Sync>>) -> Packet {
1925        Packet {
1926            common_header: CommonHeader {
1927                verification_tag: self.peer_verification_tag,
1928                source_port: self.source_port,
1929                destination_port: self.destination_port,
1930            },
1931            chunks,
1932        }
1933    }
1934
1935    /// create_stream creates a stream. The caller should hold the lock and check no stream exists for this id.
1936    fn create_stream(
1937        &mut self,
1938        stream_identifier: StreamId,
1939        accept: bool,
1940        default_payload_type: PayloadProtocolIdentifier,
1941    ) -> Option<Stream<'_>> {
1942        let s = StreamState::new(
1943            self.side,
1944            stream_identifier,
1945            self.max_payload_size,
1946            default_payload_type,
1947        );
1948
1949        if accept {
1950            self.stream_queue.push_back(stream_identifier);
1951            self.events.push_back(Event::Stream(StreamEvent::Opened));
1952        }
1953
1954        self.streams.insert(stream_identifier, s);
1955
1956        Some(Stream {
1957            stream_identifier,
1958            association: self,
1959        })
1960    }
1961
1962    /// get_or_create_stream gets or creates a stream. The caller should hold the lock.
1963    fn get_or_create_stream(&mut self, stream_identifier: StreamId) -> Option<Stream<'_>> {
1964        if self.streams.contains_key(&stream_identifier) {
1965            Some(Stream {
1966                stream_identifier,
1967                association: self,
1968            })
1969        } else {
1970            self.create_stream(
1971                stream_identifier,
1972                true,
1973                PayloadProtocolIdentifier::default(),
1974            )
1975        }
1976    }
1977
1978    pub(crate) fn get_my_receiver_window_credit(&self) -> u32 {
1979        let mut bytes_queued = 0;
1980        for s in self.streams.values() {
1981            bytes_queued += s.get_num_bytes_in_reassembly_queue() as u32;
1982        }
1983
1984        if bytes_queued >= self.max_receive_buffer_size {
1985            0
1986        } else {
1987            self.max_receive_buffer_size - bytes_queued
1988        }
1989    }
1990
1991    /// gather_outbound gathers outgoing packets. The returned bool value set to
1992    /// false means the association should be closed down after the final send.
1993    fn gather_outbound(&mut self, now: Instant) -> (Vec<Bytes>, bool) {
1994        let mut raw_packets = vec![];
1995
1996        if !self.control_queue.is_empty() {
1997            for p in self.control_queue.drain(..) {
1998                if let Ok(raw) = p.marshal() {
1999                    raw_packets.push(raw);
2000                } else {
2001                    warn!("[{}] failed to serialize a control packet", self.side);
2002                    continue;
2003                }
2004            }
2005        }
2006
2007        let state = self.state();
2008        match state {
2009            AssociationState::Established => {
2010                raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2011                raw_packets = self.gather_outbound_data_and_reconfig_packets(raw_packets, now);
2012                raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2013                raw_packets = self.gather_outbound_sack_packets(raw_packets);
2014                raw_packets = self.gather_outbound_forward_tsn_packets(raw_packets);
2015                (raw_packets, true)
2016            }
2017            AssociationState::ShutdownPending
2018            | AssociationState::ShutdownSent
2019            | AssociationState::ShutdownReceived => {
2020                raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2021                raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2022                raw_packets = self.gather_outbound_sack_packets(raw_packets);
2023                self.gather_outbound_shutdown_packets(raw_packets, now)
2024            }
2025            AssociationState::ShutdownAckSent => {
2026                self.gather_outbound_shutdown_packets(raw_packets, now)
2027            }
2028            _ => (raw_packets, true),
2029        }
2030    }
2031
2032    fn gather_data_packets_to_retransmit(
2033        &mut self,
2034        mut raw_packets: Vec<Bytes>,
2035        now: Instant,
2036    ) -> Vec<Bytes> {
2037        for p in &self.get_data_packets_to_retransmit(now) {
2038            if let Ok(raw) = p.marshal() {
2039                raw_packets.push(raw);
2040            } else {
2041                warn!(
2042                    "[{}] failed to serialize a DATA packet to be retransmitted",
2043                    self.side
2044                );
2045            }
2046        }
2047
2048        raw_packets
2049    }
2050
2051    fn gather_outbound_data_and_reconfig_packets(
2052        &mut self,
2053        mut raw_packets: Vec<Bytes>,
2054        now: Instant,
2055    ) -> Vec<Bytes> {
2056        // Pop unsent data chunks from the pending queue to send as much as
2057        // cwnd and rwnd allow.
2058        let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send(now);
2059        if !chunks.is_empty() {
2060            // Start timer. (noop if already started)
2061            trace!("[{}] T3-rtx timer start (pt1)", self.side);
2062            self.timers
2063                .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
2064
2065            for p in &self.bundle_data_chunks_into_packets(chunks) {
2066                if let Ok(raw) = p.marshal() {
2067                    raw_packets.push(raw);
2068                } else {
2069                    warn!("[{}] failed to serialize a DATA packet", self.side);
2070                }
2071            }
2072        }
2073
2074        if !sis_to_reset.is_empty() || self.will_retransmit_reconfig {
2075            if self.will_retransmit_reconfig {
2076                self.will_retransmit_reconfig = false;
2077                debug!(
2078                    "[{}] retransmit {} RECONFIG chunk(s)",
2079                    self.side,
2080                    self.reconfigs.len()
2081                );
2082                for c in self.reconfigs.values() {
2083                    let p = self.create_packet(vec![Box::new(c.clone())]);
2084                    if let Ok(raw) = p.marshal() {
2085                        raw_packets.push(raw);
2086                    } else {
2087                        warn!(
2088                            "[{}] failed to serialize a RECONFIG packet to be retransmitted",
2089                            self.side,
2090                        );
2091                    }
2092                }
2093            }
2094
2095            if !sis_to_reset.is_empty() {
2096                let rsn = self.generate_next_rsn();
2097                let tsn = self.my_next_tsn - 1;
2098                debug!(
2099                    "[{}] sending RECONFIG: rsn={} tsn={} streams={:?}",
2100                    self.side,
2101                    rsn,
2102                    self.my_next_tsn - 1,
2103                    sis_to_reset
2104                );
2105
2106                let c = ChunkReconfig {
2107                    param_a: Some(Box::new(ParamOutgoingResetRequest {
2108                        reconfig_request_sequence_number: rsn,
2109                        sender_last_tsn: tsn,
2110                        stream_identifiers: sis_to_reset,
2111                        ..Default::default()
2112                    })),
2113                    ..Default::default()
2114                };
2115                self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission
2116
2117                let p = self.create_packet(vec![Box::new(c)]);
2118                if let Ok(raw) = p.marshal() {
2119                    raw_packets.push(raw);
2120                } else {
2121                    warn!(
2122                        "[{}] failed to serialize a RECONFIG packet to be transmitted",
2123                        self.side
2124                    );
2125                }
2126            }
2127
2128            if !self.reconfigs.is_empty() {
2129                self.timers
2130                    .start(Timer::Reconfig, now, self.rto_mgr.get_rto());
2131            }
2132        }
2133
2134        raw_packets
2135    }
2136
2137    fn gather_outbound_fast_retransmission_packets(
2138        &mut self,
2139        mut raw_packets: Vec<Bytes>,
2140        now: Instant,
2141    ) -> Vec<Bytes> {
2142        if self.will_retransmit_fast {
2143            self.will_retransmit_fast = false;
2144
2145            let mut to_fast_retrans: Vec<Box<dyn Chunk + Send + Sync>> = vec![];
2146            let mut fast_retrans_size = COMMON_HEADER_SIZE;
2147
2148            let mut i = 0;
2149            loop {
2150                let tsn = self.cumulative_tsn_ack_point + i + 1;
2151                if let Some(c) = self.inflight_queue.get_mut(tsn) {
2152                    if c.acked || c.abandoned() || c.nsent > 1 || c.miss_indicator < 3 {
2153                        i += 1;
2154                        continue;
2155                    }
2156
2157                    // RFC 4960 Sec 7.2.4 Fast Retransmit on Gap Reports
2158                    //  3)  Determine how many of the earliest (i.e., lowest TSN) DATA chunks
2159                    //      marked for retransmission will fit into a single packet, subject
2160                    //      to constraint of the path MTU of the destination transport
2161                    //      address to which the packet is being sent.  Call this value K.
2162                    //      Retransmit those K DATA chunks in a single packet.  When a Fast
2163                    //      Retransmit is being performed, the sender SHOULD ignore the value
2164                    //      of cwnd and SHOULD NOT delay retransmission for this single
2165                    //		packet.
2166
2167                    let data_chunk_size = DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2168                    if self.mtu < fast_retrans_size + data_chunk_size {
2169                        break;
2170                    }
2171
2172                    fast_retrans_size += data_chunk_size;
2173                    self.stats.inc_fast_retrans();
2174                    c.nsent += 1;
2175                } else {
2176                    break; // end of pending data
2177                }
2178
2179                if let Some(c) = self.inflight_queue.get_mut(tsn) {
2180                    Association::check_partial_reliability_status(
2181                        c,
2182                        now,
2183                        self.use_forward_tsn,
2184                        self.side,
2185                        &self.streams,
2186                    );
2187                    to_fast_retrans.push(Box::new(c.clone()));
2188                    trace!(
2189                        "[{}] fast-retransmit: tsn={} sent={} htna={}",
2190                        self.side,
2191                        c.tsn,
2192                        c.nsent,
2193                        self.fast_recover_exit_point
2194                    );
2195                }
2196                i += 1;
2197            }
2198
2199            if !to_fast_retrans.is_empty() {
2200                if let Ok(raw) = self.create_packet(to_fast_retrans).marshal() {
2201                    raw_packets.push(raw);
2202                } else {
2203                    warn!(
2204                        "[{}] failed to serialize a DATA packet to be fast-retransmitted",
2205                        self.side
2206                    );
2207                }
2208            }
2209        }
2210
2211        raw_packets
2212    }
2213
2214    fn gather_outbound_sack_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2215        if self.ack_state == AckState::Immediate {
2216            self.ack_state = AckState::Idle;
2217            let sack = self.create_selective_ack_chunk();
2218            trace!("[{}] sending SACK: {}", self.side, sack);
2219            if let Ok(raw) = self.create_packet(vec![Box::new(sack)]).marshal() {
2220                raw_packets.push(raw);
2221            } else {
2222                warn!("[{}] failed to serialize a SACK packet", self.side);
2223            }
2224        }
2225
2226        raw_packets
2227    }
2228
2229    fn gather_outbound_forward_tsn_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2230        /*log::debug!(
2231            "[{}] gatherOutboundForwardTSNPackets {}",
2232            self.name,
2233            self.will_send_forward_tsn
2234        );*/
2235        if self.will_send_forward_tsn {
2236            self.will_send_forward_tsn = false;
2237            if sna32gt(
2238                self.advanced_peer_tsn_ack_point,
2239                self.cumulative_tsn_ack_point,
2240            ) {
2241                let fwd_tsn = self.create_forward_tsn();
2242                if let Ok(raw) = self.create_packet(vec![Box::new(fwd_tsn)]).marshal() {
2243                    raw_packets.push(raw);
2244                } else {
2245                    warn!("[{}] failed to serialize a Forward TSN packet", self.side);
2246                }
2247            }
2248        }
2249
2250        raw_packets
2251    }
2252
2253    fn gather_outbound_shutdown_packets(
2254        &mut self,
2255        mut raw_packets: Vec<Bytes>,
2256        now: Instant,
2257    ) -> (Vec<Bytes>, bool) {
2258        let mut ok = true;
2259
2260        if self.will_send_shutdown {
2261            self.will_send_shutdown = false;
2262
2263            let shutdown = ChunkShutdown {
2264                cumulative_tsn_ack: self.cumulative_tsn_ack_point,
2265            };
2266
2267            if let Ok(raw) = self.create_packet(vec![Box::new(shutdown)]).marshal() {
2268                self.timers
2269                    .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2270                raw_packets.push(raw);
2271            } else {
2272                warn!("[{}] failed to serialize a Shutdown packet", self.side);
2273            }
2274        } else if self.will_send_shutdown_ack {
2275            self.will_send_shutdown_ack = false;
2276
2277            let shutdown_ack = ChunkShutdownAck {};
2278
2279            if let Ok(raw) = self.create_packet(vec![Box::new(shutdown_ack)]).marshal() {
2280                self.timers
2281                    .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2282                raw_packets.push(raw);
2283            } else {
2284                warn!("[{}] failed to serialize a ShutdownAck packet", self.side);
2285            }
2286        } else if self.will_send_shutdown_complete {
2287            self.will_send_shutdown_complete = false;
2288
2289            let shutdown_complete = ChunkShutdownComplete {};
2290
2291            if let Ok(raw) = self
2292                .create_packet(vec![Box::new(shutdown_complete)])
2293                .marshal()
2294            {
2295                raw_packets.push(raw);
2296                ok = false;
2297            } else {
2298                warn!(
2299                    "[{}] failed to serialize a ShutdownComplete packet",
2300                    self.side
2301                );
2302            }
2303        }
2304
2305        (raw_packets, ok)
2306    }
2307
2308    /// get_data_packets_to_retransmit is called when T3-rtx is timed out and retransmit outstanding data chunks
2309    /// that are not acked or abandoned yet.
2310    fn get_data_packets_to_retransmit(&mut self, now: Instant) -> Vec<Packet> {
2311        let awnd = std::cmp::min(self.cwnd, self.rwnd);
2312        let mut chunks = vec![];
2313        let mut bytes_to_send = 0;
2314        let mut done = false;
2315        let mut i = 0;
2316        while !done {
2317            let tsn = self.cumulative_tsn_ack_point + i + 1;
2318            if let Some(c) = self.inflight_queue.get_mut(tsn) {
2319                if !c.retransmit {
2320                    i += 1;
2321                    continue;
2322                }
2323
2324                if i == 0 && self.rwnd < c.user_data.len() as u32 {
2325                    // Send it as a zero window probe
2326                    done = true;
2327                } else if bytes_to_send + c.user_data.len() > awnd as usize {
2328                    break;
2329                }
2330
2331                // reset the retransmit flag not to retransmit again before the next
2332                // t3-rtx timer fires
2333                c.retransmit = false;
2334                bytes_to_send += c.user_data.len();
2335
2336                c.nsent += 1;
2337            } else {
2338                break; // end of pending data
2339            }
2340
2341            if let Some(c) = self.inflight_queue.get_mut(tsn) {
2342                Association::check_partial_reliability_status(
2343                    c,
2344                    now,
2345                    self.use_forward_tsn,
2346                    self.side,
2347                    &self.streams,
2348                );
2349
2350                trace!(
2351                    "[{}] retransmitting tsn={} ssn={} sent={}",
2352                    self.side,
2353                    c.tsn,
2354                    c.stream_sequence_number,
2355                    c.nsent
2356                );
2357
2358                chunks.push(c.clone());
2359            }
2360            i += 1;
2361        }
2362
2363        self.bundle_data_chunks_into_packets(chunks)
2364    }
2365
2366    /// pop_pending_data_chunks_to_send pops chunks from the pending queues as many as
2367    /// the cwnd and rwnd allows to send.
2368    fn pop_pending_data_chunks_to_send(
2369        &mut self,
2370        now: Instant,
2371    ) -> (Vec<ChunkPayloadData>, Vec<u16>) {
2372        let mut chunks = vec![];
2373        let mut sis_to_reset = vec![]; // stream identifiers to reset
2374        if !self.pending_queue.is_empty() {
2375            // RFC 4960 sec 6.1.  Transmission of DATA Chunks
2376            //   A) At any given time, the data sender MUST NOT transmit new data to
2377            //      any destination transport address if its peer's rwnd indicates
2378            //      that the peer has no buffer space (i.e., rwnd is 0; see Section
2379            //      6.2.1).  However, regardless of the value of rwnd (including if it
2380            //      is 0), the data sender can always have one DATA chunk in flight to
2381            //      the receiver if allowed by cwnd (see rule B, below).
2382
2383            while let Some(c) = self.pending_queue.peek() {
2384                let (beginning_fragment, unordered, data_len, stream_identifier) = (
2385                    c.beginning_fragment,
2386                    c.unordered,
2387                    c.user_data.len(),
2388                    c.stream_identifier,
2389                );
2390
2391                if data_len == 0 {
2392                    sis_to_reset.push(stream_identifier);
2393                    if self
2394                        .pending_queue
2395                        .pop(beginning_fragment, unordered)
2396                        .is_none()
2397                    {
2398                        error!("[{}] failed to pop from pending queue", self.side);
2399                    }
2400                    continue;
2401                }
2402
2403                if self.inflight_queue.get_num_bytes() + data_len > self.cwnd as usize {
2404                    break; // would exceeds cwnd
2405                }
2406
2407                if data_len > self.rwnd as usize {
2408                    break; // no more rwnd
2409                }
2410
2411                self.rwnd -= data_len as u32;
2412
2413                if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2414                    beginning_fragment,
2415                    unordered,
2416                    now,
2417                ) {
2418                    chunks.push(chunk);
2419                }
2420            }
2421
2422            // the data sender can always have one DATA chunk in flight to the receiver
2423            if chunks.is_empty() && self.inflight_queue.is_empty() {
2424                // Send zero window probe
2425                if let Some(c) = self.pending_queue.peek() {
2426                    let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
2427
2428                    if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2429                        beginning_fragment,
2430                        unordered,
2431                        now,
2432                    ) {
2433                        chunks.push(chunk);
2434                    }
2435                }
2436            }
2437        }
2438
2439        (chunks, sis_to_reset)
2440    }
2441
2442    /// bundle_data_chunks_into_packets packs DATA chunks into packets. It tries to bundle
2443    /// DATA chunks into a packet so long as the resulting packet size does not exceed
2444    /// the path MTU.
2445    fn bundle_data_chunks_into_packets(&self, chunks: Vec<ChunkPayloadData>) -> Vec<Packet> {
2446        let mut packets = vec![];
2447        let mut chunks_to_send = vec![];
2448        let mut bytes_in_packet = COMMON_HEADER_SIZE;
2449
2450        for c in chunks {
2451            // RFC 4960 sec 6.1.  Transmission of DATA Chunks
2452            //   Multiple DATA chunks committed for transmission MAY be bundled in a
2453            //   single packet.  Furthermore, DATA chunks being retransmitted MAY be
2454            //   bundled with new DATA chunks, as long as the resulting packet size
2455            //   does not exceed the path MTU.
2456            if bytes_in_packet + c.user_data.len() as u32 > self.mtu {
2457                packets.push(self.create_packet(chunks_to_send));
2458                chunks_to_send = vec![];
2459                bytes_in_packet = COMMON_HEADER_SIZE;
2460            }
2461
2462            bytes_in_packet += DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2463            chunks_to_send.push(Box::new(c));
2464        }
2465
2466        if !chunks_to_send.is_empty() {
2467            packets.push(self.create_packet(chunks_to_send));
2468        }
2469
2470        packets
2471    }
2472
2473    /// generate_next_tsn returns the my_next_tsn and increases it. The caller should hold the lock.
2474    fn generate_next_tsn(&mut self) -> u32 {
2475        let tsn = self.my_next_tsn;
2476        self.my_next_tsn += 1;
2477        tsn
2478    }
2479
2480    /// generate_next_rsn returns the my_next_rsn and increases it. The caller should hold the lock.
2481    fn generate_next_rsn(&mut self) -> u32 {
2482        let rsn = self.my_next_rsn;
2483        self.my_next_rsn += 1;
2484        rsn
2485    }
2486
2487    fn check_partial_reliability_status(
2488        c: &mut ChunkPayloadData,
2489        now: Instant,
2490        use_forward_tsn: bool,
2491        side: Side,
2492        streams: &FxHashMap<u16, StreamState>,
2493    ) {
2494        if !use_forward_tsn {
2495            return;
2496        }
2497
2498        // draft-ietf-rtcweb-data-protocol-09.txt section 6
2499        //	6.  Procedures
2500        //		All Data Channel Establishment Protocol messages MUST be sent using
2501        //		ordered delivery and reliable transmission.
2502        //
2503        if c.payload_type == PayloadProtocolIdentifier::Dcep {
2504            return;
2505        }
2506
2507        // PR-SCTP
2508        if let Some(s) = streams.get(&c.stream_identifier) {
2509            let reliability_type: ReliabilityType = s.reliability_type;
2510            let reliability_value = s.reliability_value;
2511
2512            if reliability_type == ReliabilityType::Rexmit {
2513                if c.nsent >= reliability_value {
2514                    c.set_abandoned(true);
2515                    trace!(
2516                        "[{}] marked as abandoned: tsn={} ppi={} (remix: {})",
2517                        side,
2518                        c.tsn,
2519                        c.payload_type,
2520                        c.nsent
2521                    );
2522                }
2523            } else if reliability_type == ReliabilityType::Timed {
2524                if let Some(since) = &c.since {
2525                    let elapsed = now.duration_since(*since);
2526                    if elapsed.as_millis() as u32 >= reliability_value {
2527                        c.set_abandoned(true);
2528                        trace!(
2529                            "[{}] marked as abandoned: tsn={} ppi={} (timed: {:?})",
2530                            side,
2531                            c.tsn,
2532                            c.payload_type,
2533                            elapsed
2534                        );
2535                    }
2536                } else {
2537                    error!("[{}] invalid c.since", side);
2538                }
2539            }
2540        } else {
2541            error!("[{}] stream {} not found)", side, c.stream_identifier);
2542        }
2543    }
2544
2545    fn create_selective_ack_chunk(&mut self) -> ChunkSelectiveAck {
2546        ChunkSelectiveAck {
2547            cumulative_tsn_ack: self.peer_last_tsn,
2548            advertised_receiver_window_credit: self.get_my_receiver_window_credit(),
2549            gap_ack_blocks: self.payload_queue.get_gap_ack_blocks(self.peer_last_tsn),
2550            duplicate_tsn: self.payload_queue.pop_duplicates(),
2551        }
2552    }
2553
2554    /// create_forward_tsn generates ForwardTSN chunk.
2555    /// This method will be be called if use_forward_tsn is set to false.
2556    fn create_forward_tsn(&self) -> ChunkForwardTsn {
2557        // RFC 3758 Sec 3.5 C4
2558        let mut stream_map: HashMap<u16, u16> = HashMap::new(); // to report only once per SI
2559        let mut i = self.cumulative_tsn_ack_point + 1;
2560        while sna32lte(i, self.advanced_peer_tsn_ack_point) {
2561            if let Some(c) = self.inflight_queue.get(i) {
2562                if let Some(ssn) = stream_map.get(&c.stream_identifier) {
2563                    if sna16lt(*ssn, c.stream_sequence_number) {
2564                        // to report only once with greatest SSN
2565                        stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2566                    }
2567                } else {
2568                    stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2569                }
2570            } else {
2571                break;
2572            }
2573
2574            i += 1;
2575        }
2576
2577        let mut fwd_tsn = ChunkForwardTsn {
2578            new_cumulative_tsn: self.advanced_peer_tsn_ack_point,
2579            streams: vec![],
2580        };
2581
2582        let mut stream_str = String::new();
2583        for (si, ssn) in &stream_map {
2584            stream_str += format!("(si={} ssn={})", si, ssn).as_str();
2585            fwd_tsn.streams.push(ChunkForwardTsnStream {
2586                identifier: *si,
2587                sequence: *ssn,
2588            });
2589        }
2590        trace!(
2591            "[{}] building fwd_tsn: newCumulativeTSN={} cumTSN={} - {}",
2592            self.side,
2593            fwd_tsn.new_cumulative_tsn,
2594            self.cumulative_tsn_ack_point,
2595            stream_str
2596        );
2597
2598        fwd_tsn
2599    }
2600
2601    /// Move the chunk peeked with self.pending_queue.peek() to the inflight_queue.
2602    fn move_pending_data_chunk_to_inflight_queue(
2603        &mut self,
2604        beginning_fragment: bool,
2605        unordered: bool,
2606        now: Instant,
2607    ) -> Option<ChunkPayloadData> {
2608        if let Some(mut c) = self.pending_queue.pop(beginning_fragment, unordered) {
2609            // Mark all fragements are in-flight now
2610            if c.ending_fragment {
2611                c.set_all_inflight();
2612            }
2613
2614            // Assign TSN
2615            c.tsn = self.generate_next_tsn();
2616
2617            c.since = Some(now); // use to calculate RTT and also for maxPacketLifeTime
2618            c.nsent = 1; // being sent for the first time
2619
2620            Association::check_partial_reliability_status(
2621                &mut c,
2622                now,
2623                self.use_forward_tsn,
2624                self.side,
2625                &self.streams,
2626            );
2627
2628            trace!(
2629                "[{}] sending ppi={} tsn={} ssn={} sent={} len={} ({},{})",
2630                self.side,
2631                c.payload_type as u32,
2632                c.tsn,
2633                c.stream_sequence_number,
2634                c.nsent,
2635                c.user_data.len(),
2636                c.beginning_fragment,
2637                c.ending_fragment
2638            );
2639
2640            self.inflight_queue.push_no_check(c.clone());
2641
2642            Some(c)
2643        } else {
2644            error!("[{}] failed to pop from pending queue", self.side);
2645            None
2646        }
2647    }
2648
2649    pub(crate) fn send_reset_request(&mut self, stream_identifier: StreamId) -> Result<()> {
2650        let state = self.state();
2651        if state != AssociationState::Established {
2652            return Err(Error::ErrResetPacketInStateNotExist);
2653        }
2654
2655        // Create DATA chunk which only contains valid stream identifier with
2656        // nil userData and use it as a EOS from the stream.
2657        let c = ChunkPayloadData {
2658            stream_identifier,
2659            beginning_fragment: true,
2660            ending_fragment: true,
2661            user_data: Bytes::new(),
2662            ..Default::default()
2663        };
2664
2665        self.pending_queue.push(c);
2666        self.awake_write_loop();
2667
2668        Ok(())
2669    }
2670
2671    /// send_payload_data sends the data chunks.
2672    pub(crate) fn send_payload_data(&mut self, chunks: Vec<ChunkPayloadData>) -> Result<()> {
2673        let state = self.state();
2674        if state != AssociationState::Established {
2675            return Err(Error::ErrPayloadDataStateNotExist);
2676        }
2677
2678        // Push the chunks into the pending queue first.
2679        for c in chunks {
2680            self.pending_queue.push(c);
2681        }
2682
2683        self.awake_write_loop();
2684        Ok(())
2685    }
2686
2687    /// buffered_amount returns total amount (in bytes) of currently buffered user data.
2688    /// This is used only by testing.
2689    pub(crate) fn buffered_amount(&self) -> usize {
2690        self.pending_queue.get_num_bytes() + self.inflight_queue.get_num_bytes()
2691    }
2692
2693    fn awake_write_loop(&self) {
2694        // No Op on Purpose
2695    }
2696
2697    fn close_all_timers(&mut self) {
2698        // Close all retransmission & ack timers
2699        for timer in Timer::VALUES {
2700            self.timers.stop(timer);
2701        }
2702    }
2703
2704    fn on_ack_timeout(&mut self) {
2705        trace!(
2706            "[{}] ack timed out (ack_state: {})",
2707            self.side,
2708            self.ack_state
2709        );
2710        self.stats.inc_ack_timeouts();
2711        self.ack_state = AckState::Immediate;
2712        self.awake_write_loop();
2713    }
2714
2715    fn on_retransmission_timeout(&mut self, timer_id: Timer, n_rtos: usize) {
2716        match timer_id {
2717            Timer::T1Init => {
2718                if let Err(err) = self.send_init() {
2719                    debug!(
2720                        "[{}] failed to retransmit init (n_rtos={}): {:?}",
2721                        self.side, n_rtos, err
2722                    );
2723                }
2724            }
2725
2726            Timer::T1Cookie => {
2727                if let Err(err) = self.send_cookie_echo() {
2728                    debug!(
2729                        "[{}] failed to retransmit cookie-echo (n_rtos={}): {:?}",
2730                        self.side, n_rtos, err
2731                    );
2732                }
2733            }
2734
2735            Timer::T2Shutdown => {
2736                debug!(
2737                    "[{}] retransmission of shutdown timeout (n_rtos={})",
2738                    self.side, n_rtos
2739                );
2740                let state = self.state();
2741                match state {
2742                    AssociationState::ShutdownSent => {
2743                        self.will_send_shutdown = true;
2744                        self.awake_write_loop();
2745                    }
2746                    AssociationState::ShutdownAckSent => {
2747                        self.will_send_shutdown_ack = true;
2748                        self.awake_write_loop();
2749                    }
2750                    _ => {}
2751                }
2752            }
2753
2754            Timer::T3RTX => {
2755                self.stats.inc_t3timeouts();
2756
2757                // RFC 4960 sec 6.3.3
2758                //  E1)  For the destination address for which the timer expires, adjust
2759                //       its ssthresh with rules defined in Section 7.2.3 and set the
2760                //       cwnd <- MTU.
2761                // RFC 4960 sec 7.2.3
2762                //   When the T3-rtx timer expires on an address, SCTP should perform slow
2763                //   start by:
2764                //      ssthresh = max(cwnd/2, 4*MTU)
2765                //      cwnd = 1*MTU
2766
2767                self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
2768                self.cwnd = self.mtu;
2769                trace!(
2770                    "[{}] updated cwnd={} ssthresh={} inflight={} (RTO)",
2771                    self.side,
2772                    self.cwnd,
2773                    self.ssthresh,
2774                    self.inflight_queue.get_num_bytes()
2775                );
2776
2777                // RFC 3758 sec 3.5
2778                //  A5) Any time the T3-rtx timer expires, on any destination, the sender
2779                //  SHOULD try to advance the "Advanced.Peer.Ack.Point" by following
2780                //  the procedures outlined in C2 - C5.
2781                if self.use_forward_tsn {
2782                    // RFC 3758 Sec 3.5 C2
2783                    let mut i = self.advanced_peer_tsn_ack_point + 1;
2784                    while let Some(c) = self.inflight_queue.get(i) {
2785                        if !c.abandoned() {
2786                            break;
2787                        }
2788                        self.advanced_peer_tsn_ack_point = i;
2789                        i += 1;
2790                    }
2791
2792                    // RFC 3758 Sec 3.5 C3
2793                    if sna32gt(
2794                        self.advanced_peer_tsn_ack_point,
2795                        self.cumulative_tsn_ack_point,
2796                    ) {
2797                        self.will_send_forward_tsn = true;
2798                        debug!(
2799                            "[{}] on_retransmission_timeout {}: sna32GT({}, {})",
2800                            self.side,
2801                            self.will_send_forward_tsn,
2802                            self.advanced_peer_tsn_ack_point,
2803                            self.cumulative_tsn_ack_point
2804                        );
2805                    }
2806                }
2807
2808                debug!(
2809                    "[{}] T3-rtx timed out: n_rtos={} cwnd={} ssthresh={}",
2810                    self.side, n_rtos, self.cwnd, self.ssthresh
2811                );
2812
2813                self.inflight_queue.mark_all_to_retrasmit();
2814                self.awake_write_loop();
2815            }
2816
2817            Timer::Reconfig => {
2818                self.will_retransmit_reconfig = true;
2819                self.awake_write_loop();
2820            }
2821
2822            _ => {}
2823        }
2824    }
2825
2826    fn on_retransmission_failure(&mut self, id: Timer) {
2827        match id {
2828            Timer::T1Init => {
2829                error!("[{}] retransmission failure: T1-init", self.side);
2830                self.error = Some(AssociationError::HandshakeFailed(
2831                    Error::ErrHandshakeInitAck,
2832                ));
2833            }
2834
2835            Timer::T1Cookie => {
2836                error!("[{}] retransmission failure: T1-cookie", self.side);
2837                self.error = Some(AssociationError::HandshakeFailed(
2838                    Error::ErrHandshakeCookieEcho,
2839                ));
2840            }
2841
2842            Timer::T2Shutdown => {
2843                error!("[{}] retransmission failure: T2-shutdown", self.side);
2844            }
2845
2846            Timer::T3RTX => {
2847                // T3-rtx timer will not fail by design
2848                // Justifications:
2849                //  * ICE would fail if the connectivity is lost
2850                //  * WebRTC spec is not clear how this incident should be reported to ULP
2851                error!("[{}] retransmission failure: T3-rtx (DATA)", self.side);
2852            }
2853
2854            _ => {}
2855        }
2856    }
2857
2858    /// Whether no timers are running
2859    #[cfg(test)]
2860    pub(crate) fn is_idle(&self) -> bool {
2861        Timer::VALUES
2862            .iter()
2863            //.filter(|&&t| t != Timer::KeepAlive && t != Timer::PushNewCid)
2864            .filter_map(|&t| Some((t, self.timers.get(t)?)))
2865            .min_by_key(|&(_, time)| time)
2866            //.map_or(true, |(timer, _)| timer == Timer::Idle)
2867            .is_none()
2868    }
2869}