use std::collections::VecDeque;
use bytes::{Bytes, BytesMut};
use thiserror::Error;
use tracing::{debug, trace};
use super::Connection;
use crate::{
frame::{Datagram, FrameStruct},
packet::SpaceId,
TransportError,
};
/// Handle for sending and receiving application datagrams on a connection
///
/// Holds an exclusive borrow of the underlying connection for the lifetime `'a`; all
/// operations act on that connection's datagram queues.
pub struct Datagrams<'a> {
// Connection whose datagram state, configuration and path information this handle uses.
pub(super) conn: &'a mut Connection,
}
impl<'a> Datagrams<'a> {
    /// Queue a datagram for transmission
    ///
    /// If the bytes already queued exceed the configured `datagram_send_buffer_size`, the
    /// oldest queued datagrams are dropped until the total is back within that limit. The new
    /// datagram is then appended, so the queue may temporarily exceed the limit by the size of
    /// the most recent datagram.
    ///
    /// # Errors
    ///
    /// - [`SendDatagramError::Disabled`] if `datagram_receive_buffer_size` is unset locally.
    /// - [`SendDatagramError::UnsupportedByPeer`] if [`max_size`](Self::max_size) is `None`.
    /// - [`SendDatagramError::TooLarge`] if `data` is longer than `max_size`.
    ///
    /// A call that returns an error leaves the outgoing queue untouched.
    pub fn send(&mut self, data: Bytes) -> Result<(), SendDatagramError> {
        if self.conn.config.datagram_receive_buffer_size.is_none() {
            return Err(SendDatagramError::Disabled);
        }
        let max = self
            .max_size()
            .ok_or(SendDatagramError::UnsupportedByPeer)?;
        // Validate the payload before evicting anything: a rejected datagram must not cause
        // previously queued datagrams to be discarded as a side effect.
        if data.len() > max {
            return Err(SendDatagramError::TooLarge);
        }
        while self.conn.datagrams.outgoing_total > self.conn.config.datagram_send_buffer_size {
            let prev = self
                .conn
                .datagrams
                .outgoing
                .pop_front()
                .expect("datagrams.outgoing_total desynchronized");
            trace!(len = prev.data.len(), "dropping outgoing datagram");
            self.conn.datagrams.outgoing_total -= prev.data.len();
        }
        self.conn.datagrams.outgoing_total += data.len();
        self.conn.datagrams.outgoing.push_back(Datagram { data });
        Ok(())
    }

    /// Compute the largest datagram payload that can currently be sent
    ///
    /// The result is the smaller of the peer's advertised `max_datagram_frame_size` (less the
    /// frame overhead bound) and the space left in a packet of the current path MTU after
    /// per-packet overhead.
    ///
    /// Returns `None` if the peer has not advertised a datagram frame size limit, or if neither
    /// data-space keys nor 0-RTT keys are available to determine the AEAD tag length.
    pub fn max_size(&self) -> Option<usize> {
        // Consult the peer's limit first so that an unsupported peer short-circuits before any
        // key material is inspected.
        let limit = self
            .conn
            .peer_params
            .max_datagram_frame_size?
            .into_inner()
            .saturating_sub(Datagram::SIZE_BOUND as u64);
        // Prefer the data-space packet key; fall back to the 0-RTT key. With no key at all
        // nothing can be sent, so report `None` instead of panicking.
        let tag_len = match self.conn.spaces[SpaceId::Data].crypto.as_ref() {
            Some(keys) => keys.packet.local.tag_len(),
            None => self.conn.zero_rtt_crypto.as_ref()?.packet.tag_len(),
        };
        // NOTE(review): the constants are presumably the short-header flags byte (1) and the
        // worst-case packet number length (4) -- confirm against the packet encoder.
        let overhead = 1
            + self.conn.rem_cids.active().len()
            + 4
            + tag_len
            + Datagram::SIZE_BOUND;
        // Saturate so an MTU smaller than the overhead yields 0 rather than underflowing.
        let max_size = (self.conn.path.current_mtu() as usize).saturating_sub(overhead);
        Some(limit.min(max_size as u64) as usize)
    }

