iroh_quinn_proto/connection/paths.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
use std::{cmp, net::SocketAddr, time::Duration, time::Instant};
use tracing::trace;
use super::{
mtud::MtuDiscovery,
pacing::Pacer,
spaces::{PacketSpace, SentPacket},
};
use crate::{congestion, frame::ObservedAddr, packet::SpaceId, TransportConfig, TIMER_GRANULARITY};
/// Description of a particular network path
pub(super) struct PathData {
    /// The peer's address on this path
    pub(super) remote: SocketAddr,
    /// RTT estimator for this path
    pub(super) rtt: RttEstimator,
    /// Whether we're enabling ECN on outgoing packets
    pub(super) sending_ecn: bool,
    /// Congestion controller state
    pub(super) congestion: Box<dyn congestion::Controller>,
    /// Pacing state
    pub(super) pacing: Pacer,
    /// Presumably the data of the PATH_CHALLENGE currently outstanding on this path, if any
    /// (NOTE(review): inferred from the name — confirm against the code that sets it)
    pub(super) challenge: Option<u64>,
    /// Presumably whether a PATH_CHALLENGE still has to be transmitted on this path
    /// (NOTE(review): inferred from the name — confirm against the code that sets it)
    pub(super) challenge_pending: bool,
    /// Whether we're certain the peer can both send and receive on this address
    ///
    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
    /// migration. Always true for clients.
    pub(super) validated: bool,
    /// Total size of all UDP datagrams sent on this path
    pub(super) total_sent: u64,
    /// Total size of all UDP datagrams received on this path
    pub(super) total_recvd: u64,
    /// The state of the MTU discovery process
    pub(super) mtud: MtuDiscovery,
    /// Packet number of the first packet sent after an RTT sample was collected on this path
    ///
    /// Used in persistent congestion determination.
    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
    /// Counters for packets sent on this path that have not yet been acked or deemed lost
    pub(super) in_flight: InFlight,
    /// Whether this path has had its remote address reported back to the peer. This only happens
    /// if both peers agree to do so based on their transport parameters.
    pub(super) observed_addr_sent: bool,
    /// Observed address frame with the largest sequence number received from the peer on this
    /// path.
    pub(super) last_observed_addr_report: Option<ObservedAddr>,
    /// Number of the first packet sent on this path
    ///
    /// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if
    /// a packet was sent on a later path.
    first_packet: Option<u64>,
}
impl PathData {
    /// Builds the state for a brand-new path to `remote`.
    ///
    /// RTT estimation, congestion control and pacing are seeded from `config`. MTU discovery is
    /// active only when `allow_mtud` is set *and* `config` carries an MTU discovery
    /// configuration; in every other case a disabled `MtuDiscovery` is used.
    pub(super) fn new(
        remote: SocketAddr,
        allow_mtud: bool,
        peer_max_udp_payload_size: Option<u16>,
        now: Instant,
        validated: bool,
        config: &TransportConfig,
    ) -> Self {
        let initial_mtu = config.get_initial_mtu();
        let congestion = config
            .congestion_controller_factory
            .clone()
            .build(now, initial_mtu);
        // The pacer must read the controller's window before `congestion` moves into `Self`.
        let pacing = Pacer::new(
            config.initial_rtt,
            congestion.initial_window(),
            initial_mtu,
            now,
        );
        let mtud = match config.mtu_discovery_config.as_ref() {
            Some(mtud_config) if allow_mtud => MtuDiscovery::new(
                initial_mtu,
                config.min_mtu,
                peer_max_udp_payload_size,
                mtud_config.clone(),
            ),
            _ => MtuDiscovery::disabled(initial_mtu, config.min_mtu),
        };

        Self {
            remote,
            rtt: RttEstimator::new(config.initial_rtt),
            sending_ecn: true,
            congestion,
            pacing,
            challenge: None,
            challenge_pending: false,
            validated,
            total_sent: 0,
            total_recvd: 0,
            mtud,
            first_packet_after_rtt_sample: None,
            in_flight: InFlight::new(),
            observed_addr_sent: false,
            last_observed_addr_report: None,
            first_packet: None,
        }
    }

