turn_rs/router/
channels.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
use super::ports::capacity;

use ahash::AHashMap;

use std::iter::{IntoIterator, Iterator};
use std::{net::SocketAddr, sync::RwLock, time::Instant};

/// Consuming iterator over the addresses bound to a [`Channel`].
///
/// The iterator owns the channel and walks its two address slots in order.
pub struct Iter {
    /// Channel whose address slots are being read.
    inner: Channel,
    /// Position of the slot that the next call to `next` will read.
    index: usize,
}

impl Iter {
    /// Wraps `channel` in an iterator positioned at its first address slot.
    ///
    /// The channel is consumed; its addresses are handed out by
    /// [`Iterator::next`].
    pub fn new(channel: Channel) -> Self {
        let inner = channel;
        Self { index: 0, inner }
    }
}

impl Iterator for Iter {
    type Item = SocketAddr;

    /// Returns the content of the address slot at the current position and
    /// advances the position by one.
    ///
    /// `None` is returned both for an empty slot and for any position past
    /// the second slot.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::net::SocketAddr;
    /// use turn_rs::router::channels::*;
    ///
    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    /// let mut iter = Iter::new(Channel::new(&addr));
    ///
    /// assert_eq!(iter.next(), Some(addr));
    /// assert_eq!(iter.next(), None);
    /// ```
    fn next(&mut self) -> Option<Self::Item> {
        // `get` covers the out-of-range case, `flatten` the empty-slot case.
        let slot = self.inner.bound.get(self.index).copied().flatten();
        self.index += 1;
        slot
    }
}

/// Peer channels.
///
/// The rules below describe channel bindings as defined by the TURN
/// specification.
///
/// A channel binding consists of:
///
/// * a channel number;
///
/// * a transport address (of the peer); and
///
/// * a time-to-expiry timer.
///
/// Within the context of an allocation, a channel binding is uniquely
/// identified either by the channel number or by the peer's transport
/// address.  Thus, the same channel cannot be bound to two different
/// transport addresses, nor can the same transport address be bound to
/// two different channels.
///
/// A channel binding lasts for 10 minutes unless refreshed.  Refreshing
/// the binding (by the server receiving a ChannelBind request rebinding
/// the channel to the same peer) resets the time-to-expiry timer back to
/// 10 minutes.
///
/// When the channel binding expires, the channel becomes unbound.  Once
/// unbound, the channel number can be bound to a different transport
/// address, and the transport address can be bound to a different
/// channel number.  To prevent race conditions, the client MUST wait 5
/// minutes after the channel binding expires before attempting to bind
/// the channel number to a different transport address or the transport
/// address to a different channel number.
///
/// When binding a channel to a peer, the client SHOULD be prepared to
/// receive ChannelData messages on the channel from the server as soon
/// as it has sent the ChannelBind request.  Over UDP, it is possible for
/// the client to receive ChannelData messages from the server before it
/// receives a ChannelBind success response.
///
/// In the other direction, the client MAY elect to send ChannelData
/// messages before receiving the ChannelBind success response.  Doing
/// so, however, runs the risk of having the ChannelData messages dropped
/// by the server if the ChannelBind request does not succeed for some
/// reason (e.g., packet lost if the request is sent over UDP or the
/// server being unable to fulfill the request).  A client that wishes to
/// be safe should either queue the data or use Send indications until
/// the channel binding is confirmed.
pub struct Channel {
    /// The two address slots of the channel; `None` marks a slot that has
    /// not been filled yet.
    bound: [Option<SocketAddr>; 2],
    /// Instant from which the channel lifetime is measured.
    timer: Instant,
}

impl Channel {
    /// Number of seconds after which an unrefreshed channel counts as dead.
    const LIFETIME_SECS: u64 = 600;

    /// Creates a channel with `a` in the first address slot, the second
    /// slot empty, and the lifetime timer started at the current instant.
    pub fn new(a: &SocketAddr) -> Self {
        Self {
            timer: Instant::now(),
            bound: [Some(*a), None],
        }
    }

