hylarana_transport/multicast/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
mod dequeue;
mod fragments;

use std::{
    io::Error,
    net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
};

use bytes::Bytes;
use crossbeam::channel::{bounded, Receiver};
use fragments::FragmentEncoder;
use once_cell::sync::Lazy;
use tokio::{runtime::Runtime, sync::mpsc::unbounded_channel};

use self::{
    dequeue::Dequeue,
    fragments::{Fragment, FragmentDecoder},
};

static RUNTIME: Lazy<Runtime> =
    Lazy::new(|| Runtime::new().expect("failed to create tokio runtime, this is a bug"));

/// A UDP socket.
///
/// After creating a UdpSocket by binding it to a socket address, data can be
/// sent to and received from any other socket address.
///
/// Although UDP is a connectionless protocol, this implementation provides an
/// interface to set an address where data should be sent and received from.
/// After setting a remote address with connect, data can be sent to and
/// received from that address with send and recv.
///
/// As stated in the User Datagram Protocol’s specification in IETF RFC 768, UDP
/// is an unordered, unreliable protocol;
///
/// This client is only used to receive multicast packets and does not send
/// multicast packets.
pub struct Socket {
    rx: Receiver<(u64, Bytes)>,
    close_signal: tokio::sync::mpsc::UnboundedSender<()>,
}

unsafe impl Send for Socket {}
unsafe impl Sync for Socket {}

impl Socket {
    /// Creates a UDP socket from the given address.
    ///
    /// You need to specify the multicast group for the udp session to join to
    /// the specified multicast group.
    ///
    /// Note that only IPV4 is supported.
    pub fn new(multicast: Ipv4Addr, bind: SocketAddr) -> Result<Self, Error> {
        assert!(bind.is_ipv4());

        RUNTIME.block_on(Self::create(multicast, bind))
    }

    /// Reads packets sent from the multicast server.
    ///
    /// Because the packets are reordered, it is possible to read out more than
    /// one packet at a time.
    ///
    /// Note that there may be packet loss.
    pub fn read(&self) -> Option<(u64, Bytes)> {
        self.rx.recv().ok()
    }

    pub fn close(&self) {
        let _ = self.close_signal.send(());
    }

    async fn create(multicast: Ipv4Addr, bind: SocketAddr) -> Result<Self, Error> {
        let socket = socket2::Socket::from(UdpSocket::bind(bind)?);
        socket.set_recv_buffer_size(4 * 1024 * 1024)?;
        socket.set_nonblocking(true)?;

        let socket = tokio::net::UdpSocket::from_std(socket.into())?;
        if let IpAddr::V4(bind) = bind.ip() {
            socket.join_multicast_v4(multicast, bind)?;
            socket.set_broadcast(true)?;
        }

        let (close_signal, mut closed) = unbounded_channel();
        let (tx, rx) = bounded(5);

        tokio::spawn(async move {
            let mut buf = vec![0u8; 2048];
            let mut queue = Dequeue::new(50);
            let mut decoder = FragmentDecoder::new();

            'a: loop {
                tokio::select! {
                    Ok(size) = socket.recv(&mut buf[..]) => {
                        if size == 0 {
                            break;
                        }

                        if let Ok(packet) = Fragment::try_from(&buf[..size]) {
                            queue.push(packet);

                            while let Some(chunk) = queue.pop() {
                                if let Some(packet) = decoder.decode(chunk) {
                                    if tx.send(packet).is_err() {
                                        break 'a;
                                    }
                                }
                            }
                        }
                    }
                    Some(_) = closed.recv() => {
                        break
                    }
                    else => break
                }
            }
        });

        Ok(Self { close_signal, rx })
    }
}

/// A UDP server.
///
/// After creating a UdpSocket by binding it to a socket address, data can be
/// sent to and received from any other socket address.
///
/// Although UDP is a connectionless protocol, this implementation provides an
/// interface to set an address where data should be sent and received from.
/// After setting a remote address with connect, data can be sent to and
/// received from that address with send and recv.
///
/// As stated in the User Datagram Protocol’s specification in IETF RFC 768, UDP
/// is an unordered, unreliable protocol;
///
/// This server is used to send multicast packets to all members of a multicast
/// group.
pub struct Server {
    target: SocketAddr,
    socket: UdpSocket,
    encoder: FragmentEncoder,
}

impl Server {
    /// Creates a UDP socket from the given address.
    ///
    /// You need to specify the multicast group for the udp session to join to
    /// the specified multicast group.
    ///
    /// Note that only IPV4 is supported.
    ///
    /// MTU is used to specify the network unit size, this is used to limit the
    /// maximum size of packets sent.
    pub fn new(multicast: Ipv4Addr, bind: SocketAddr, mtu: usize) -> Result<Self, Error> {
        assert!(bind.is_ipv4());

        let socket = UdpSocket::bind(SocketAddr::new(bind.ip(), 0))?;
        if let IpAddr::V4(bind) = bind.ip() {
            socket.join_multicast_v4(&multicast, &bind)?;
            socket.set_multicast_loop_v4(false)?;
        }

        Ok(Self {
            target: SocketAddr::new(IpAddr::V4(multicast), bind.port()),
            encoder: FragmentEncoder::new(mtu),
            socket,
        })
    }

    /// Sends data on the socket to the remote address to which it is connected.
    ///
    /// Sends the packet to all members of the multicast group.
    ///
    /// Note that there may be packet loss.
    pub fn send(&mut self, bytes: &[u8]) -> Result<(), Error> {
        if bytes.is_empty() {
            return Ok(());
        }

        for chunk in self.encoder.encode(bytes) {
            self.socket.send_to(chunk, self.target)?;
        }

        Ok(())
    }
}