    /// Take the oldest buffered incoming datagram, if any
    pub fn recv(&mut self) -> Option<Bytes> {
        self.conn.datagrams.recv()
    }

    /// Bytes of send buffer capacity not occupied by queued outgoing datagrams
    ///
    /// Computed as `datagram_send_buffer_size` minus the queued byte total, saturating at zero
    /// when the queue currently exceeds the configured size.
    pub fn send_buffer_space(&self) -> usize {
        self.conn
            .config
            .datagram_send_buffer_size
            .saturating_sub(self.conn.datagrams.outgoing_total)
    }
}
/// Queues and byte counters for a connection's incoming and outgoing datagrams
#[derive(Default)]
pub(super) struct DatagramState {
/// Total payload bytes currently held in `incoming`; increased in `received`, decreased in
/// `recv`.
pub(super) recv_buffered: usize,
/// Received datagrams not yet taken via `recv`, oldest first.
pub(super) incoming: VecDeque<Datagram>,
/// Datagrams queued for transmission and not yet encoded by `write`, oldest first.
pub(super) outgoing: VecDeque<Datagram>,
/// Total payload bytes currently held in `outgoing`.
pub(super) outgoing_total: usize,
}
impl DatagramState {
    /// Buffer a datagram that arrived from the peer
    ///
    /// `window` is the receive buffer limit in bytes; `None` means datagrams are not accepted.
    /// Older buffered datagrams are discarded as needed so that the new one fits in `window`.
    ///
    /// Returns `Ok(true)` if no bytes were buffered before this datagram was processed.
    ///
    /// # Errors
    ///
    /// A protocol violation is reported if `window` is `None` or if the datagram alone is
    /// larger than `window`.
    pub(super) fn received(
        &mut self,
        datagram: Datagram,
        window: &Option<usize>,
    ) -> Result<bool, TransportError> {
        let capacity = (*window)
            .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("unexpected DATAGRAM frame"))?;
        let size = datagram.data.len();
        if size > capacity {
            return Err(TransportError::PROTOCOL_VIOLATION("oversized datagram"));
        }
        // Sampled before eviction: reflects the buffer state at the time of arrival.
        let buffer_was_empty = self.recv_buffered == 0;
        // Make room by discarding from the front (oldest) of the incoming queue.
        while size + self.recv_buffered > capacity {
            debug!("dropping stale datagram");
            self.recv();
        }
        self.recv_buffered += size;
        self.incoming.push_back(datagram);
        Ok(buffer_was_empty)
    }

    /// Encode the oldest queued outgoing datagram into `buf`, if it fits
    ///
    /// Returns `true` if a datagram was written. Returns `false`, leaving the queue unchanged,
    /// when nothing is queued or when writing the next datagram would grow `buf` beyond
    /// `max_size` bytes.
    pub(super) fn write(&mut self, buf: &mut BytesMut, max_size: usize) -> bool {
        match self.outgoing.pop_front() {
            None => false,
            Some(next) if buf.len() + next.size(true) > max_size => {
                // Does not fit: put it back at the front so ordering is preserved.
                self.outgoing.push_front(next);
                false
            }
            Some(next) => {
                self.outgoing_total -= next.data.len();
                next.encode(true, buf);
                true
            }
        }
    }

    /// Remove and return the payload of the oldest buffered incoming datagram
    ///
    /// Returns `None` when no datagram is buffered.
    pub(super) fn recv(&mut self) -> Option<Bytes> {
        self.incoming.pop_front().map(|front| {
            let payload = front.data;
            self.recv_buffered -= payload.len();
            payload
        })
    }
}
/// Reasons `Datagrams::send` can refuse to queue a datagram
// Variant order is significant: `Ord`/`PartialOrd` are derived from it.
#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum SendDatagramError {
/// `Datagrams::max_size` returned `None`, so no datagram of any size can be sent
#[error("datagrams not supported by peer")]
UnsupportedByPeer,
/// The local configuration has no `datagram_receive_buffer_size` set
#[error("datagram support disabled")]
Disabled,
/// The datagram is longer than the value reported by `Datagrams::max_size`
#[error("datagram too large")]
TooLarge,
}