    /// Create a new path from a previous one.
    ///
    /// This should only be called when migrating paths. RTT, congestion controller (cloned),
    /// MTU discovery state and `first_packet_after_rtt_sample` carry over from `prev`; the new
    /// path starts unvalidated with zeroed traffic counters and fresh pacing.
    pub(super) fn from_previous(remote: SocketAddr, prev: &Self, now: Instant) -> Self {
        let congestion = prev.congestion.clone_box();
        let pacing = Pacer::new(prev.rtt.get(), congestion.window(), prev.current_mtu(), now);

        Self {
            remote,
            rtt: prev.rtt,
            sending_ecn: true,
            congestion,
            pacing,
            challenge: None,
            challenge_pending: false,
            validated: false,
            total_sent: 0,
            total_recvd: 0,
            mtud: prev.mtud.clone(),
            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
            in_flight: InFlight::new(),
            observed_addr_sent: false,
            last_observed_addr_report: None,
            first_packet: None,
        }
    }

    /// Resets RTT, congestion control and MTU states.
    ///
    /// This is useful when it is known the underlying path has changed. Note that the new
    /// congestion controller is built with the wall clock (`Instant::now()`), not a
    /// caller-supplied time.
    pub(super) fn reset(&mut self, config: &TransportConfig) {
        let now = Instant::now();
        let initial_mtu = config.get_initial_mtu();
        self.rtt = RttEstimator::new(config.initial_rtt);
        self.congestion = config
            .congestion_controller_factory
            .clone()
            .build(now, initial_mtu);
        self.mtud.reset(initial_mtu, config.min_mtu);
    }

    /// Whether sending `bytes_to_send` more bytes would exceed the anti-amplification limit.
    ///
    /// A validated path is never blocked. Otherwise the total sent (including the new bytes)
    /// must not exceed three times what has been received on this path (RFC 9000 §8).
    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
        if self.validated {
            return false;
        }
        let allowance = self.total_recvd * 3;
        allowance < self.total_sent + bytes_to_send
    }

    /// Returns the path's current MTU
    pub(super) fn current_mtu(&self) -> u16 {
        self.mtud.current_mtu()
    }

    /// Account for transmission of `packet` with number `pn` in `space`
    ///
    /// The first `pn` ever recorded here becomes this path's `first_packet`.
    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketSpace) {
        self.in_flight.insert(&packet);
        self.first_packet.get_or_insert(pn);
        // Whatever byte count the packet space hands back is deducted from this path's
        // in-flight bytes (presumably bytes of packets it stopped tracking — confirm).
        let released = space.sent(pn, packet);
        self.in_flight.bytes -= released;
    }

    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
    /// `false` if `pn` was sent before this path was established.
    ///
    /// Also returns `false` when nothing has been sent on this path yet.
    pub(super) fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) -> bool {
        match self.first_packet {
            Some(first) if first <= pn => {
                self.in_flight.remove(packet);
                true
            }
            _ => false,
        }
    }

    /// Updates the last observed address report received on this path.
    ///
    /// Reports whose sequence number is not greater than the stored one are ignored. A newer
    /// report for the same IP and port only advances the stored sequence number. Otherwise the
    /// report is stored and its address returned so it can be surfaced to the application.
    #[must_use = "updated observed address must be reported to the application"]
    pub(super) fn update_observed_addr_report(
        &mut self,
        observed: ObservedAddr,
    ) -> Option<SocketAddr> {
        if let Some(prev) = self.last_observed_addr_report.as_mut() {
            if prev.seq_no >= observed.seq_no {
                // frames that do not increase the sequence number on this path are ignored
                return None;
            }
            if prev.ip == observed.ip && prev.port == observed.port {
                // same address with a newer sequence number: remember it, report nothing
                prev.seq_no = observed.seq_no;
                return None;
            }
        }

        let addr = observed.socket_addr();
        self.last_observed_addr_report = Some(observed);
        Some(addr)
    }
}
/// RTT estimation for a particular network path
///
/// Derives `Debug` so the estimator can be inspected in logs and debug output, as expected of
/// a public type.
#[derive(Copy, Clone, Debug)]
pub struct RttEstimator {
    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
    latest: Duration,
    /// The smoothed RTT of the connection, computed as described in RFC6298
    ///
    /// `None` until the first RTT sample has been taken.
    smoothed: Option<Duration>,
    /// The RTT variance, computed as described in RFC6298
    var: Duration,
    /// The minimum RTT seen in the connection, ignoring ack delay.
    min: Duration,
}
impl RttEstimator {
    /// Creates an estimator that reports `initial_rtt` until the first real sample arrives.
    ///
    /// The variance starts at half of `initial_rtt` and the minimum at `initial_rtt` itself.
    pub(super) fn new(initial_rtt: Duration) -> Self {
        let half_rtt = initial_rtt / 2;
        Self {
            smoothed: None,
            latest: initial_rtt,
            min: initial_rtt,
            var: half_rtt,
        }
    }

