broker_tokio/net/udp/split.rs
1//! [`UdpSocket`](../struct.UdpSocket.html) split support.
2//!
3//! The [`split`](../struct.UdpSocket.html#method.split) method splits a
4//! `UdpSocket` into a receive half and a send half, which can be used to
5//! receive and send datagrams concurrently, even from two different tasks.
6//!
7//! The halves provide access to the underlying socket, implementing
8//! `AsRef<UdpSocket>`. This allows you to call `UdpSocket` methods that take
9//! `&self`, e.g., to get local address, to get and set socket options, to join
10//! or leave multicast groups, etc.
11//!
12//! The halves can be reunited to the original socket with their `reunite`
13//! methods.
14
15use crate::future::poll_fn;
16use crate::net::udp::UdpSocket;
17
18use std::error::Error;
19use std::fmt;
20use std::io;
21use std::net::SocketAddr;
22use std::sync::Arc;
23
24/// The send half after [`split`](super::UdpSocket::split).
25///
26/// Use [`send_to`](#method.send_to) or [`send`](#method.send) to send
27/// datagrams.
28#[derive(Debug)]
29pub struct SendHalf(Arc<UdpSocket>);
30
31/// The recv half after [`split`](super::UdpSocket::split).
32///
33/// Use [`recv_from`](#method.recv_from) or [`recv`](#method.recv) to receive
34/// datagrams.
35#[derive(Debug)]
36pub struct RecvHalf(Arc<UdpSocket>);
37
38pub(crate) fn split(socket: UdpSocket) -> (RecvHalf, SendHalf) {
39 let shared = Arc::new(socket);
40 let send = shared.clone();
41 let recv = shared;
42 (RecvHalf(recv), SendHalf(send))
43}
44
45/// Error indicating two halves were not from the same socket, and thus could
46/// not be `reunite`d.
47#[derive(Debug)]
48pub struct ReuniteError(pub SendHalf, pub RecvHalf);
49
50impl fmt::Display for ReuniteError {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 write!(
53 f,
54 "tried to reunite halves that are not from the same socket"
55 )
56 }
57}
58
59impl Error for ReuniteError {}
60
61fn reunite(s: SendHalf, r: RecvHalf) -> Result<UdpSocket, ReuniteError> {
62 if Arc::ptr_eq(&s.0, &r.0) {
63 drop(r);
64 // Only two instances of the `Arc` are ever created, one for the
65 // receiver and one for the sender, and those `Arc`s are never exposed
66 // externally. And so when we drop one here, the other one must be the
67 // only remaining one.
68 Ok(Arc::try_unwrap(s.0).expect("udp: try_unwrap failed in reunite"))
69 } else {
70 Err(ReuniteError(s, r))
71 }
72}
73
74impl RecvHalf {
75 /// Attempts to put the two "halves" of a `UdpSocket` back together and
76 /// recover the original socket. Succeeds only if the two "halves"
77 /// originated from the same call to `UdpSocket::split`.
78 pub fn reunite(self, other: SendHalf) -> Result<UdpSocket, ReuniteError> {
79 reunite(other, self)
80 }
81
82 /// Returns a future that receives a single datagram on the socket. On success,
83 /// the future resolves to the number of bytes read and the origin.
84 ///
85 /// The function must be called with valid byte array `buf` of sufficient size
86 /// to hold the message bytes. If a message is too long to fit in the supplied
87 /// buffer, excess bytes may be discarded.
88 pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
89 poll_fn(|cx| self.0.poll_recv_from(cx, buf)).await
90 }
91
92 /// Returns a future that receives a single datagram message on the socket from
93 /// the remote address to which it is connected. On success, the future will resolve
94 /// to the number of bytes read.
95 ///
96 /// The function must be called with valid byte array `buf` of sufficient size to
97 /// hold the message bytes. If a message is too long to fit in the supplied buffer,
98 /// excess bytes may be discarded.
99 ///
100 /// The [`connect`] method will connect this socket to a remote address. The future
101 /// will fail if the socket is not connected.
102 ///
103 /// [`connect`]: super::UdpSocket::connect
104 pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
105 poll_fn(|cx| self.0.poll_recv(cx, buf)).await
106 }
107}
108
109impl SendHalf {
110 /// Attempts to put the two "halves" of a `UdpSocket` back together and
111 /// recover the original socket. Succeeds only if the two "halves"
112 /// originated from the same call to `UdpSocket::split`.
113 pub fn reunite(self, other: RecvHalf) -> Result<UdpSocket, ReuniteError> {
114 reunite(self, other)
115 }
116
117 /// Returns a future that sends data on the socket to the given address.
118 /// On success, the future will resolve to the number of bytes written.
119 ///
120 /// The future will resolve to an error if the IP version of the socket does
121 /// not match that of `target`.
122 pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
123 poll_fn(|cx| self.0.poll_send_to(cx, buf, target)).await
124 }
125
126 /// Returns a future that sends data on the socket to the remote address to which it is connected.
127 /// On success, the future will resolve to the number of bytes written.
128 ///
129 /// The [`connect`] method will connect this socket to a remote address. The future
130 /// will resolve to an error if the socket is not connected.
131 ///
132 /// [`connect`]: super::UdpSocket::connect
133 pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
134 poll_fn(|cx| self.0.poll_send(cx, buf)).await
135 }
136}
137
138impl AsRef<UdpSocket> for SendHalf {
139 fn as_ref(&self) -> &UdpSocket {
140 &self.0
141 }
142}
143
144impl AsRef<UdpSocket> for RecvHalf {
145 fn as_ref(&self) -> &UdpSocket {
146 &self.0
147 }
148}