1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use crate::preview2::bindings::sockets::network::IpSocketAddress;
use crate::preview2::poll::Subscribe;
use crate::preview2::with_ambient_tokio_runtime;
use async_trait::async_trait;
use cap_net_ext::{AddressFamily, Blocking, UdpSocketExt};
use io_lifetimes::raw::{FromRawSocketlike, IntoRawSocketlike};
use std::io;
use std::sync::Arc;
use tokio::io::Interest;

/// The state of a UDP socket.
///
/// This represents the various states a socket can be in during the
/// activities of binding, and connecting.
pub(crate) enum UdpState {
    /// The initial state for a newly-created socket.
    Default,

    /// Binding started via `start_bind`.
    BindStarted,

    /// Binding finished via `finish_bind`. The socket has an address but
    /// is not yet listening for connections.
    Bound,

    /// A connect call is in progress.
    Connecting(IpSocketAddress),

    /// The socket is "connected" to a peer address.
    Connected(IpSocketAddress),
}

/// A host UDP socket, plus associated bookkeeping.
///
/// The inner state is wrapped in an Arc because the same underlying socket is
/// used for implementing the stream types.
pub struct UdpSocket {
    /// The part of a `UdpSocket` which is reference-counted so that we
    /// can pass it to async tasks.
    pub(crate) inner: Arc<tokio::net::UdpSocket>,

    /// The current state in the bind/connect progression.
    pub(crate) udp_state: UdpState,

    /// Socket address family.
    pub(crate) family: AddressFamily,
}

#[async_trait]
impl Subscribe for UdpSocket {
    async fn ready(&mut self) {
        // Some states are ready immediately.
        match self.udp_state {
            UdpState::BindStarted => return,
            _ => {}
        }

        // FIXME: Add `Interest::ERROR` when we update to tokio 1.32.
        self.inner
            .ready(Interest::READABLE | Interest::WRITABLE)
            .await
            .expect("failed to await UDP socket readiness");
    }
}

impl UdpSocket {
    /// Create a new socket in the given family.
    pub fn new(family: AddressFamily) -> io::Result<Self> {
        // Create a new host socket and set it to non-blocking, which is needed
        // by our async implementation.
        let udp_socket = cap_std::net::UdpSocket::new(family, Blocking::No)?;
        Self::from_udp_socket(udp_socket, family)
    }

    pub fn from_udp_socket(
        udp_socket: cap_std::net::UdpSocket,
        family: AddressFamily,
    ) -> io::Result<Self> {
        let fd = udp_socket.into_raw_socketlike();
        let std_socket = unsafe { std::net::UdpSocket::from_raw_socketlike(fd) };
        let socket = with_ambient_tokio_runtime(|| tokio::net::UdpSocket::try_from(std_socket))?;
        Ok(Self {
            inner: Arc::new(socket),
            udp_state: UdpState::Default,
            family,
        })
    }

    pub fn udp_socket(&self) -> &tokio::net::UdpSocket {
        &self.inner
    }
}