    /// The current best RTT estimation.
    ///
    /// The smoothed RTT once a sample has been taken; before that, the latest (initial) value.
    pub fn get(&self) -> Duration {
        match self.smoothed {
            Some(smoothed) => smoothed,
            None => self.latest,
        }
    }

    /// Conservative estimate of RTT
    ///
    /// The larger of [`get`](Self::get) and the latest sample, i.e. the
    /// `max(smoothed_rtt, latest_rtt)` term of RFC 9002 §6.1.2.
    pub fn conservative(&self) -> Duration {
        cmp::max(self.get(), self.latest)
    }

    /// Minimum RTT registered so far for this estimator.
    pub fn min(&self) -> Duration {
        self.min
    }

    /// Base probe timeout: `smoothed_rtt + max(4 * rttvar, TIMER_GRANULARITY)`.
    ///
    /// This is the PTO formula of RFC 9002 §6.2.1 without its `max_ack_delay` term.
    pub(crate) fn pto_base(&self) -> Duration {
        let variance_term = (4 * self.var).max(TIMER_GRANULARITY);
        self.get() + variance_term
    }

    /// Feeds a new RTT sample into the estimator.
    ///
    /// `rtt` is the raw measurement and `ack_delay` the delay reported by the peer. The first
    /// sample initialises the smoothed RTT, variance and minimum directly. Later samples are
    /// blended in with RFC 6298 weights (1/4 for the variance, 1/8 for the smoothed RTT) after
    /// subtracting `ack_delay`, unless that would take the sample below the minimum RTT.
    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
        self.latest = rtt;
        // The minimum is tracked on raw samples, without ack-delay adjustment.
        self.min = cmp::min(self.min, rtt);

        match self.smoothed {
            None => {
                // First sample: take it at face value.
                self.smoothed = Some(rtt);
                self.var = rtt / 2;
                self.min = rtt;
            }
            Some(smoothed) => {
                let adjusted_rtt = if self.latest >= self.min + ack_delay {
                    self.latest - ack_delay
                } else {
                    self.latest
                };
                // |smoothed - adjusted_rtt| without signed arithmetic
                let var_sample =
                    cmp::max(smoothed, adjusted_rtt) - cmp::min(smoothed, adjusted_rtt);
                // The variance is updated against the *previous* smoothed value (RFC 6298 order).
                self.var = (3 * self.var + var_sample) / 4;
                self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
            }
        }
    }
}
/// PATH_RESPONSE frames waiting to be sent, with at most one entry per remote address
#[derive(Default)]
pub(crate) struct PathResponses {
    pending: Vec<PathResponse>,
}

impl PathResponses {
    /// Queues a response for a PATH_CHALLENGE carrying `token`, received in packet number
    /// `packet` from `remote`.
    ///
    /// An entry already queued for `remote` is overwritten unless it came from a later packet.
    /// Once the queue holds the maximum number of entries, challenges from new addresses are
    /// dropped with a trace log.
    pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
        /// Arbitrary permissive limit to prevent abuse
        const MAX_PATH_RESPONSES: usize = 16;

