compio_net/
udp.rs

1use std::{future::Future, io, net::SocketAddr};
2
3use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4use compio_driver::impl_raw_fd;
5use socket2::{Protocol, SockAddr, Socket as Socket2, Type};
6
7use crate::{Socket, ToSocketAddrsAsync};
8
9/// A UDP socket.
10///
11/// UDP is "connectionless", unlike TCP. Meaning, regardless of what address
12/// you've bound to, a `UdpSocket` is free to communicate with many different
13/// remotes. There are basically two main ways to use `UdpSocket`:
14///
15/// * one to many: [`bind`](`UdpSocket::bind`) and use
16///   [`send_to`](`UdpSocket::send_to`) and
17///   [`recv_from`](`UdpSocket::recv_from`) to communicate with many different
18///   addresses
19/// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single
20///   address, using [`send`](`UdpSocket::send`) and [`recv`](`UdpSocket::recv`)
21///   to communicate only with that remote address
22///
23/// # Examples
24/// Bind and connect a pair of sockets and send a packet:
25///
26/// ```
27/// use std::net::SocketAddr;
28///
29/// use compio_net::UdpSocket;
30///
31/// # compio_runtime::Runtime::new().unwrap().block_on(async {
32/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
33/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
34///
35/// // bind sockets
36/// let mut socket = UdpSocket::bind(first_addr).await.unwrap();
37/// let first_addr = socket.local_addr().unwrap();
38/// let mut other_socket = UdpSocket::bind(second_addr).await.unwrap();
39/// let second_addr = other_socket.local_addr().unwrap();
40///
41/// // connect sockets
42/// socket.connect(second_addr).await.unwrap();
43/// other_socket.connect(first_addr).await.unwrap();
44///
45/// let buf = Vec::with_capacity(12);
46///
47/// // write data
48/// socket.send("Hello world!").await.unwrap();
49///
50/// // read data
51/// let (n_bytes, buf) = other_socket.recv(buf).await.unwrap();
52///
53/// assert_eq!(n_bytes, buf.len());
54/// assert_eq!(buf, b"Hello world!");
55/// # });
56/// ```
57/// Send and receive packets without connecting:
58///
59/// ```
60/// use std::net::SocketAddr;
61///
62/// use compio_net::UdpSocket;
63/// use socket2::SockAddr;
64///
65/// # compio_runtime::Runtime::new().unwrap().block_on(async {
66/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
67/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
68///
69/// // bind sockets
70/// let mut socket = UdpSocket::bind(first_addr).await.unwrap();
71/// let first_addr = socket.local_addr().unwrap();
72/// let mut other_socket = UdpSocket::bind(second_addr).await.unwrap();
73/// let second_addr = other_socket.local_addr().unwrap();
74///
75/// let buf = Vec::with_capacity(32);
76///
77/// // write data
78/// socket.send_to("hello world", second_addr).await.unwrap();
79///
80/// // read data
81/// let ((n_bytes, addr), buf) = other_socket.recv_from(buf).await.unwrap();
82///
83/// assert_eq!(addr, first_addr);
84/// assert_eq!(n_bytes, buf.len());
85/// assert_eq!(buf, b"hello world");
86/// # });
87/// ```
88#[derive(Debug, Clone)]
89pub struct UdpSocket {
90    inner: Socket,
91}
92
93impl UdpSocket {
94    /// Creates a new UDP socket and attempt to bind it to the addr provided.
95    pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
96        super::each_addr(addr, |addr| async move {
97            Ok(Self {
98                inner: Socket::bind(&SockAddr::from(addr), Type::DGRAM, Some(Protocol::UDP))
99                    .await?,
100            })
101        })
102        .await
103    }
104
105    /// Connects this UDP socket to a remote address, allowing the `send` and
106    /// `recv` to be used to send data and also applies filters to only
107    /// receive data from the specified address.
108    ///
109    /// Note that usually, a successful `connect` call does not specify
110    /// that there is a remote server listening on the port, rather, such an
111    /// error would only be detected after the first send.
112    pub async fn connect(&self, addr: impl ToSocketAddrsAsync) -> io::Result<()> {
113        super::each_addr(addr, |addr| async move {
114            self.inner.connect(&SockAddr::from(addr))
115        })
116        .await
117    }
118
119    /// Creates new UdpSocket from a std::net::UdpSocket.
120    pub fn from_std(socket: std::net::UdpSocket) -> io::Result<Self> {
121        Ok(Self {
122            inner: Socket::from_socket2(Socket2::from(socket))?,
123        })
124    }
125
126    /// Close the socket. If the returned future is dropped before polling, the
127    /// socket won't be closed.
128    pub fn close(self) -> impl Future<Output = io::Result<()>> {
129        self.inner.close()
130    }
131
132    /// Returns the socket address of the remote peer this socket was connected
133    /// to.
134    ///
135    /// # Examples
136    ///
137    /// ```no_run
138    /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
139    ///
140    /// use compio_net::UdpSocket;
141    /// use socket2::SockAddr;
142    ///
143    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
144    /// let socket = UdpSocket::bind("127.0.0.1:34254")
145    ///     .await
146    ///     .expect("couldn't bind to address");
147    /// socket
148    ///     .connect("192.168.0.1:41203")
149    ///     .await
150    ///     .expect("couldn't connect to address");
151    /// assert_eq!(
152    ///     socket.peer_addr().unwrap(),
153    ///     SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 0, 1), 41203))
154    /// );
155    /// # });
156    /// ```
157    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
158        self.inner
159            .peer_addr()
160            .map(|addr| addr.as_socket().expect("should be SocketAddr"))
161    }
162
163    /// Returns the local address that this socket is bound to.
164    ///
165    /// # Example
166    ///
167    /// ```
168    /// use std::net::SocketAddr;
169    ///
170    /// use compio_net::UdpSocket;
171    /// use socket2::SockAddr;
172    ///
173    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
174    /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
175    /// let sock = UdpSocket::bind(&addr).await.unwrap();
176    /// // the address the socket is bound to
177    /// let local_addr = sock.local_addr().unwrap();
178    /// assert_eq!(local_addr, addr);
179    /// # });
180    /// ```
181    pub fn local_addr(&self) -> io::Result<SocketAddr> {
182        self.inner
183            .local_addr()
184            .map(|addr| addr.as_socket().expect("should be SocketAddr"))
185    }
186
187    /// Receives a packet of data from the socket into the buffer, returning the
188    /// original buffer and quantity of data received.
189    pub async fn recv<T: IoBufMut>(&self, buffer: T) -> BufResult<usize, T> {
190        self.inner.recv(buffer).await
191    }
192
193    /// Receives a packet of data from the socket into the buffer, returning the
194    /// original buffer and quantity of data received.
195    pub async fn recv_vectored<T: IoVectoredBufMut>(&self, buffer: T) -> BufResult<usize, T> {
196        self.inner.recv_vectored(buffer).await
197    }
198
199    /// Sends some data to the socket from the buffer, returning the original
200    /// buffer and quantity of data sent.
201    pub async fn send<T: IoBuf>(&self, buffer: T) -> BufResult<usize, T> {
202        self.inner.send(buffer).await
203    }
204
205    /// Sends some data to the socket from the buffer, returning the original
206    /// buffer and quantity of data sent.
207    pub async fn send_vectored<T: IoVectoredBuf>(&self, buffer: T) -> BufResult<usize, T> {
208        self.inner.send_vectored(buffer).await
209    }
210
211    /// Receives a single datagram message on the socket. On success, returns
212    /// the number of bytes received and the origin.
213    pub async fn recv_from<T: IoBufMut>(&self, buffer: T) -> BufResult<(usize, SocketAddr), T> {
214        self.inner
215            .recv_from(buffer)
216            .await
217            .map_res(|(n, addr)| (n, addr.as_socket().expect("should be SocketAddr")))
218    }
219
220    /// Receives a single datagram message on the socket. On success, returns
221    /// the number of bytes received and the origin.
222    pub async fn recv_from_vectored<T: IoVectoredBufMut>(
223        &self,
224        buffer: T,
225    ) -> BufResult<(usize, SocketAddr), T> {
226        self.inner
227            .recv_from_vectored(buffer)
228            .await
229            .map_res(|(n, addr)| (n, addr.as_socket().expect("should be SocketAddr")))
230    }
231
232    /// Receives a single datagram message and ancillary data on the socket. On
233    /// success, returns the number of bytes received and the origin.
234    pub async fn recv_msg<T: IoBufMut, C: IoBufMut>(
235        &self,
236        buffer: T,
237        control: C,
238    ) -> BufResult<(usize, usize, SocketAddr), (T, C)> {
239        self.inner
240            .recv_msg(buffer, control)
241            .await
242            .map_res(|(n, m, addr)| (n, m, addr.as_socket().expect("should be SocketAddr")))
243    }
244
245    /// Receives a single datagram message and ancillary data on the socket. On
246    /// success, returns the number of bytes received and the origin.
247    pub async fn recv_msg_vectored<T: IoVectoredBufMut, C: IoBufMut>(
248        &self,
249        buffer: T,
250        control: C,
251    ) -> BufResult<(usize, usize, SocketAddr), (T, C)> {
252        self.inner
253            .recv_msg_vectored(buffer, control)
254            .await
255            .map_res(|(n, m, addr)| (n, m, addr.as_socket().expect("should be SocketAddr")))
256    }
257
258    /// Sends data on the socket to the given address. On success, returns the
259    /// number of bytes sent.
260    pub async fn send_to<T: IoBuf>(
261        &self,
262        buffer: T,
263        addr: impl ToSocketAddrsAsync,
264    ) -> BufResult<usize, T> {
265        super::first_addr_buf(addr, buffer, |addr, buffer| async move {
266            self.inner.send_to(buffer, &SockAddr::from(addr)).await
267        })
268        .await
269    }
270
271    /// Sends data on the socket to the given address. On success, returns the
272    /// number of bytes sent.
273    pub async fn send_to_vectored<T: IoVectoredBuf>(
274        &self,
275        buffer: T,
276        addr: impl ToSocketAddrsAsync,
277    ) -> BufResult<usize, T> {
278        super::first_addr_buf(addr, buffer, |addr, buffer| async move {
279            self.inner
280                .send_to_vectored(buffer, &SockAddr::from(addr))
281                .await
282        })
283        .await
284    }
285
286    /// Sends data on the socket to the given address accompanied by ancillary
287    /// data. On success, returns the number of bytes sent.
288    pub async fn send_msg<T: IoBuf, C: IoBuf>(
289        &self,
290        buffer: T,
291        control: C,
292        addr: impl ToSocketAddrsAsync,
293    ) -> BufResult<usize, (T, C)> {
294        super::first_addr_buf(
295            addr,
296            (buffer, control),
297            |addr, (buffer, control)| async move {
298                self.inner
299                    .send_msg(buffer, control, &SockAddr::from(addr))
300                    .await
301            },
302        )
303        .await
304    }
305
306    /// Sends data on the socket to the given address accompanied by ancillary
307    /// data. On success, returns the number of bytes sent.
308    pub async fn send_msg_vectored<T: IoVectoredBuf, C: IoBuf>(
309        &self,
310        buffer: T,
311        control: C,
312        addr: impl ToSocketAddrsAsync,
313    ) -> BufResult<usize, (T, C)> {
314        super::first_addr_buf(
315            addr,
316            (buffer, control),
317            |addr, (buffer, control)| async move {
318                self.inner
319                    .send_msg_vectored(buffer, control, &SockAddr::from(addr))
320                    .await
321            },
322        )
323        .await
324    }
325
326    /// Gets a socket option.
327    ///
328    /// # Safety
329    ///
330    /// The caller must ensure `T` is the correct type for `level` and `name`.
331    pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
332        self.inner.get_socket_option(level, name)
333    }
334
335    /// Sets a socket option.
336    ///
337    /// # Safety
338    ///
339    /// The caller must ensure `T` is the correct type for `level` and `name`.
340    pub unsafe fn set_socket_option<T: Copy>(
341        &self,
342        level: i32,
343        name: i32,
344        value: &T,
345    ) -> io::Result<()> {
346        self.inner.set_socket_option(level, name, value)
347    }
348}
349
// Expands `compio_driver::impl_raw_fd!` for `UdpSocket`, naming
// `socket2::Socket` as the underlying type and `inner` as the field to go
// through.
// NOTE(review): presumably this generates the raw fd/socket conversion trait
// impls; the role of the trailing `socket` argument is not visible in this
// file — confirm against the macro definition.
impl_raw_fd!(UdpSocket, socket2::Socket, inner, socket);