1use crate::association::{
2 state::{AckMode, AckState, AssociationState},
3 stats::AssociationStats,
4};
5use crate::chunk::{
6 chunk_abort::ChunkAbort, chunk_cookie_ack::ChunkCookieAck, chunk_cookie_echo::ChunkCookieEcho,
7 chunk_error::ChunkError, chunk_forward_tsn::ChunkForwardTsn,
8 chunk_forward_tsn::ChunkForwardTsnStream, chunk_heartbeat::ChunkHeartbeat,
9 chunk_heartbeat_ack::ChunkHeartbeatAck, chunk_init::ChunkInit, chunk_init::ChunkInitAck,
10 chunk_payload_data::ChunkPayloadData, chunk_payload_data::PayloadProtocolIdentifier,
11 chunk_reconfig::ChunkReconfig, chunk_selective_ack::ChunkSelectiveAck,
12 chunk_shutdown::ChunkShutdown, chunk_shutdown_ack::ChunkShutdownAck,
13 chunk_shutdown_complete::ChunkShutdownComplete, chunk_type::CT_FORWARD_TSN, Chunk,
14 ErrorCauseUnrecognizedChunkType, USER_INITIATED_ABORT,
15};
16use crate::config::{ServerConfig, TransportConfig, COMMON_HEADER_SIZE, DATA_CHUNK_HEADER_SIZE};
17use crate::error::{Error, Result};
18use crate::packet::{CommonHeader, Packet};
19use crate::param::{
20 param_heartbeat_info::ParamHeartbeatInfo,
21 param_outgoing_reset_request::ParamOutgoingResetRequest,
22 param_reconfig_response::{ParamReconfigResponse, ReconfigResult},
23 param_state_cookie::ParamStateCookie,
24 param_supported_extensions::ParamSupportedExtensions,
25 Param,
26};
27use crate::queue::{payload_queue::PayloadQueue, pending_queue::PendingQueue};
28use crate::shared::{AssociationEventInner, AssociationId, EndpointEvent, EndpointEventInner};
29use crate::util::{sna16lt, sna32gt, sna32gte, sna32lt, sna32lte};
30use crate::{AssociationEvent, Payload, Side, Transmit};
31use stream::{ReliabilityType, Stream, StreamEvent, StreamId, StreamState};
32use timer::{RtoManager, Timer, TimerTable, ACK_INTERVAL};
33
34use crate::association::stream::RecvSendState;
35use bytes::Bytes;
36use fxhash::FxHashMap;
37use log::{debug, error, trace, warn};
38use rand::random;
39use std::collections::{HashMap, VecDeque};
40use std::net::{IpAddr, SocketAddr};
41use std::str::FromStr;
42use std::sync::Arc;
43use std::time::{Duration, Instant};
44use thiserror::Error;
45
46pub(crate) mod state;
47pub(crate) mod stats;
48pub(crate) mod stream;
49mod timer;
50
51#[cfg(test)]
52mod association_test;
53
54#[derive(Debug, Error, Eq, Clone, PartialEq)]
56pub enum AssociationError {
57 #[error("{0}")]
59 HandshakeFailed(#[from] Error),
60 #[error("transport error")]
62 TransportError,
63 #[error("aborted by peer")]
65 AssociationClosed,
66 #[error("closed by peer")]
68 ApplicationClosed,
69 #[error("reset by peer")]
71 Reset,
72 #[error("timed out")]
77 TimedOut,
78 #[error("closed")]
80 LocallyClosed,
81}
82
83#[derive(Debug)]
85pub enum Event {
86 Connected,
88 AssociationLost {
92 reason: AssociationError,
94 },
95 Stream(StreamEvent),
97 DatagramReceived,
99}
100
101#[derive(Debug)]
119pub struct Association {
120 side: Side,
121 state: AssociationState,
122 handshake_completed: bool,
123 max_message_size: u32,
124 inflight_queue_length: usize,
125 will_send_shutdown: bool,
126 bytes_received: usize,
127 bytes_sent: usize,
128
129 peer_verification_tag: u32,
130 my_verification_tag: u32,
131 my_next_tsn: u32,
132 peer_last_tsn: u32,
133 min_tsn2measure_rtt: u32,
135 will_send_forward_tsn: bool,
136 will_retransmit_fast: bool,
137 will_retransmit_reconfig: bool,
138
139 will_send_shutdown_ack: bool,
140 will_send_shutdown_complete: bool,
141
142 my_next_rsn: u32,
144 reconfigs: FxHashMap<u32, ChunkReconfig>,
145 reconfig_requests: FxHashMap<u32, ParamOutgoingResetRequest>,
146
147 remote_addr: SocketAddr,
149 local_ip: Option<IpAddr>,
150 source_port: u16,
151 destination_port: u16,
152 my_max_num_inbound_streams: u16,
153 my_max_num_outbound_streams: u16,
154 my_cookie: Option<ParamStateCookie>,
155
156 payload_queue: PayloadQueue,
157 inflight_queue: PayloadQueue,
158 pending_queue: PendingQueue,
159 control_queue: VecDeque<Packet>,
160 stream_queue: VecDeque<u16>,
161
162 pub(crate) mtu: u32,
163 max_payload_size: u32,
165 cumulative_tsn_ack_point: u32,
166 advanced_peer_tsn_ack_point: u32,
167 use_forward_tsn: bool,
168
169 pub(crate) rto_mgr: RtoManager,
170 timers: TimerTable,
171
172 max_receive_buffer_size: u32,
174 pub(crate) cwnd: u32,
176 rwnd: u32,
178 pub(crate) ssthresh: u32,
180 partial_bytes_acked: u32,
181 pub(crate) in_fast_recovery: bool,
182 fast_recover_exit_point: u32,
183
184 stored_init: Option<ChunkInit>,
186 stored_cookie_echo: Option<ChunkCookieEcho>,
187 pub(crate) streams: FxHashMap<StreamId, StreamState>,
188
189 events: VecDeque<Event>,
190 endpoint_events: VecDeque<EndpointEventInner>,
191 error: Option<AssociationError>,
192
193 delayed_ack_triggered: bool,
195 immediate_ack_triggered: bool,
196
197 pub(crate) stats: AssociationStats,
198 ack_state: AckState,
199
200 pub(crate) ack_mode: AckMode,
202}
203
204impl Default for Association {
205 fn default() -> Self {
206 Association {
207 side: Side::default(),
208 state: AssociationState::default(),
209 handshake_completed: false,
210 max_message_size: 0,
211 inflight_queue_length: 0,
212 will_send_shutdown: false,
213 bytes_received: 0,
214 bytes_sent: 0,
215
216 peer_verification_tag: 0,
217 my_verification_tag: 0,
218 my_next_tsn: 0,
219 peer_last_tsn: 0,
220 min_tsn2measure_rtt: 0,
222 will_send_forward_tsn: false,
223 will_retransmit_fast: false,
224 will_retransmit_reconfig: false,
225
226 will_send_shutdown_ack: false,
227 will_send_shutdown_complete: false,
228
229 my_next_rsn: 0,
231 reconfigs: FxHashMap::default(),
232 reconfig_requests: FxHashMap::default(),
233
234 remote_addr: SocketAddr::from_str("0.0.0.0:0").unwrap(),
236 local_ip: None,
237 source_port: 0,
238 destination_port: 0,
239 my_max_num_inbound_streams: 0,
240 my_max_num_outbound_streams: 0,
241 my_cookie: None,
242
243 payload_queue: PayloadQueue::default(),
244 inflight_queue: PayloadQueue::default(),
245 pending_queue: PendingQueue::default(),
246 control_queue: VecDeque::default(),
247 stream_queue: VecDeque::default(),
248
249 mtu: 0,
250 max_payload_size: 0,
252 cumulative_tsn_ack_point: 0,
253 advanced_peer_tsn_ack_point: 0,
254 use_forward_tsn: false,
255
256 rto_mgr: RtoManager::default(),
257 timers: TimerTable::default(),
258
259 max_receive_buffer_size: 0,
261 cwnd: 0,
263 rwnd: 0,
265 ssthresh: 0,
267 partial_bytes_acked: 0,
268 in_fast_recovery: false,
269 fast_recover_exit_point: 0,
270
271 stored_init: None,
273 stored_cookie_echo: None,
274 streams: FxHashMap::default(),
275
276 events: VecDeque::default(),
277 endpoint_events: VecDeque::default(),
278 error: None,
279
280 delayed_ack_triggered: false,
282 immediate_ack_triggered: false,
283
284 stats: AssociationStats::default(),
285 ack_state: AckState::default(),
286
287 ack_mode: AckMode::default(),
289 }
290 }
291}
292
293impl Association {
294 pub(crate) fn new(
295 server_config: Option<Arc<ServerConfig>>,
296 config: Arc<TransportConfig>,
297 max_payload_size: u32,
298 local_aid: AssociationId,
299 remote_addr: SocketAddr,
300 local_ip: Option<IpAddr>,
301 now: Instant,
302 ) -> Self {
303 let side = if server_config.is_some() {
304 Side::Server
305 } else {
306 Side::Client
307 };
308
309 let mtu = max_payload_size + COMMON_HEADER_SIZE + DATA_CHUNK_HEADER_SIZE;
312
313 let cwnd = (2 * mtu).clamp(4380, 4 * mtu);
317 let mut tsn = random::<u32>();
318 if tsn == 0 {
319 tsn += 1;
320 }
321
322 let mut this = Association {
323 side,
324 handshake_completed: false,
325 max_receive_buffer_size: config.max_receive_buffer_size(),
326 max_message_size: config.max_message_size(),
327 my_max_num_outbound_streams: config.max_num_outbound_streams(),
328 my_max_num_inbound_streams: config.max_num_inbound_streams(),
329 max_payload_size,
330
331 rto_mgr: RtoManager::new(),
332 timers: TimerTable::new(),
333
334 mtu,
335 cwnd,
336 remote_addr,
337 local_ip,
338
339 my_verification_tag: local_aid,
340 my_next_tsn: tsn,
341 my_next_rsn: tsn,
342 min_tsn2measure_rtt: tsn,
343 cumulative_tsn_ack_point: tsn - 1,
344 advanced_peer_tsn_ack_point: tsn - 1,
345 error: None,
346
347 ..Default::default()
348 };
349
350 if side.is_client() {
351 let mut init = ChunkInit {
352 initial_tsn: this.my_next_tsn,
353 num_outbound_streams: this.my_max_num_outbound_streams,
354 num_inbound_streams: this.my_max_num_inbound_streams,
355 initiate_tag: this.my_verification_tag,
356 advertised_receiver_window_credit: this.max_receive_buffer_size,
357 ..Default::default()
358 };
359 init.set_supported_extensions();
360
361 this.set_state(AssociationState::CookieWait);
362 this.stored_init = Some(init);
363 let _ = this.send_init();
364 this.timers
365 .start(Timer::T1Init, now, this.rto_mgr.get_rto());
366 }
367
368 this
369 }
370
371 #[must_use]
377 pub fn poll(&mut self) -> Option<Event> {
378 if let Some(x) = self.events.pop_front() {
379 return Some(x);
380 }
381
382 if let Some(err) = self.error.take() {
387 return Some(Event::AssociationLost { reason: err });
388 }
389
390 None
391 }
392
393 #[must_use]
395 pub fn poll_endpoint_event(&mut self) -> Option<EndpointEvent> {
396 self.endpoint_events.pop_front().map(EndpointEvent)
397 }
398
399 #[must_use]
407 pub fn poll_timeout(&mut self) -> Option<Instant> {
408 self.timers.next_timeout()
409 }
410
411 #[must_use]
418 pub fn poll_transmit(&mut self, now: Instant) -> Option<Transmit> {
419 let (contents, _) = self.gather_outbound(now);
420 if contents.is_empty() {
421 None
422 } else {
423 trace!(
424 "[{}] sending {} bytes (total {} datagrams)",
425 self.side,
426 contents.iter().fold(0, |l, c| l + c.len()),
427 contents.len()
428 );
429 Some(Transmit {
430 now,
431 remote: self.remote_addr,
432 payload: Payload::RawEncode(contents),
433 ecn: None,
434 local_ip: self.local_ip,
435 })
436 }
437 }
438
439 pub fn handle_timeout(&mut self, now: Instant) {
449 for &timer in &Timer::VALUES {
450 let (expired, failure, n_rtos) = self.timers.is_expired(timer, now);
451 if !expired {
452 continue;
453 }
454 self.timers.set(timer, None);
455 if timer == Timer::Ack {
458 self.on_ack_timeout();
459 } else if failure {
460 self.on_retransmission_failure(timer);
461 } else {
462 self.on_retransmission_timeout(timer, n_rtos);
463 self.timers.start(timer, now, self.rto_mgr.get_rto());
464 }
465 }
466 }
467
468 pub fn handle_event(&mut self, event: AssociationEvent) {
474 match event.0 {
475 AssociationEventInner::Datagram(transmit) => {
476 if let Payload::PartialDecode(partial_decode) = transmit.payload {
486 trace!(
487 "[{}] receiving {} bytes",
488 self.side,
489 COMMON_HEADER_SIZE as usize + partial_decode.remaining.len()
490 );
491
492 let pkt = match partial_decode.finish() {
493 Ok(p) => p,
494 Err(err) => {
495 warn!("[{}] unable to parse SCTP packet {}", self.side, err);
496 return;
497 }
498 };
499
500 if let Err(err) = self.handle_inbound(pkt, transmit.now) {
501 error!("handle_inbound got err: {}", err);
502 let _ = self.close();
503 }
504 } else {
505 trace!("discarding invalid partial_decode");
506 }
507 } }
509 }
510
511 pub fn stats(&self) -> AssociationStats {
513 self.stats
514 }
515
516 pub fn is_handshaking(&self) -> bool {
521 !self.handshake_completed
522 }
523
524 pub fn is_closed(&self) -> bool {
532 self.state == AssociationState::Closed
533 }
534
535 pub fn is_drained(&self) -> bool {
540 self.state.is_drained()
541 }
542
543 pub fn side(&self) -> Side {
545 self.side
546 }
547
548 pub fn remote_addr(&self) -> SocketAddr {
550 self.remote_addr
551 }
552
553 pub fn rtt(&self) -> Duration {
555 Duration::from_millis(self.rto_mgr.get_rto())
556 }
557
558 pub fn local_ip(&self) -> Option<IpAddr> {
573 self.local_ip
574 }
575
576 pub fn shutdown(&mut self) -> Result<()> {
580 debug!("[{}] closing association..", self.side);
581
582 let state = self.state();
583 if state != AssociationState::Established {
584 return Err(Error::ErrShutdownNonEstablished);
585 }
586
587 self.set_state(AssociationState::ShutdownPending);
589
590 if self.inflight_queue_length == 0 {
591 self.will_send_shutdown = true;
593 self.awake_write_loop();
594 self.set_state(AssociationState::ShutdownSent);
595 }
596
597 self.endpoint_events.push_back(EndpointEventInner::Drained);
598
599 Ok(())
600 }
601
602 pub fn close(&mut self) -> Result<()> {
604 if self.state() != AssociationState::Closed {
605 self.set_state(AssociationState::Closed);
606
607 debug!("[{}] closing association..", self.side);
608
609 self.close_all_timers();
610
611 for si in self.streams.keys().cloned().collect::<Vec<u16>>() {
612 self.unregister_stream(si);
613 }
614
615 debug!("[{}] association closed", self.side);
616 debug!(
617 "[{}] stats nDATAs (in) : {}",
618 self.side,
619 self.stats.get_num_datas()
620 );
621 debug!(
622 "[{}] stats nSACKs (in) : {}",
623 self.side,
624 self.stats.get_num_sacks()
625 );
626 debug!(
627 "[{}] stats nT3Timeouts : {}",
628 self.side,
629 self.stats.get_num_t3timeouts()
630 );
631 debug!(
632 "[{}] stats nAckTimeouts: {}",
633 self.side,
634 self.stats.get_num_ack_timeouts()
635 );
636 debug!(
637 "[{}] stats nFastRetrans: {}",
638 self.side,
639 self.stats.get_num_fast_retrans()
640 );
641 }
642
643 Ok(())
644 }
645
646 pub fn open_stream(
648 &mut self,
649 stream_identifier: StreamId,
650 default_payload_type: PayloadProtocolIdentifier,
651 ) -> Result<Stream<'_>> {
652 if self.streams.contains_key(&stream_identifier) {
653 return Err(Error::ErrStreamAlreadyExist);
654 }
655
656 if let Some(s) = self.create_stream(stream_identifier, false, default_payload_type) {
657 Ok(s)
658 } else {
659 Err(Error::ErrStreamCreateFailed)
660 }
661 }
662
663 pub fn accept_stream(&mut self) -> Option<Stream<'_>> {
665 self.stream_queue
666 .pop_front()
667 .map(move |stream_identifier| Stream {
668 stream_identifier,
669 association: self,
670 })
671 }
672
673 pub fn stream(&mut self, stream_identifier: StreamId) -> Result<Stream<'_>> {
675 if !self.streams.contains_key(&stream_identifier) {
676 Err(Error::ErrStreamNotExisted)
677 } else {
678 Ok(Stream {
679 stream_identifier,
680 association: self,
681 })
682 }
683 }
684
685 pub(crate) fn bytes_sent(&self) -> usize {
687 self.bytes_sent
688 }
689
690 pub(crate) fn bytes_received(&self) -> usize {
692 self.bytes_received
693 }
694
695 pub(crate) fn max_message_size(&self) -> u32 {
697 self.max_message_size
698 }
699
700 pub(crate) fn set_max_message_size(&mut self, max_message_size: u32) {
702 self.max_message_size = max_message_size;
703 }
704
705 fn unregister_stream(&mut self, stream_identifier: StreamId) {
708 if let Some(mut s) = self.streams.remove(&stream_identifier) {
709 debug!("[{}] unregister_stream {}", self.side, stream_identifier);
710 s.state = RecvSendState::Closed;
711 }
712 }
713
714 fn set_state(&mut self, new_state: AssociationState) {
716 if new_state != self.state {
717 debug!(
718 "[{}] state change: '{}' => '{}'",
719 self.side, self.state, new_state,
720 );
721 }
722 self.state = new_state;
723 }
724
725 pub(crate) fn state(&self) -> AssociationState {
727 self.state
728 }
729
730 fn send_init(&mut self) -> Result<()> {
732 if let Some(stored_init) = &self.stored_init {
733 debug!("[{}] sending INIT", self.side);
734
735 self.source_port = 5000; self.destination_port = 5000; let outbound = Packet {
739 common_header: CommonHeader {
740 source_port: self.source_port,
741 destination_port: self.destination_port,
742 verification_tag: self.peer_verification_tag,
743 },
744 chunks: vec![Box::new(stored_init.clone())],
745 };
746
747 self.control_queue.push_back(outbound);
748 self.awake_write_loop();
749
750 Ok(())
751 } else {
752 Err(Error::ErrInitNotStoredToSend)
753 }
754 }
755
756 fn send_cookie_echo(&mut self) -> Result<()> {
758 if let Some(stored_cookie_echo) = &self.stored_cookie_echo {
759 debug!("[{}] sending COOKIE-ECHO", self.side);
760
761 let outbound = Packet {
762 common_header: CommonHeader {
763 source_port: self.source_port,
764 destination_port: self.destination_port,
765 verification_tag: self.peer_verification_tag,
766 },
767 chunks: vec![Box::new(stored_cookie_echo.clone())],
768 };
769
770 self.control_queue.push_back(outbound);
771 self.awake_write_loop();
772
773 Ok(())
774 } else {
775 Err(Error::ErrCookieEchoNotStoredToSend)
776 }
777 }
778
779 fn handle_inbound(&mut self, p: Packet, now: Instant) -> Result<()> {
781 if let Err(err) = p.check_packet() {
782 warn!("[{}] failed validating packet {}", self.side, err);
783 return Ok(());
784 }
785
786 self.handle_chunk_start();
787
788 for c in &p.chunks {
789 self.handle_chunk(&p, c, now)?;
790 }
791
792 self.handle_chunk_end(now);
793
794 Ok(())
795 }
796
797 fn handle_chunk_start(&mut self) {
798 self.delayed_ack_triggered = false;
799 self.immediate_ack_triggered = false;
800 }
801
802 fn handle_chunk_end(&mut self, now: Instant) {
803 if self.immediate_ack_triggered {
804 self.ack_state = AckState::Immediate;
805 self.timers.stop(Timer::Ack);
806 self.awake_write_loop();
807 } else if self.delayed_ack_triggered {
808 self.ack_state = AckState::Delay;
810 self.timers.start(Timer::Ack, now, ACK_INTERVAL);
811 }
812 }
813
814 #[allow(clippy::borrowed_box)]
815 fn handle_chunk(
816 &mut self,
817 p: &Packet,
818 chunk: &Box<dyn Chunk + Send + Sync>,
819 now: Instant,
820 ) -> Result<()> {
821 chunk.check()?;
822 let chunk_any = chunk.as_any();
823 let packets = if let Some(c) = chunk_any.downcast_ref::<ChunkInit>() {
824 if c.is_ack {
825 self.handle_init_ack(p, c, now)?
826 } else {
827 self.handle_init(p, c)?
828 }
829 } else if let Some(c) = chunk_any.downcast_ref::<ChunkAbort>() {
830 let mut err_str = String::new();
831 for e in &c.error_causes {
832 if matches!(e.code, USER_INITIATED_ABORT) {
833 debug!("User initiated abort received");
834 let _ = self.close();
835 return Ok(());
836 }
837 err_str += &format!("({})", e);
838 }
839 return Err(Error::ErrAbortChunk(err_str));
840 } else if let Some(c) = chunk_any.downcast_ref::<ChunkError>() {
841 let mut err_str = String::new();
842 for e in &c.error_causes {
843 err_str += &format!("({})", e);
844 }
845 return Err(Error::ErrAbortChunk(err_str));
846 } else if let Some(c) = chunk_any.downcast_ref::<ChunkHeartbeat>() {
847 self.handle_heartbeat(c)?
848 } else if let Some(c) = chunk_any.downcast_ref::<ChunkCookieEcho>() {
849 self.handle_cookie_echo(c)?
850 } else if chunk_any.downcast_ref::<ChunkCookieAck>().is_some() {
851 self.handle_cookie_ack()?
852 } else if let Some(c) = chunk_any.downcast_ref::<ChunkPayloadData>() {
853 self.handle_data(c)?
854 } else if let Some(c) = chunk_any.downcast_ref::<ChunkSelectiveAck>() {
855 self.handle_sack(c, now)?
856 } else if let Some(c) = chunk_any.downcast_ref::<ChunkReconfig>() {
857 self.handle_reconfig(c)?
858 } else if let Some(c) = chunk_any.downcast_ref::<ChunkForwardTsn>() {
859 self.handle_forward_tsn(c)?
860 } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdown>() {
861 self.handle_shutdown(c)?
862 } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownAck>() {
863 self.handle_shutdown_ack(c)?
864 } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownComplete>() {
865 self.handle_shutdown_complete(c)?
866 } else {
867 return Err(Error::ErrChunkTypeUnhandled);
868 };
869
870 if !packets.is_empty() {
871 let mut buf: VecDeque<_> = packets.into_iter().collect();
872 self.control_queue.append(&mut buf);
873 self.awake_write_loop();
874 }
875
876 Ok(())
877 }
878
879 fn handle_init(&mut self, p: &Packet, i: &ChunkInit) -> Result<Vec<Packet>> {
880 let state = self.state();
881 debug!("[{}] chunkInit received in state '{}'", self.side, state);
882
883 if state != AssociationState::Closed
891 && state != AssociationState::CookieWait
892 && state != AssociationState::CookieEchoed
893 {
894 return Err(Error::ErrHandleInitState);
897 }
898
899 self.my_max_num_inbound_streams =
901 std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
902 self.my_max_num_outbound_streams =
903 std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
904 self.peer_verification_tag = i.initiate_tag;
905 self.source_port = p.common_header.destination_port;
906 self.destination_port = p.common_header.source_port;
907
908 self.peer_last_tsn = if i.initial_tsn == 0 {
913 u32::MAX
914 } else {
915 i.initial_tsn - 1
916 };
917
918 for param in &i.params {
919 if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
920 for t in &v.chunk_types {
921 if *t == CT_FORWARD_TSN {
922 debug!("[{}] use ForwardTSN (on init)", self.side);
923 self.use_forward_tsn = true;
924 }
925 }
926 }
927 }
928 if !self.use_forward_tsn {
929 warn!("[{}] not using ForwardTSN (on init)", self.side);
930 }
931
932 let mut outbound = Packet {
933 common_header: CommonHeader {
934 verification_tag: self.peer_verification_tag,
935 source_port: self.source_port,
936 destination_port: self.destination_port,
937 },
938 chunks: vec![],
939 };
940
941 let mut init_ack = ChunkInit {
942 is_ack: true,
943 initial_tsn: self.my_next_tsn,
944 num_outbound_streams: self.my_max_num_outbound_streams,
945 num_inbound_streams: self.my_max_num_inbound_streams,
946 initiate_tag: self.my_verification_tag,
947 advertised_receiver_window_credit: self.max_receive_buffer_size,
948 ..Default::default()
949 };
950
951 if self.my_cookie.is_none() {
952 self.my_cookie = Some(ParamStateCookie::new());
953 }
954
955 if let Some(my_cookie) = &self.my_cookie {
956 init_ack.params = vec![Box::new(my_cookie.clone())];
957 }
958
959 init_ack.set_supported_extensions();
960
961 outbound.chunks = vec![Box::new(init_ack)];
962
963 Ok(vec![outbound])
964 }
965
966 fn handle_init_ack(
967 &mut self,
968 p: &Packet,
969 i: &ChunkInitAck,
970 now: Instant,
971 ) -> Result<Vec<Packet>> {
972 let state = self.state();
973 debug!("[{}] chunkInitAck received in state '{}'", self.side, state);
974 if state != AssociationState::CookieWait {
975 return Ok(vec![]);
982 }
983
984 self.my_max_num_inbound_streams =
985 std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
986 self.my_max_num_outbound_streams =
987 std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
988 self.peer_verification_tag = i.initiate_tag;
989 self.peer_last_tsn = if i.initial_tsn == 0 {
990 u32::MAX
991 } else {
992 i.initial_tsn - 1
993 };
994 if self.source_port != p.common_header.destination_port
995 || self.destination_port != p.common_header.source_port
996 {
997 warn!("[{}] handle_init_ack: port mismatch", self.side);
998 return Ok(vec![]);
999 }
1000
1001 self.rwnd = i.advertised_receiver_window_credit;
1002 debug!("[{}] initial rwnd={}", self.side, self.rwnd);
1003
1004 self.ssthresh = self.rwnd;
1009 trace!(
1010 "[{}] updated cwnd={} ssthresh={} inflight={} (INI)",
1011 self.side,
1012 self.cwnd,
1013 self.ssthresh,
1014 self.inflight_queue.get_num_bytes()
1015 );
1016
1017 self.timers.stop(Timer::T1Init);
1018 self.stored_init = None;
1019
1020 let mut cookie_param = None;
1021 for param in &i.params {
1022 if let Some(v) = param.as_any().downcast_ref::<ParamStateCookie>() {
1023 cookie_param = Some(v);
1024 } else if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
1025 for t in &v.chunk_types {
1026 if *t == CT_FORWARD_TSN {
1027 debug!("[{}] use ForwardTSN (on initAck)", self.side);
1028 self.use_forward_tsn = true;
1029 }
1030 }
1031 }
1032 }
1033 if !self.use_forward_tsn {
1034 warn!("[{}] not using ForwardTSN (on initAck)", self.side);
1035 }
1036
1037 if let Some(v) = cookie_param {
1038 self.stored_cookie_echo = Some(ChunkCookieEcho {
1039 cookie: v.cookie.clone(),
1040 });
1041
1042 self.send_cookie_echo()?;
1043
1044 self.timers
1045 .start(Timer::T1Cookie, now, self.rto_mgr.get_rto());
1046
1047 self.set_state(AssociationState::CookieEchoed);
1048
1049 Ok(vec![])
1050 } else {
1051 Err(Error::ErrInitAckNoCookie)
1052 }
1053 }
1054
1055 fn handle_heartbeat(&self, c: &ChunkHeartbeat) -> Result<Vec<Packet>> {
1056 trace!("[{}] chunkHeartbeat", self.side);
1057 if let Some(p) = c.params.first() {
1058 if let Some(hbi) = p.as_any().downcast_ref::<ParamHeartbeatInfo>() {
1059 return Ok(vec![Packet {
1060 common_header: CommonHeader {
1061 verification_tag: self.peer_verification_tag,
1062 source_port: self.source_port,
1063 destination_port: self.destination_port,
1064 },
1065 chunks: vec![Box::new(ChunkHeartbeatAck {
1066 params: vec![Box::new(ParamHeartbeatInfo {
1067 heartbeat_information: hbi.heartbeat_information.clone(),
1068 })],
1069 })],
1070 }]);
1071 } else {
1072 warn!(
1073 "[{}] failed to handle Heartbeat, no ParamHeartbeatInfo",
1074 self.side,
1075 );
1076 }
1077 }
1078
1079 Ok(vec![])
1080 }
1081
1082 fn handle_cookie_echo(&mut self, c: &ChunkCookieEcho) -> Result<Vec<Packet>> {
1083 let state = self.state();
1084 debug!("[{}] COOKIE-ECHO received in state '{}'", self.side, state);
1085
1086 if let Some(my_cookie) = &self.my_cookie {
1087 match state {
1088 AssociationState::Established => {
1089 if my_cookie.cookie != c.cookie {
1090 return Ok(vec![]);
1091 }
1092 }
1093 AssociationState::Closed
1094 | AssociationState::CookieWait
1095 | AssociationState::CookieEchoed => {
1096 if my_cookie.cookie != c.cookie {
1097 return Ok(vec![]);
1098 }
1099
1100 self.timers.stop(Timer::T1Init);
1101 self.stored_init = None;
1102
1103 self.timers.stop(Timer::T1Cookie);
1104 self.stored_cookie_echo = None;
1105
1106 self.events.push_back(Event::Connected);
1107 self.set_state(AssociationState::Established);
1108 self.handshake_completed = true;
1109 }
1110 _ => return Ok(vec![]),
1111 };
1112 } else {
1113 debug!("[{}] COOKIE-ECHO received before initialization", self.side);
1114 return Ok(vec![]);
1115 }
1116
1117 Ok(vec![Packet {
1118 common_header: CommonHeader {
1119 verification_tag: self.peer_verification_tag,
1120 source_port: self.source_port,
1121 destination_port: self.destination_port,
1122 },
1123 chunks: vec![Box::new(ChunkCookieAck {})],
1124 }])
1125 }
1126
1127 fn handle_cookie_ack(&mut self) -> Result<Vec<Packet>> {
1128 let state = self.state();
1129 debug!("[{}] COOKIE-ACK received in state '{}'", self.side, state);
1130 if state != AssociationState::CookieEchoed {
1131 return Ok(vec![]);
1136 }
1137
1138 self.timers.stop(Timer::T1Cookie);
1139 self.stored_cookie_echo = None;
1140
1141 self.events.push_back(Event::Connected);
1142 self.set_state(AssociationState::Established);
1143 self.handshake_completed = true;
1144
1145 Ok(vec![])
1146 }
1147
1148 fn handle_data(&mut self, d: &ChunkPayloadData) -> Result<Vec<Packet>> {
1149 trace!(
1150 "[{}] DATA: tsn={} immediateSack={} len={}",
1151 self.side,
1152 d.tsn,
1153 d.immediate_sack,
1154 d.user_data.len()
1155 );
1156 self.stats.inc_datas();
1157
1158 let can_push = self.payload_queue.can_push(d, self.peer_last_tsn);
1159 let mut stream_handle_data = false;
1160 if can_push {
1161 if self.get_or_create_stream(d.stream_identifier).is_some() {
1162 if self.get_my_receiver_window_credit() > 0 {
1163 self.payload_queue.push(d.clone(), self.peer_last_tsn);
1165 stream_handle_data = true;
1166 } else {
1167 if let Some(last_tsn) = self.payload_queue.get_last_tsn_received() {
1169 if sna32lt(d.tsn, *last_tsn) {
1170 debug!("[{}] receive buffer full, but accepted as this is a missing chunk with tsn={} ssn={}", self.side, d.tsn, d.stream_sequence_number);
1171 self.payload_queue.push(d.clone(), self.peer_last_tsn);
1172 stream_handle_data = true; }
1174 } else {
1175 debug!(
1176 "[{}] receive buffer full. dropping DATA with tsn={} ssn={}",
1177 self.side, d.tsn, d.stream_sequence_number
1178 );
1179 }
1180 }
1181 } else {
1182 debug!("[{}] discard {}", self.side, d.stream_sequence_number);
1185 return Ok(vec![]);
1186 }
1187 }
1188
1189 let immediate_sack = d.immediate_sack;
1190
1191 if stream_handle_data {
1192 if let Some(s) = self.streams.get_mut(&d.stream_identifier) {
1193 self.events.push_back(Event::DatagramReceived);
1194 s.handle_data(d);
1195 if s.reassembly_queue.is_readable() {
1196 self.events.push_back(Event::Stream(StreamEvent::Readable {
1197 id: d.stream_identifier,
1198 }))
1199 }
1200 }
1201 }
1202
1203 self.handle_peer_last_tsn_and_acknowledgement(immediate_sack)
1204 }
1205
1206 fn handle_sack(&mut self, d: &ChunkSelectiveAck, now: Instant) -> Result<Vec<Packet>> {
1207 trace!(
1208 "[{}] {}, SACK: cumTSN={} a_rwnd={}",
1209 self.side,
1210 self.cumulative_tsn_ack_point,
1211 d.cumulative_tsn_ack,
1212 d.advertised_receiver_window_credit
1213 );
1214 let state = self.state();
1215 if state != AssociationState::Established
1216 && state != AssociationState::ShutdownPending
1217 && state != AssociationState::ShutdownReceived
1218 {
1219 return Ok(vec![]);
1220 }
1221
1222 self.stats.inc_sacks();
1223
1224 if sna32gt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1225 debug!(
1234 "[{}] SACK Cumulative ACK {} is older than ACK point {}",
1235 self.side, d.cumulative_tsn_ack, self.cumulative_tsn_ack_point
1236 );
1237
1238 return Ok(vec![]);
1239 }
1240
1241 let (bytes_acked_per_stream, htna) = self.process_selective_ack(d, now)?;
1243
1244 let mut total_bytes_acked = 0;
1245 for n_bytes_acked in bytes_acked_per_stream.values() {
1246 total_bytes_acked += *n_bytes_acked;
1247 }
1248
1249 let mut cum_tsn_ack_point_advanced = false;
1250 if sna32lt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1251 trace!(
1252 "[{}] SACK: cumTSN advanced: {} -> {}",
1253 self.side,
1254 self.cumulative_tsn_ack_point,
1255 d.cumulative_tsn_ack
1256 );
1257
1258 self.cumulative_tsn_ack_point = d.cumulative_tsn_ack;
1259 cum_tsn_ack_point_advanced = true;
1260 self.on_cumulative_tsn_ack_point_advanced(total_bytes_acked, now);
1261 }
1262
1263 for (si, n_bytes_acked) in &bytes_acked_per_stream {
1264 if let Some(s) = self.streams.get_mut(si) {
1265 if s.on_buffer_released(*n_bytes_acked) {
1266 self.events
1267 .push_back(Event::Stream(StreamEvent::BufferedAmountLow { id: *si }))
1268 }
1269 }
1270 }
1271
1272 let bytes_outstanding = self.inflight_queue.get_num_bytes() as u32;
1281 if bytes_outstanding >= d.advertised_receiver_window_credit {
1282 self.rwnd = 0;
1283 } else {
1284 self.rwnd = d.advertised_receiver_window_credit - bytes_outstanding;
1285 }
1286
1287 self.process_fast_retransmission(d.cumulative_tsn_ack, htna, cum_tsn_ack_point_advanced)?;
1288
1289 if self.use_forward_tsn {
1290 if sna32lt(
1292 self.advanced_peer_tsn_ack_point,
1293 self.cumulative_tsn_ack_point,
1294 ) {
1295 self.advanced_peer_tsn_ack_point = self.cumulative_tsn_ack_point
1296 }
1297
1298 let mut i = self.advanced_peer_tsn_ack_point + 1;
1300 while let Some(c) = self.inflight_queue.get(i) {
1301 if !c.abandoned() {
1302 break;
1303 }
1304 self.advanced_peer_tsn_ack_point = i;
1305 i += 1;
1306 }
1307
1308 if sna32gt(
1310 self.advanced_peer_tsn_ack_point,
1311 self.cumulative_tsn_ack_point,
1312 ) {
1313 self.will_send_forward_tsn = true;
1314 debug!(
1315 "[{}] handleSack {}: sna32GT({}, {})",
1316 self.side,
1317 self.will_send_forward_tsn,
1318 self.advanced_peer_tsn_ack_point,
1319 self.cumulative_tsn_ack_point
1320 );
1321 }
1322 self.awake_write_loop();
1323 }
1324
1325 self.postprocess_sack(state, cum_tsn_ack_point_advanced, now);
1326
1327 Ok(vec![])
1328 }
1329
1330 fn handle_reconfig(&mut self, c: &ChunkReconfig) -> Result<Vec<Packet>> {
1331 trace!("[{}] handle_reconfig", self.side);
1332
1333 let mut pp = vec![];
1334
1335 if let Some(param_a) = &c.param_a {
1336 self.handle_reconfig_param(param_a, &mut pp)?;
1337 }
1338
1339 if let Some(param_b) = &c.param_b {
1340 self.handle_reconfig_param(param_b, &mut pp)?;
1341 }
1342
1343 Ok(pp)
1344 }
1345
1346 fn handle_forward_tsn(&mut self, c: &ChunkForwardTsn) -> Result<Vec<Packet>> {
1347 trace!("[{}] FwdTSN: {}", self.side, c.to_string());
1348
1349 if !self.use_forward_tsn {
1350 warn!("[{}] received FwdTSN but not enabled", self.side);
1351 let cerr = ChunkError {
1353 error_causes: vec![ErrorCauseUnrecognizedChunkType::default()],
1354 };
1355
1356 let outbound = Packet {
1357 common_header: CommonHeader {
1358 verification_tag: self.peer_verification_tag,
1359 source_port: self.source_port,
1360 destination_port: self.destination_port,
1361 },
1362 chunks: vec![Box::new(cerr)],
1363 };
1364 return Ok(vec![outbound]);
1365 }
1366
1367 trace!(
1376 "[{}] should send ack? newCumTSN={} peer_last_tsn={}",
1377 self.side,
1378 c.new_cumulative_tsn,
1379 self.peer_last_tsn
1380 );
1381 if sna32lte(c.new_cumulative_tsn, self.peer_last_tsn) {
1382 trace!("[{}] sending ack on Forward TSN", self.side);
1383 self.ack_state = AckState::Immediate;
1384 self.timers.stop(Timer::Ack);
1385 self.awake_write_loop();
1386 return Ok(vec![]);
1387 }
1388
1389 while sna32lt(self.peer_last_tsn, c.new_cumulative_tsn) {
1401 self.payload_queue.pop(self.peer_last_tsn + 1); self.peer_last_tsn += 1;
1403 }
1404
1405 for forwarded in &c.streams {
1409 if let Some(s) = self.streams.get_mut(&forwarded.identifier) {
1410 s.handle_forward_tsn_for_ordered(forwarded.sequence);
1411 }
1412 }
1413
1414 for s in self.streams.values_mut() {
1420 s.handle_forward_tsn_for_unordered(c.new_cumulative_tsn);
1421 }
1422
1423 self.handle_peer_last_tsn_and_acknowledgement(false)
1424 }
1425
1426 fn handle_shutdown(&mut self, _: &ChunkShutdown) -> Result<Vec<Packet>> {
1427 let state = self.state();
1428
1429 if state == AssociationState::Established {
1430 if !self.inflight_queue.is_empty() {
1431 self.set_state(AssociationState::ShutdownReceived);
1432 } else {
1433 self.will_send_shutdown_ack = true;
1435 self.set_state(AssociationState::ShutdownAckSent);
1436
1437 self.awake_write_loop();
1438 }
1439 } else if state == AssociationState::ShutdownSent {
1440 self.will_send_shutdown_ack = true;
1443 self.set_state(AssociationState::ShutdownAckSent);
1444
1445 self.awake_write_loop();
1446 }
1447
1448 Ok(vec![])
1449 }
1450
1451 fn handle_shutdown_ack(&mut self, _: &ChunkShutdownAck) -> Result<Vec<Packet>> {
1452 let state = self.state();
1453 if state == AssociationState::ShutdownSent || state == AssociationState::ShutdownAckSent {
1454 self.timers.stop(Timer::T2Shutdown);
1455 self.will_send_shutdown_complete = true;
1456
1457 self.awake_write_loop();
1458 }
1459
1460 Ok(vec![])
1461 }
1462
1463 fn handle_shutdown_complete(&mut self, _: &ChunkShutdownComplete) -> Result<Vec<Packet>> {
1464 let state = self.state();
1465 if state == AssociationState::ShutdownAckSent {
1466 self.timers.stop(Timer::T2Shutdown);
1467 self.close()?;
1468 }
1469
1470 Ok(vec![])
1471 }
1472
1473 fn handle_peer_last_tsn_and_acknowledgement(
1475 &mut self,
1476 sack_immediately: bool,
1477 ) -> Result<Vec<Packet>> {
1478 let mut reply = vec![];
1479
1480 while self.payload_queue.pop(self.peer_last_tsn + 1).is_some() {
1489 self.peer_last_tsn += 1;
1490 let rst_reqs: Vec<ParamOutgoingResetRequest> =
1493 self.reconfig_requests.values().cloned().collect();
1494 for rst_req in rst_reqs {
1495 self.reset_streams_if_any(&rst_req, false, &mut reply)?;
1496 }
1497 }
1498
1499 let has_packet_loss = !self.payload_queue.is_empty();
1500 if has_packet_loss {
1501 trace!(
1502 "[{}] packetloss: {}",
1503 self.side,
1504 self.payload_queue
1505 .get_gap_ack_blocks_string(self.peer_last_tsn)
1506 );
1507 }
1508
1509 if (self.ack_state != AckState::Immediate
1510 && !sack_immediately
1511 && !has_packet_loss
1512 && self.ack_mode == AckMode::Normal)
1513 || self.ack_mode == AckMode::AlwaysDelay
1514 {
1515 if self.ack_state == AckState::Idle {
1516 self.delayed_ack_triggered = true;
1517 } else {
1518 self.immediate_ack_triggered = true;
1519 }
1520 } else {
1521 self.immediate_ack_triggered = true;
1522 }
1523
1524 Ok(reply)
1525 }
1526
1527 #[allow(clippy::borrowed_box)]
1528 fn handle_reconfig_param(
1529 &mut self,
1530 raw: &Box<dyn Param + Send + Sync>,
1531 reply: &mut Vec<Packet>,
1532 ) -> Result<()> {
1533 if let Some(p) = raw.as_any().downcast_ref::<ParamOutgoingResetRequest>() {
1534 self.reconfig_requests
1535 .insert(p.reconfig_request_sequence_number, p.clone());
1536 self.reset_streams_if_any(p, true, reply)?;
1537 Ok(())
1538 } else if let Some(p) = raw.as_any().downcast_ref::<ParamReconfigResponse>() {
1539 self.reconfigs.remove(&p.reconfig_response_sequence_number);
1540 if self.reconfigs.is_empty() {
1541 self.timers.stop(Timer::Reconfig);
1542 }
1543 Ok(())
1544 } else {
1545 Err(Error::ErrParameterType)
1546 }
1547 }
1548
1549 fn process_selective_ack(
1550 &mut self,
1551 d: &ChunkSelectiveAck,
1552 now: Instant,
1553 ) -> Result<(HashMap<u16, i64>, u32)> {
1554 let mut bytes_acked_per_stream = HashMap::new();
1555
1556 let mut i = self.cumulative_tsn_ack_point + 1;
1560 while sna32lte(i, d.cumulative_tsn_ack) {
1562 if let Some(c) = self.inflight_queue.pop(i) {
1563 if !c.acked {
1564 if i == self.cumulative_tsn_ack_point + 1 {
1570 self.timers.stop(Timer::T3RTX);
1572 }
1573
1574 let n_bytes_acked = c.user_data.len() as i64;
1575
1576 if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1578 *amount += n_bytes_acked;
1579 } else {
1580 bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1581 }
1582
1583 if c.nsent == 1 && sna32gte(c.tsn, self.min_tsn2measure_rtt) {
1593 self.min_tsn2measure_rtt = self.my_next_tsn;
1594 if let Some(since) = &c.since {
1595 let rtt = now.duration_since(*since);
1596 let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1597 trace!(
1598 "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1599 self.side,
1600 rtt.as_millis(),
1601 srtt,
1602 self.rto_mgr.get_rto()
1603 );
1604 } else {
1605 error!("[{}] invalid c.since", self.side);
1606 }
1607 }
1608 }
1609
1610 if self.in_fast_recovery && c.tsn == self.fast_recover_exit_point {
1611 debug!("[{}] exit fast-recovery", self.side);
1612 self.in_fast_recovery = false;
1613 }
1614 } else {
1615 return Err(Error::ErrInflightQueueTsnPop);
1616 }
1617
1618 i += 1;
1619 }
1620
1621 let mut htna = d.cumulative_tsn_ack;
1622
1623 for g in &d.gap_ack_blocks {
1625 for i in g.start..=g.end {
1626 let tsn = d.cumulative_tsn_ack + i as u32;
1627
1628 let (is_existed, is_acked) = if let Some(c) = self.inflight_queue.get(tsn) {
1629 (true, c.acked)
1630 } else {
1631 (false, false)
1632 };
1633 let n_bytes_acked = if is_existed && !is_acked {
1634 self.inflight_queue.mark_as_acked(tsn) as i64
1635 } else {
1636 0
1637 };
1638
1639 if let Some(c) = self.inflight_queue.get(tsn) {
1640 if !is_acked {
1641 if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1643 *amount += n_bytes_acked;
1644 } else {
1645 bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1646 }
1647
1648 trace!("[{}] tsn={} has been sacked", self.side, c.tsn);
1649
1650 if c.nsent == 1 {
1651 self.min_tsn2measure_rtt = self.my_next_tsn;
1652 if let Some(since) = &c.since {
1653 let rtt = now.duration_since(*since);
1654 let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1655 trace!(
1656 "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1657 self.side,
1658 rtt.as_millis(),
1659 srtt,
1660 self.rto_mgr.get_rto()
1661 );
1662 } else {
1663 error!("[{}] invalid c.since", self.side);
1664 }
1665 }
1666
1667 if sna32lt(htna, tsn) {
1668 htna = tsn;
1669 }
1670 }
1671 } else {
1672 return Err(Error::ErrTsnRequestNotExist);
1673 }
1674 }
1675 }
1676
1677 Ok((bytes_acked_per_stream, htna))
1678 }
1679
1680 fn on_cumulative_tsn_ack_point_advanced(&mut self, total_bytes_acked: i64, now: Instant) {
1681 if self.inflight_queue.is_empty() {
1685 trace!(
1686 "[{}] SACK: no more packet in-flight (pending={})",
1687 self.side,
1688 self.pending_queue.len()
1689 );
1690 self.timers.stop(Timer::T3RTX);
1691 } else {
1692 trace!("[{}] T3-rtx timer start (pt2)", self.side);
1693 self.timers
1694 .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1695 }
1696
1697 if self.cwnd <= self.ssthresh {
1699 if !self.in_fast_recovery && !self.pending_queue.is_empty() {
1711 self.cwnd += std::cmp::min(total_bytes_acked as u32, self.cwnd); trace!(
1714 "[{}] updated cwnd={} ssthresh={} acked={} (SS)",
1715 self.side,
1716 self.cwnd,
1717 self.ssthresh,
1718 total_bytes_acked
1719 );
1720 } else {
1721 trace!(
1722 "[{}] cwnd did not grow: cwnd={} ssthresh={} acked={} FR={} pending={}",
1723 self.side,
1724 self.cwnd,
1725 self.ssthresh,
1726 total_bytes_acked,
1727 self.in_fast_recovery,
1728 self.pending_queue.len()
1729 );
1730 }
1731 } else {
1732 self.partial_bytes_acked += total_bytes_acked as u32;
1739
1740 if self.partial_bytes_acked >= self.cwnd && !self.pending_queue.is_empty() {
1746 self.partial_bytes_acked -= self.cwnd;
1747 self.cwnd += self.mtu;
1748 trace!(
1749 "[{}] updated cwnd={} ssthresh={} acked={} (CA)",
1750 self.side,
1751 self.cwnd,
1752 self.ssthresh,
1753 total_bytes_acked
1754 );
1755 }
1756 }
1757 }
1758
1759 fn process_fast_retransmission(
1760 &mut self,
1761 cum_tsn_ack_point: u32,
1762 htna: u32,
1763 cum_tsn_ack_point_advanced: bool,
1764 ) -> Result<()> {
1765 if !self.in_fast_recovery || cum_tsn_ack_point_advanced {
1775 let max_tsn = if !self.in_fast_recovery {
1776 htna
1778 } else {
1779 cum_tsn_ack_point + (self.inflight_queue.len() as u32) + 1
1781 };
1782
1783 let mut tsn = cum_tsn_ack_point + 1;
1784 while sna32lt(tsn, max_tsn) {
1785 if let Some(c) = self.inflight_queue.get_mut(tsn) {
1786 if !c.acked && !c.abandoned() && c.miss_indicator < 3 {
1787 c.miss_indicator += 1;
1788 if c.miss_indicator == 3 && !self.in_fast_recovery {
1789 self.in_fast_recovery = true;
1793 self.fast_recover_exit_point = htna;
1794 self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
1795 self.cwnd = self.ssthresh;
1796 self.partial_bytes_acked = 0;
1797 self.will_retransmit_fast = true;
1798
1799 trace!(
1800 "[{}] updated cwnd={} ssthresh={} inflight={} (FR)",
1801 self.side,
1802 self.cwnd,
1803 self.ssthresh,
1804 self.inflight_queue.get_num_bytes()
1805 );
1806 }
1807 }
1808 } else {
1809 return Err(Error::ErrTsnRequestNotExist);
1810 }
1811
1812 tsn += 1;
1813 }
1814 }
1815
1816 if self.in_fast_recovery && cum_tsn_ack_point_advanced {
1817 self.will_retransmit_fast = true;
1818 }
1819
1820 Ok(())
1821 }
1822
1823 fn postprocess_sack(
1826 &mut self,
1827 state: AssociationState,
1828 mut should_awake_write_loop: bool,
1829 now: Instant,
1830 ) {
1831 if !self.inflight_queue.is_empty() {
1832 trace!("[{}] T3-rtx timer start (pt3)", self.side);
1834 self.timers
1835 .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1836 } else if state == AssociationState::ShutdownPending {
1837 should_awake_write_loop = true;
1839 self.will_send_shutdown = true;
1840 self.set_state(AssociationState::ShutdownSent);
1841 } else if state == AssociationState::ShutdownReceived {
1842 should_awake_write_loop = true;
1844 self.will_send_shutdown_ack = true;
1845 self.set_state(AssociationState::ShutdownAckSent);
1846 }
1847
1848 if should_awake_write_loop {
1849 self.awake_write_loop();
1850 }
1851 }
1852
1853 fn reset_streams_if_any(
1854 &mut self,
1855 p: &ParamOutgoingResetRequest,
1856 respond: bool,
1857 reply: &mut Vec<Packet>,
1858 ) -> Result<()> {
1859 let mut result = ReconfigResult::SuccessPerformed;
1860 let mut sis_to_reset = vec![];
1861
1862 if sna32lte(p.sender_last_tsn, self.peer_last_tsn) {
1863 debug!(
1864 "[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}",
1865 self.side, p.sender_last_tsn, self.peer_last_tsn
1866 );
1867 for id in &p.stream_identifiers {
1868 if self.streams.contains_key(id) {
1869 if respond {
1870 sis_to_reset.push(*id);
1871 }
1872 self.unregister_stream(*id);
1873 }
1874 }
1875 self.reconfig_requests
1876 .remove(&p.reconfig_request_sequence_number);
1877 } else {
1878 debug!(
1879 "[{}] resetStream(): senderLastTSN={} > peer_last_tsn={}",
1880 self.side, p.sender_last_tsn, self.peer_last_tsn
1881 );
1882 result = ReconfigResult::InProgress;
1883 }
1884
1885 if !sis_to_reset.is_empty() {
1888 let rsn = self.generate_next_rsn();
1889 let tsn = self.my_next_tsn - 1;
1890
1891 let c = ChunkReconfig {
1892 param_a: Some(Box::new(ParamOutgoingResetRequest {
1893 reconfig_request_sequence_number: rsn,
1894 reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1895 sender_last_tsn: tsn,
1896 stream_identifiers: sis_to_reset,
1897 })),
1898 ..Default::default()
1899 };
1900
1901 self.reconfigs.insert(rsn, c.clone()); let p = self.create_packet(vec![Box::new(c)]);
1904 reply.push(p);
1905 }
1906
1907 let packet = self.create_packet(vec![Box::new(ChunkReconfig {
1908 param_a: Some(Box::new(ParamReconfigResponse {
1909 reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1910 result,
1911 })),
1912 param_b: None,
1913 })]);
1914
1915 debug!("[{}] RESET RESPONSE: {}", self.side, packet);
1916
1917 reply.push(packet);
1918
1919 Ok(())
1920 }
1921
1922 pub(crate) fn create_packet(&self, chunks: Vec<Box<dyn Chunk + Send + Sync>>) -> Packet {
1925 Packet {
1926 common_header: CommonHeader {
1927 verification_tag: self.peer_verification_tag,
1928 source_port: self.source_port,
1929 destination_port: self.destination_port,
1930 },
1931 chunks,
1932 }
1933 }
1934
1935 fn create_stream(
1937 &mut self,
1938 stream_identifier: StreamId,
1939 accept: bool,
1940 default_payload_type: PayloadProtocolIdentifier,
1941 ) -> Option<Stream<'_>> {
1942 let s = StreamState::new(
1943 self.side,
1944 stream_identifier,
1945 self.max_payload_size,
1946 default_payload_type,
1947 );
1948
1949 if accept {
1950 self.stream_queue.push_back(stream_identifier);
1951 self.events.push_back(Event::Stream(StreamEvent::Opened));
1952 }
1953
1954 self.streams.insert(stream_identifier, s);
1955
1956 Some(Stream {
1957 stream_identifier,
1958 association: self,
1959 })
1960 }
1961
1962 fn get_or_create_stream(&mut self, stream_identifier: StreamId) -> Option<Stream<'_>> {
1964 if self.streams.contains_key(&stream_identifier) {
1965 Some(Stream {
1966 stream_identifier,
1967 association: self,
1968 })
1969 } else {
1970 self.create_stream(
1971 stream_identifier,
1972 true,
1973 PayloadProtocolIdentifier::default(),
1974 )
1975 }
1976 }
1977
1978 pub(crate) fn get_my_receiver_window_credit(&self) -> u32 {
1979 let mut bytes_queued = 0;
1980 for s in self.streams.values() {
1981 bytes_queued += s.get_num_bytes_in_reassembly_queue() as u32;
1982 }
1983
1984 if bytes_queued >= self.max_receive_buffer_size {
1985 0
1986 } else {
1987 self.max_receive_buffer_size - bytes_queued
1988 }
1989 }
1990
1991 fn gather_outbound(&mut self, now: Instant) -> (Vec<Bytes>, bool) {
1994 let mut raw_packets = vec![];
1995
1996 if !self.control_queue.is_empty() {
1997 for p in self.control_queue.drain(..) {
1998 if let Ok(raw) = p.marshal() {
1999 raw_packets.push(raw);
2000 } else {
2001 warn!("[{}] failed to serialize a control packet", self.side);
2002 continue;
2003 }
2004 }
2005 }
2006
2007 let state = self.state();
2008 match state {
2009 AssociationState::Established => {
2010 raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2011 raw_packets = self.gather_outbound_data_and_reconfig_packets(raw_packets, now);
2012 raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2013 raw_packets = self.gather_outbound_sack_packets(raw_packets);
2014 raw_packets = self.gather_outbound_forward_tsn_packets(raw_packets);
2015 (raw_packets, true)
2016 }
2017 AssociationState::ShutdownPending
2018 | AssociationState::ShutdownSent
2019 | AssociationState::ShutdownReceived => {
2020 raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2021 raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2022 raw_packets = self.gather_outbound_sack_packets(raw_packets);
2023 self.gather_outbound_shutdown_packets(raw_packets, now)
2024 }
2025 AssociationState::ShutdownAckSent => {
2026 self.gather_outbound_shutdown_packets(raw_packets, now)
2027 }
2028 _ => (raw_packets, true),
2029 }
2030 }
2031
2032 fn gather_data_packets_to_retransmit(
2033 &mut self,
2034 mut raw_packets: Vec<Bytes>,
2035 now: Instant,
2036 ) -> Vec<Bytes> {
2037 for p in &self.get_data_packets_to_retransmit(now) {
2038 if let Ok(raw) = p.marshal() {
2039 raw_packets.push(raw);
2040 } else {
2041 warn!(
2042 "[{}] failed to serialize a DATA packet to be retransmitted",
2043 self.side
2044 );
2045 }
2046 }
2047
2048 raw_packets
2049 }
2050
2051 fn gather_outbound_data_and_reconfig_packets(
2052 &mut self,
2053 mut raw_packets: Vec<Bytes>,
2054 now: Instant,
2055 ) -> Vec<Bytes> {
2056 let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send(now);
2059 if !chunks.is_empty() {
2060 trace!("[{}] T3-rtx timer start (pt1)", self.side);
2062 self.timers
2063 .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
2064
2065 for p in &self.bundle_data_chunks_into_packets(chunks) {
2066 if let Ok(raw) = p.marshal() {
2067 raw_packets.push(raw);
2068 } else {
2069 warn!("[{}] failed to serialize a DATA packet", self.side);
2070 }
2071 }
2072 }
2073
2074 if !sis_to_reset.is_empty() || self.will_retransmit_reconfig {
2075 if self.will_retransmit_reconfig {
2076 self.will_retransmit_reconfig = false;
2077 debug!(
2078 "[{}] retransmit {} RECONFIG chunk(s)",
2079 self.side,
2080 self.reconfigs.len()
2081 );
2082 for c in self.reconfigs.values() {
2083 let p = self.create_packet(vec![Box::new(c.clone())]);
2084 if let Ok(raw) = p.marshal() {
2085 raw_packets.push(raw);
2086 } else {
2087 warn!(
2088 "[{}] failed to serialize a RECONFIG packet to be retransmitted",
2089 self.side,
2090 );
2091 }
2092 }
2093 }
2094
2095 if !sis_to_reset.is_empty() {
2096 let rsn = self.generate_next_rsn();
2097 let tsn = self.my_next_tsn - 1;
2098 debug!(
2099 "[{}] sending RECONFIG: rsn={} tsn={} streams={:?}",
2100 self.side,
2101 rsn,
2102 self.my_next_tsn - 1,
2103 sis_to_reset
2104 );
2105
2106 let c = ChunkReconfig {
2107 param_a: Some(Box::new(ParamOutgoingResetRequest {
2108 reconfig_request_sequence_number: rsn,
2109 sender_last_tsn: tsn,
2110 stream_identifiers: sis_to_reset,
2111 ..Default::default()
2112 })),
2113 ..Default::default()
2114 };
2115 self.reconfigs.insert(rsn, c.clone()); let p = self.create_packet(vec![Box::new(c)]);
2118 if let Ok(raw) = p.marshal() {
2119 raw_packets.push(raw);
2120 } else {
2121 warn!(
2122 "[{}] failed to serialize a RECONFIG packet to be transmitted",
2123 self.side
2124 );
2125 }
2126 }
2127
2128 if !self.reconfigs.is_empty() {
2129 self.timers
2130 .start(Timer::Reconfig, now, self.rto_mgr.get_rto());
2131 }
2132 }
2133
2134 raw_packets
2135 }
2136
2137 fn gather_outbound_fast_retransmission_packets(
2138 &mut self,
2139 mut raw_packets: Vec<Bytes>,
2140 now: Instant,
2141 ) -> Vec<Bytes> {
2142 if self.will_retransmit_fast {
2143 self.will_retransmit_fast = false;
2144
2145 let mut to_fast_retrans: Vec<Box<dyn Chunk + Send + Sync>> = vec![];
2146 let mut fast_retrans_size = COMMON_HEADER_SIZE;
2147
2148 let mut i = 0;
2149 loop {
2150 let tsn = self.cumulative_tsn_ack_point + i + 1;
2151 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2152 if c.acked || c.abandoned() || c.nsent > 1 || c.miss_indicator < 3 {
2153 i += 1;
2154 continue;
2155 }
2156
2157 let data_chunk_size = DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2168 if self.mtu < fast_retrans_size + data_chunk_size {
2169 break;
2170 }
2171
2172 fast_retrans_size += data_chunk_size;
2173 self.stats.inc_fast_retrans();
2174 c.nsent += 1;
2175 } else {
2176 break; }
2178
2179 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2180 Association::check_partial_reliability_status(
2181 c,
2182 now,
2183 self.use_forward_tsn,
2184 self.side,
2185 &self.streams,
2186 );
2187 to_fast_retrans.push(Box::new(c.clone()));
2188 trace!(
2189 "[{}] fast-retransmit: tsn={} sent={} htna={}",
2190 self.side,
2191 c.tsn,
2192 c.nsent,
2193 self.fast_recover_exit_point
2194 );
2195 }
2196 i += 1;
2197 }
2198
2199 if !to_fast_retrans.is_empty() {
2200 if let Ok(raw) = self.create_packet(to_fast_retrans).marshal() {
2201 raw_packets.push(raw);
2202 } else {
2203 warn!(
2204 "[{}] failed to serialize a DATA packet to be fast-retransmitted",
2205 self.side
2206 );
2207 }
2208 }
2209 }
2210
2211 raw_packets
2212 }
2213
2214 fn gather_outbound_sack_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2215 if self.ack_state == AckState::Immediate {
2216 self.ack_state = AckState::Idle;
2217 let sack = self.create_selective_ack_chunk();
2218 trace!("[{}] sending SACK: {}", self.side, sack);
2219 if let Ok(raw) = self.create_packet(vec![Box::new(sack)]).marshal() {
2220 raw_packets.push(raw);
2221 } else {
2222 warn!("[{}] failed to serialize a SACK packet", self.side);
2223 }
2224 }
2225
2226 raw_packets
2227 }
2228
2229 fn gather_outbound_forward_tsn_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2230 if self.will_send_forward_tsn {
2236 self.will_send_forward_tsn = false;
2237 if sna32gt(
2238 self.advanced_peer_tsn_ack_point,
2239 self.cumulative_tsn_ack_point,
2240 ) {
2241 let fwd_tsn = self.create_forward_tsn();
2242 if let Ok(raw) = self.create_packet(vec![Box::new(fwd_tsn)]).marshal() {
2243 raw_packets.push(raw);
2244 } else {
2245 warn!("[{}] failed to serialize a Forward TSN packet", self.side);
2246 }
2247 }
2248 }
2249
2250 raw_packets
2251 }
2252
2253 fn gather_outbound_shutdown_packets(
2254 &mut self,
2255 mut raw_packets: Vec<Bytes>,
2256 now: Instant,
2257 ) -> (Vec<Bytes>, bool) {
2258 let mut ok = true;
2259
2260 if self.will_send_shutdown {
2261 self.will_send_shutdown = false;
2262
2263 let shutdown = ChunkShutdown {
2264 cumulative_tsn_ack: self.cumulative_tsn_ack_point,
2265 };
2266
2267 if let Ok(raw) = self.create_packet(vec![Box::new(shutdown)]).marshal() {
2268 self.timers
2269 .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2270 raw_packets.push(raw);
2271 } else {
2272 warn!("[{}] failed to serialize a Shutdown packet", self.side);
2273 }
2274 } else if self.will_send_shutdown_ack {
2275 self.will_send_shutdown_ack = false;
2276
2277 let shutdown_ack = ChunkShutdownAck {};
2278
2279 if let Ok(raw) = self.create_packet(vec![Box::new(shutdown_ack)]).marshal() {
2280 self.timers
2281 .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2282 raw_packets.push(raw);
2283 } else {
2284 warn!("[{}] failed to serialize a ShutdownAck packet", self.side);
2285 }
2286 } else if self.will_send_shutdown_complete {
2287 self.will_send_shutdown_complete = false;
2288
2289 let shutdown_complete = ChunkShutdownComplete {};
2290
2291 if let Ok(raw) = self
2292 .create_packet(vec![Box::new(shutdown_complete)])
2293 .marshal()
2294 {
2295 raw_packets.push(raw);
2296 ok = false;
2297 } else {
2298 warn!(
2299 "[{}] failed to serialize a ShutdownComplete packet",
2300 self.side
2301 );
2302 }
2303 }
2304
2305 (raw_packets, ok)
2306 }
2307
2308 fn get_data_packets_to_retransmit(&mut self, now: Instant) -> Vec<Packet> {
2311 let awnd = std::cmp::min(self.cwnd, self.rwnd);
2312 let mut chunks = vec![];
2313 let mut bytes_to_send = 0;
2314 let mut done = false;
2315 let mut i = 0;
2316 while !done {
2317 let tsn = self.cumulative_tsn_ack_point + i + 1;
2318 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2319 if !c.retransmit {
2320 i += 1;
2321 continue;
2322 }
2323
2324 if i == 0 && self.rwnd < c.user_data.len() as u32 {
2325 done = true;
2327 } else if bytes_to_send + c.user_data.len() > awnd as usize {
2328 break;
2329 }
2330
2331 c.retransmit = false;
2334 bytes_to_send += c.user_data.len();
2335
2336 c.nsent += 1;
2337 } else {
2338 break; }
2340
2341 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2342 Association::check_partial_reliability_status(
2343 c,
2344 now,
2345 self.use_forward_tsn,
2346 self.side,
2347 &self.streams,
2348 );
2349
2350 trace!(
2351 "[{}] retransmitting tsn={} ssn={} sent={}",
2352 self.side,
2353 c.tsn,
2354 c.stream_sequence_number,
2355 c.nsent
2356 );
2357
2358 chunks.push(c.clone());
2359 }
2360 i += 1;
2361 }
2362
2363 self.bundle_data_chunks_into_packets(chunks)
2364 }
2365
2366 fn pop_pending_data_chunks_to_send(
2369 &mut self,
2370 now: Instant,
2371 ) -> (Vec<ChunkPayloadData>, Vec<u16>) {
2372 let mut chunks = vec![];
2373 let mut sis_to_reset = vec![]; if !self.pending_queue.is_empty() {
2375 while let Some(c) = self.pending_queue.peek() {
2384 let (beginning_fragment, unordered, data_len, stream_identifier) = (
2385 c.beginning_fragment,
2386 c.unordered,
2387 c.user_data.len(),
2388 c.stream_identifier,
2389 );
2390
2391 if data_len == 0 {
2392 sis_to_reset.push(stream_identifier);
2393 if self
2394 .pending_queue
2395 .pop(beginning_fragment, unordered)
2396 .is_none()
2397 {
2398 error!("[{}] failed to pop from pending queue", self.side);
2399 }
2400 continue;
2401 }
2402
2403 if self.inflight_queue.get_num_bytes() + data_len > self.cwnd as usize {
2404 break; }
2406
2407 if data_len > self.rwnd as usize {
2408 break; }
2410
2411 self.rwnd -= data_len as u32;
2412
2413 if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2414 beginning_fragment,
2415 unordered,
2416 now,
2417 ) {
2418 chunks.push(chunk);
2419 }
2420 }
2421
2422 if chunks.is_empty() && self.inflight_queue.is_empty() {
2424 if let Some(c) = self.pending_queue.peek() {
2426 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
2427
2428 if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2429 beginning_fragment,
2430 unordered,
2431 now,
2432 ) {
2433 chunks.push(chunk);
2434 }
2435 }
2436 }
2437 }
2438
2439 (chunks, sis_to_reset)
2440 }
2441
2442 fn bundle_data_chunks_into_packets(&self, chunks: Vec<ChunkPayloadData>) -> Vec<Packet> {
2446 let mut packets = vec![];
2447 let mut chunks_to_send = vec![];
2448 let mut bytes_in_packet = COMMON_HEADER_SIZE;
2449
2450 for c in chunks {
2451 if bytes_in_packet + c.user_data.len() as u32 > self.mtu {
2457 packets.push(self.create_packet(chunks_to_send));
2458 chunks_to_send = vec![];
2459 bytes_in_packet = COMMON_HEADER_SIZE;
2460 }
2461
2462 bytes_in_packet += DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2463 chunks_to_send.push(Box::new(c));
2464 }
2465
2466 if !chunks_to_send.is_empty() {
2467 packets.push(self.create_packet(chunks_to_send));
2468 }
2469
2470 packets
2471 }
2472
2473 fn generate_next_tsn(&mut self) -> u32 {
2475 let tsn = self.my_next_tsn;
2476 self.my_next_tsn += 1;
2477 tsn
2478 }
2479
2480 fn generate_next_rsn(&mut self) -> u32 {
2482 let rsn = self.my_next_rsn;
2483 self.my_next_rsn += 1;
2484 rsn
2485 }
2486
2487 fn check_partial_reliability_status(
2488 c: &mut ChunkPayloadData,
2489 now: Instant,
2490 use_forward_tsn: bool,
2491 side: Side,
2492 streams: &FxHashMap<u16, StreamState>,
2493 ) {
2494 if !use_forward_tsn {
2495 return;
2496 }
2497
2498 if c.payload_type == PayloadProtocolIdentifier::Dcep {
2504 return;
2505 }
2506
2507 if let Some(s) = streams.get(&c.stream_identifier) {
2509 let reliability_type: ReliabilityType = s.reliability_type;
2510 let reliability_value = s.reliability_value;
2511
2512 if reliability_type == ReliabilityType::Rexmit {
2513 if c.nsent >= reliability_value {
2514 c.set_abandoned(true);
2515 trace!(
2516 "[{}] marked as abandoned: tsn={} ppi={} (remix: {})",
2517 side,
2518 c.tsn,
2519 c.payload_type,
2520 c.nsent
2521 );
2522 }
2523 } else if reliability_type == ReliabilityType::Timed {
2524 if let Some(since) = &c.since {
2525 let elapsed = now.duration_since(*since);
2526 if elapsed.as_millis() as u32 >= reliability_value {
2527 c.set_abandoned(true);
2528 trace!(
2529 "[{}] marked as abandoned: tsn={} ppi={} (timed: {:?})",
2530 side,
2531 c.tsn,
2532 c.payload_type,
2533 elapsed
2534 );
2535 }
2536 } else {
2537 error!("[{}] invalid c.since", side);
2538 }
2539 }
2540 } else {
2541 error!("[{}] stream {} not found)", side, c.stream_identifier);
2542 }
2543 }
2544
2545 fn create_selective_ack_chunk(&mut self) -> ChunkSelectiveAck {
2546 ChunkSelectiveAck {
2547 cumulative_tsn_ack: self.peer_last_tsn,
2548 advertised_receiver_window_credit: self.get_my_receiver_window_credit(),
2549 gap_ack_blocks: self.payload_queue.get_gap_ack_blocks(self.peer_last_tsn),
2550 duplicate_tsn: self.payload_queue.pop_duplicates(),
2551 }
2552 }
2553
2554 fn create_forward_tsn(&self) -> ChunkForwardTsn {
2557 let mut stream_map: HashMap<u16, u16> = HashMap::new(); let mut i = self.cumulative_tsn_ack_point + 1;
2560 while sna32lte(i, self.advanced_peer_tsn_ack_point) {
2561 if let Some(c) = self.inflight_queue.get(i) {
2562 if let Some(ssn) = stream_map.get(&c.stream_identifier) {
2563 if sna16lt(*ssn, c.stream_sequence_number) {
2564 stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2566 }
2567 } else {
2568 stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2569 }
2570 } else {
2571 break;
2572 }
2573
2574 i += 1;
2575 }
2576
2577 let mut fwd_tsn = ChunkForwardTsn {
2578 new_cumulative_tsn: self.advanced_peer_tsn_ack_point,
2579 streams: vec![],
2580 };
2581
2582 let mut stream_str = String::new();
2583 for (si, ssn) in &stream_map {
2584 stream_str += format!("(si={} ssn={})", si, ssn).as_str();
2585 fwd_tsn.streams.push(ChunkForwardTsnStream {
2586 identifier: *si,
2587 sequence: *ssn,
2588 });
2589 }
2590 trace!(
2591 "[{}] building fwd_tsn: newCumulativeTSN={} cumTSN={} - {}",
2592 self.side,
2593 fwd_tsn.new_cumulative_tsn,
2594 self.cumulative_tsn_ack_point,
2595 stream_str
2596 );
2597
2598 fwd_tsn
2599 }
2600
2601 fn move_pending_data_chunk_to_inflight_queue(
2603 &mut self,
2604 beginning_fragment: bool,
2605 unordered: bool,
2606 now: Instant,
2607 ) -> Option<ChunkPayloadData> {
2608 if let Some(mut c) = self.pending_queue.pop(beginning_fragment, unordered) {
2609 if c.ending_fragment {
2611 c.set_all_inflight();
2612 }
2613
2614 c.tsn = self.generate_next_tsn();
2616
2617 c.since = Some(now); c.nsent = 1; Association::check_partial_reliability_status(
2621 &mut c,
2622 now,
2623 self.use_forward_tsn,
2624 self.side,
2625 &self.streams,
2626 );
2627
2628 trace!(
2629 "[{}] sending ppi={} tsn={} ssn={} sent={} len={} ({},{})",
2630 self.side,
2631 c.payload_type as u32,
2632 c.tsn,
2633 c.stream_sequence_number,
2634 c.nsent,
2635 c.user_data.len(),
2636 c.beginning_fragment,
2637 c.ending_fragment
2638 );
2639
2640 self.inflight_queue.push_no_check(c.clone());
2641
2642 Some(c)
2643 } else {
2644 error!("[{}] failed to pop from pending queue", self.side);
2645 None
2646 }
2647 }
2648
2649 pub(crate) fn send_reset_request(&mut self, stream_identifier: StreamId) -> Result<()> {
2650 let state = self.state();
2651 if state != AssociationState::Established {
2652 return Err(Error::ErrResetPacketInStateNotExist);
2653 }
2654
2655 let c = ChunkPayloadData {
2658 stream_identifier,
2659 beginning_fragment: true,
2660 ending_fragment: true,
2661 user_data: Bytes::new(),
2662 ..Default::default()
2663 };
2664
2665 self.pending_queue.push(c);
2666 self.awake_write_loop();
2667
2668 Ok(())
2669 }
2670
2671 pub(crate) fn send_payload_data(&mut self, chunks: Vec<ChunkPayloadData>) -> Result<()> {
2673 let state = self.state();
2674 if state != AssociationState::Established {
2675 return Err(Error::ErrPayloadDataStateNotExist);
2676 }
2677
2678 for c in chunks {
2680 self.pending_queue.push(c);
2681 }
2682
2683 self.awake_write_loop();
2684 Ok(())
2685 }
2686
2687 pub(crate) fn buffered_amount(&self) -> usize {
2690 self.pending_queue.get_num_bytes() + self.inflight_queue.get_num_bytes()
2691 }
2692
2693 fn awake_write_loop(&self) {
2694 }
2696
2697 fn close_all_timers(&mut self) {
2698 for timer in Timer::VALUES {
2700 self.timers.stop(timer);
2701 }
2702 }
2703
2704 fn on_ack_timeout(&mut self) {
2705 trace!(
2706 "[{}] ack timed out (ack_state: {})",
2707 self.side,
2708 self.ack_state
2709 );
2710 self.stats.inc_ack_timeouts();
2711 self.ack_state = AckState::Immediate;
2712 self.awake_write_loop();
2713 }
2714
2715 fn on_retransmission_timeout(&mut self, timer_id: Timer, n_rtos: usize) {
2716 match timer_id {
2717 Timer::T1Init => {
2718 if let Err(err) = self.send_init() {
2719 debug!(
2720 "[{}] failed to retransmit init (n_rtos={}): {:?}",
2721 self.side, n_rtos, err
2722 );
2723 }
2724 }
2725
2726 Timer::T1Cookie => {
2727 if let Err(err) = self.send_cookie_echo() {
2728 debug!(
2729 "[{}] failed to retransmit cookie-echo (n_rtos={}): {:?}",
2730 self.side, n_rtos, err
2731 );
2732 }
2733 }
2734
2735 Timer::T2Shutdown => {
2736 debug!(
2737 "[{}] retransmission of shutdown timeout (n_rtos={})",
2738 self.side, n_rtos
2739 );
2740 let state = self.state();
2741 match state {
2742 AssociationState::ShutdownSent => {
2743 self.will_send_shutdown = true;
2744 self.awake_write_loop();
2745 }
2746 AssociationState::ShutdownAckSent => {
2747 self.will_send_shutdown_ack = true;
2748 self.awake_write_loop();
2749 }
2750 _ => {}
2751 }
2752 }
2753
2754 Timer::T3RTX => {
2755 self.stats.inc_t3timeouts();
2756
2757 self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
2768 self.cwnd = self.mtu;
2769 trace!(
2770 "[{}] updated cwnd={} ssthresh={} inflight={} (RTO)",
2771 self.side,
2772 self.cwnd,
2773 self.ssthresh,
2774 self.inflight_queue.get_num_bytes()
2775 );
2776
2777 if self.use_forward_tsn {
2782 let mut i = self.advanced_peer_tsn_ack_point + 1;
2784 while let Some(c) = self.inflight_queue.get(i) {
2785 if !c.abandoned() {
2786 break;
2787 }
2788 self.advanced_peer_tsn_ack_point = i;
2789 i += 1;
2790 }
2791
2792 if sna32gt(
2794 self.advanced_peer_tsn_ack_point,
2795 self.cumulative_tsn_ack_point,
2796 ) {
2797 self.will_send_forward_tsn = true;
2798 debug!(
2799 "[{}] on_retransmission_timeout {}: sna32GT({}, {})",
2800 self.side,
2801 self.will_send_forward_tsn,
2802 self.advanced_peer_tsn_ack_point,
2803 self.cumulative_tsn_ack_point
2804 );
2805 }
2806 }
2807
2808 debug!(
2809 "[{}] T3-rtx timed out: n_rtos={} cwnd={} ssthresh={}",
2810 self.side, n_rtos, self.cwnd, self.ssthresh
2811 );
2812
2813 self.inflight_queue.mark_all_to_retrasmit();
2814 self.awake_write_loop();
2815 }
2816
2817 Timer::Reconfig => {
2818 self.will_retransmit_reconfig = true;
2819 self.awake_write_loop();
2820 }
2821
2822 _ => {}
2823 }
2824 }
2825
2826 fn on_retransmission_failure(&mut self, id: Timer) {
2827 match id {
2828 Timer::T1Init => {
2829 error!("[{}] retransmission failure: T1-init", self.side);
2830 self.error = Some(AssociationError::HandshakeFailed(
2831 Error::ErrHandshakeInitAck,
2832 ));
2833 }
2834
2835 Timer::T1Cookie => {
2836 error!("[{}] retransmission failure: T1-cookie", self.side);
2837 self.error = Some(AssociationError::HandshakeFailed(
2838 Error::ErrHandshakeCookieEcho,
2839 ));
2840 }
2841
2842 Timer::T2Shutdown => {
2843 error!("[{}] retransmission failure: T2-shutdown", self.side);
2844 }
2845
2846 Timer::T3RTX => {
2847 error!("[{}] retransmission failure: T3-rtx (DATA)", self.side);
2852 }
2853
2854 _ => {}
2855 }
2856 }
2857
2858 #[cfg(test)]
2860 pub(crate) fn is_idle(&self) -> bool {
2861 Timer::VALUES
2862 .iter()
2863 .filter_map(|&t| Some((t, self.timers.get(t)?)))
2865 .min_by_key(|&(_, time)| time)
2866 .is_none()
2868 }
2869}