        let response = PathResponse {
            packet,
            token,
            remote,
        };

        if let Some(slot) = self.pending.iter_mut().find(|queued| queued.remote == remote) {
            // Update a queued response, never replacing it with an older challenge
            if packet >= slot.packet {
                *slot = response;
            }
            return;
        }

        if self.pending.len() >= MAX_PATH_RESPONSES {
            // We don't expect to ever hit this with well-behaved peers, so we don't bother
            // dropping older challenges.
            trace!("ignoring excessive PATH_CHALLENGE");
        } else {
            self.pending.push(response);
        }
    }

    /// Pops the most recently queued response if it is destined for an address other than
    /// `remote`, returning its token and destination.
    ///
    /// Only the last entry is inspected; `None` is returned if the queue is empty or that entry
    /// belongs to `remote`.
    pub(crate) fn pop_off_path(&mut self, remote: &SocketAddr) -> Option<(u64, SocketAddr)> {
        let newest = *self.pending.last()?;
        if newest.remote != *remote {
            self.pending.pop();
            Some((newest.token, newest.remote))
        } else {
            // We don't bother searching further because we expect that the on-path response will
            // get drained in the immediate future by a call to `pop_on_path`
            None
        }
    }

    /// Pops the most recently queued response if it is destined for `remote`, returning its
    /// token.
    ///
    /// Only the last entry is inspected; `None` is returned if the queue is empty or that entry
    /// belongs to another address.
    pub(crate) fn pop_on_path(&mut self, remote: &SocketAddr) -> Option<u64> {
        let newest = *self.pending.last()?;
        if newest.remote == *remote {
            self.pending.pop();
            Some(newest.token)
        } else {
            // We don't bother searching further because we expect that the off-path response will
            // get drained in the immediate future by a call to `pop_off_path`
            None
        }
    }

    /// Whether no responses are queued
    pub(crate) fn is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
/// One queued PATH_RESPONSE
#[derive(Copy, Clone)]
struct PathResponse {
    /// The address the corresponding PATH_CHALLENGE was received from
    remote: SocketAddr,
    /// The packet number the corresponding PATH_CHALLENGE was received in
    packet: u64,
    /// Token to send back; presumably the PATH_CHALLENGE data being echoed (confirm at call sites)
    token: u64,
}
/// Running totals for packets sent on one path that have been neither acknowledged nor declared
/// lost yet
pub(super) struct InFlight {
    /// Combined size of every sent packet that congestion control counts as "in flight"
    ///
    /// Sizes exclude IP and UDP overhead. Packets carrying nothing but ACK frames are left out,
    /// so that congestion control never holds back congestion feedback.
    pub(super) bytes: u64,
    /// How many in-flight packets carry some frame other than ACK and PADDING
    ///
    /// This may be 0 while `bytes` is not, because PADDING alone makes a packet count as in
    /// flight for congestion control. A nonzero value, however, always comes with nonzero
    /// `bytes`.
    pub(super) ack_eliciting: u64,
}

impl InFlight {
    /// Empty totals
    fn new() -> Self {
        Self {
            ack_eliciting: 0,
            bytes: 0,
        }
    }

    /// Adds `packet`'s contribution to the totals
    fn insert(&mut self, packet: &SentPacket) {
        let (bytes, ack_eliciting) = Self::weight(packet);
        self.bytes += bytes;
        self.ack_eliciting += ack_eliciting;
    }

    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
    fn remove(&mut self, packet: &SentPacket) {
        let (bytes, ack_eliciting) = Self::weight(packet);
        self.bytes -= bytes;
        self.ack_eliciting -= ack_eliciting;
    }

    /// What `packet` contributes to (`bytes`, `ack_eliciting`)
    fn weight(packet: &SentPacket) -> (u64, u64) {
        (u64::from(packet.size), u64::from(packet.ack_eliciting))
    }
}