    /// Returns `true` if `a` occupies one of the channel's address slots.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::net::SocketAddr;
    /// use turn_rs::router::channels::*;
    ///
    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    /// let channel = Channel::new(&addr);
    /// assert!(channel.includes(&addr));
    /// ```
    pub fn includes(&self, a: &SocketAddr) -> bool {
        self.bound.iter().flatten().any(|bound| bound == a)
    }

    /// Returns `true` while at least one address slot is still empty, i.e.
    /// the channel has not yet been completed by a second address.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::net::SocketAddr;
    /// use turn_rs::router::channels::*;
    ///
    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    /// let channel = Channel::new(&addr);
    /// assert!(channel.is_half());
    /// ```
    pub fn is_half(&self) -> bool {
        self.bound.iter().any(Option::is_none)
    }

    /// Stores `a` in the second address slot, replacing whatever was there.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::net::SocketAddr;
    /// use turn_rs::router::channels::*;
    ///
    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    /// let peer = "127.0.0.1:8081".parse::<SocketAddr>().unwrap();
    /// let mut channel = Channel::new(&addr);
    ///
    /// channel.up(&peer);
    /// assert!(!channel.is_half());
    /// ```
    pub fn up(&mut self, a: &SocketAddr) {
        let [_, second] = &mut self.bound;
        *second = Some(*a);
    }

    /// Restarts the lifetime timer from the current instant.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::net::SocketAddr;
    /// use turn_rs::router::channels::*;
    ///
    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    /// let mut channel = Channel::new(&addr);
    ///
    /// channel.refresh();
    /// assert!(!channel.is_death());
    /// ```
    pub fn refresh(&mut self) {
        let now = Instant::now();
        self.timer = now;
    }

    /// Returns `true` once 600 seconds or more have elapsed since the
    /// channel was created or last refreshed.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::net::SocketAddr;
    /// use turn_rs::router::channels::*;
    ///
    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    /// let channel = Channel::new(&addr);
    /// assert!(!channel.is_death());
    /// ```
    pub fn is_death(&self) -> bool {
        let age_secs = self.timer.elapsed().as_secs();
        age_secs >= Self::LIFETIME_SECS
    }
}

impl IntoIterator for Channel {
    type IntoIter = Iter;
    type Item = SocketAddr;

    /// Consumes the channel and returns an [`Iter`] positioned at its first
    /// address slot.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::net::SocketAddr;
    /// use turn_rs::router::channels::*;
    ///
    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    /// let channel = Channel::new(&addr);
    /// let mut iter = channel.into_iter();
    /// assert_eq!(iter.next(), Some(addr));
    /// ```
    fn into_iter(self) -> Self::IntoIter {
        let channel = self;
        Iter {
            index: 0,
            inner: channel,
        }
    }
}

/// Channels table.
///
/// Holds two lock-protected maps: the channels themselves, and the peer
/// address recorded for each `(address, channel number)` pair.
pub struct Channels {
    /// Peer address stored for each `(address, channel number)` pair.
    bounds: RwLock<AHashMap<(SocketAddr, u16), SocketAddr>>,
    /// Channel state keyed by channel number.
    map: RwLock<AHashMap<u16, Channel>>,
}

impl Default for Channels {
    /// Builds the default table by delegating to [`Channels::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Channels {
    /// Creates an empty channel table.
    ///
    /// Both internal maps are pre-sized with the value returned by
    /// `capacity()`.
    pub fn new() -> Self {
        Self {
            map: RwLock::new(AHashMap::with_capacity(capacity())),
            bounds: RwLock::new(AHashMap::with_capacity(capacity())),
        }
    }

    /// Returns the peer address recorded for address `a` on channel `c`,
    /// or `None` if no such pair has been inserted.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::net::SocketAddr;
    /// use turn_rs::router::channels::*;
    ///
    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    /// let peer = "127.0.0.1:8081".parse::<SocketAddr>().unwrap();
    /// let channels = Channels::new();
    ///
    /// channels.insert(&addr, 43159, &peer).unwrap();
    /// channels.insert(&peer, 43160, &addr).unwrap();
    ///
    /// assert_eq!(channels.get_bound(&addr, 43159).unwrap(), peer);
    /// ```
    pub fn get_bound(&self, a: &SocketAddr, c: u16) -> Option<SocketAddr> {
        self.bounds.read().unwrap().get(&(*a, c)).copied()
    }

