1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use std::{
io::{self, IoSliceMut},
net::SocketAddr,
task::{Context, Poll},
time::Instant,
};
use futures_util::ready;
use proto::Transmit;
use tokio::io::ReadBuf;
use super::{log_sendmsg_error, RecvMeta, UdpState, IO_ERROR_LOG_INTERVAL};
#[derive(Debug)]
pub struct UdpSocket {
io: tokio::net::UdpSocket,
last_send_error: Instant,
}
impl UdpSocket {
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<UdpSocket> {
socket.set_nonblocking(true)?;
let now = Instant::now();
Ok(UdpSocket {
io: tokio::net::UdpSocket::from_std(socket)?,
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
})
}
pub fn poll_send(
&mut self,
_state: &UdpState,
cx: &mut Context,
transmits: &[Transmit],
) -> Poll<Result<usize, io::Error>> {
let mut sent = 0;
for transmit in transmits {
match self
.io
.poll_send_to(cx, &transmit.contents, transmit.destination)
{
Poll::Ready(Ok(_)) => {
sent += 1;
}
Poll::Ready(Err(_)) | Poll::Pending if sent != 0 => return Poll::Ready(Ok(sent)),
Poll::Ready(Err(e)) => {
debug_assert!(e.kind() != io::ErrorKind::WouldBlock);
log_sendmsg_error(&mut self.last_send_error, e, transmit);
sent += 1;
}
Poll::Pending => return Poll::Pending,
}
}
Poll::Ready(Ok(sent))
}
pub fn poll_recv(
&self,
cx: &mut Context,
bufs: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> Poll<io::Result<usize>> {
debug_assert!(!bufs.is_empty());
let mut buf = ReadBuf::new(&mut bufs[0]);
let addr = ready!(self.io.poll_recv_from(cx, &mut buf))?;
meta[0] = RecvMeta {
len: buf.filled().len(),
addr,
ecn: None,
dst_ip: None,
};
Poll::Ready(Ok(1))
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.local_addr()
}
}
pub fn udp_state() -> super::UdpState {
super::UdpState {
max_gso_segments: std::sync::atomic::AtomicUsize::new(1),
}
}
pub const BATCH_SIZE: usize = 1;