    /// Attaches address `a` to channel `c` and records `p` as the peer
    /// address for the pair `(a, c)`.
    ///
    /// * If channel `c` does not exist it is created with `a` in its first
    ///   slot.
    /// * If it exists and already contains `a`, its lifetime is refreshed.
    /// * If it exists, does not contain `a`, and still has an empty slot,
    ///   `a` is stored in the second slot.
    /// * If it exists and both slots are taken by other addresses, nothing
    ///   is changed and `None` is returned.
    ///
    /// A peer address already recorded for `(a, c)` is kept; `p` is only
    /// stored when the pair has no entry yet.
    ///
    /// # Panics
    ///
    /// Panics if an internal lock is poisoned.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::net::SocketAddr;
    /// use turn_rs::router::channels::*;
    ///
    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    /// let peer = "127.0.0.1:8081".parse::<SocketAddr>().unwrap();
    /// let channels = Channels::new();
    ///
    /// channels.insert(&addr, 43159, &peer).unwrap();
    /// channels.insert(&peer, 43160, &addr).unwrap();
    ///
    /// assert_eq!(channels.get_bound(&addr, 43159).unwrap(), peer);
    /// ```
    pub fn insert(&self, a: &SocketAddr, c: u16, p: &SocketAddr) -> Option<()> {
        // Lock order throughout this type: `map` first, then `bounds`.
        let mut map = self.map.write().unwrap();

        let mut created = false;
        let channel = map.entry(c).or_insert_with(|| {
            created = true;
            Channel::new(a)
        });

        // A freshly created channel already holds `a` and a fresh timer, so
        // only pre-existing channels need any adjustment.
        if !created {
            if channel.includes(a) {
                channel.refresh();
            } else if channel.is_half() {
                channel.up(a);
            } else {
                // Both slots are occupied by other addresses.
                return None;
            }
        }

        self.bounds.write().unwrap().entry((*a, c)).or_insert(*p);
        Some(())
    }

    /// Removes channel `c` together with the peer entries of every address
    /// bound to it.
    ///
    /// Returns `None` if the channel does not exist.
    ///
    /// # Panics
    ///
    /// Panics if an internal lock is poisoned.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::net::SocketAddr;
    /// use turn_rs::router::channels::*;
    ///
    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    /// let peer = "127.0.0.1:8081".parse::<SocketAddr>().unwrap();
    /// let channels = Channels::new();
    ///
    /// channels.insert(&addr, 43159, &peer).unwrap();
    /// channels.insert(&peer, 43160, &addr).unwrap();
    ///
    /// assert!(channels.remove(43159).is_some());
    /// assert!(channels.remove(43160).is_some());
    /// ```
    pub fn remove(&self, c: u16) -> Option<()> {
        // Acquire `map` before `bounds`, the same order `insert` uses.
        // Taking them in the opposite order here could deadlock against a
        // concurrent `insert`. The `map` guard is held until the end so the
        // two tables are updated as one step.
        let mut map = self.map.write().unwrap();
        let channel = map.remove(&c)?;

        let mut bounds = self.bounds.write().unwrap();
        for a in channel {
            bounds.remove(&(a, c));
        }

        Some(())
    }

    /// Returns the numbers of all channels whose lifetime has ended.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    ///
    /// # Examples
    ///
    /// ```
    /// use turn_rs::router::channels::*;
    ///
    /// let channels = Channels::new();
    /// assert_eq!(channels.get_deaths().len(), 0);
    /// ```
    pub fn get_deaths(&self) -> Vec<u16> {
        self.map
            .read()
            .unwrap()
            .iter()
            .filter(|(_, channel)| channel.is_death())
            .map(|(number, _)| *number)
            .collect()
    }
}