webrtc_util/conn/
conn_bridge.rs

1use std::collections::VecDeque;
2use std::io::{Error, ErrorKind};
3use std::str::FromStr;
4use std::sync::atomic::Ordering;
5use std::sync::Arc;
6
7use bytes::Bytes;
8use portable_atomic::AtomicUsize;
9use tokio::sync::{mpsc, Mutex};
10use tokio::time::Duration;
11
12use super::*;
13
/// Pause inserted before every `Bridge::push` and between iterations of
/// `Bridge::process`, so that writers are throttled to the same rate as the
/// tick loop.
const TICK_WAIT: Duration = Duration::from_micros(10);
15
/// BridgeConn is a Conn that represents an endpoint of the bridge.
struct BridgeConn {
    /// Bridge this endpoint is attached to; `send` hands outgoing packets to it.
    br: Arc<Bridge>,
    /// Index of this endpoint (`Bridge::new` assigns 0 and 1), passed to the
    /// bridge together with every pushed packet.
    id: usize,
    /// Receiving half of the channel that `recv` takes incoming packets from.
    rd_rx: Mutex<mpsc::Receiver<Bytes>>,
    /// Loss threshold: `send` silently discards a packet when a random value
    /// reduced modulo 100 is below this number.
    loss_chance: u8,
}
23
24#[async_trait]
25impl Conn for BridgeConn {
26    async fn connect(&self, _addr: SocketAddr) -> Result<()> {
27        Err(Error::new(ErrorKind::Other, "Not applicable").into())
28    }
29
30    async fn recv(&self, b: &mut [u8]) -> Result<usize> {
31        let mut rd_rx = self.rd_rx.lock().await;
32        let v = match rd_rx.recv().await {
33            Some(v) => v,
34            None => return Err(Error::new(ErrorKind::UnexpectedEof, "Unexpected EOF").into()),
35        };
36        let l = std::cmp::min(v.len(), b.len());
37        b[..l].copy_from_slice(&v[..l]);
38        Ok(l)
39    }
40
41    async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
42        let n = self.recv(buf).await?;
43        Ok((n, SocketAddr::from_str("0.0.0.0:0")?))
44    }
45
46    async fn send(&self, b: &[u8]) -> Result<usize> {
47        if rand::random::<u8>() % 100 < self.loss_chance {
48            return Ok(b.len());
49        }
50
51        self.br.push(b, self.id).await
52    }
53
54    async fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize> {
55        Err(Error::new(ErrorKind::Other, "Not applicable").into())
56    }
57
58    fn local_addr(&self) -> Result<SocketAddr> {
59        Err(Error::new(ErrorKind::AddrNotAvailable, "Addr Not Available").into())
60    }
61
62    fn remote_addr(&self) -> Option<SocketAddr> {
63        None
64    }
65
66    async fn close(&self) -> Result<()> {
67        Ok(())
68    }
69
70    fn as_any(&self) -> &(dyn std::any::Any + Send + Sync) {
71        self
72    }
73}
74
/// Packet predicate consulted by `Bridge::push`: when a filter is installed for
/// an endpoint, a pushed packet is queued only if the callback returns `true`.
/// The filter is not consulted for writes consumed by a pending drop or
/// reorder request.
pub type FilterCbFn = Box<dyn Fn(&Bytes) -> bool + Send + Sync>;
76
/// Bridge represents a network between the two endpoints.
///
/// Every array below has one slot per endpoint, indexed by endpoint id (0 or 1).
#[derive(Default)]
pub struct Bridge {
    /// Number of upcoming writes from each endpoint that `push` will discard.
    drop_nwrites: [AtomicUsize; 2],
    /// Number of upcoming writes from each endpoint that `push` will collect
    /// for reordering.
    reorder_nwrites: [AtomicUsize; 2],

    /// Packets collected for reordering, not yet moved to `queue`.
    stack: [Mutex<VecDeque<Bytes>>; 2],
    /// Packets written by each endpoint that are waiting for `tick` to
    /// forward them.
    queue: [Mutex<VecDeque<Bytes>>; 2],

    /// Sending halves of the channels read by the endpoints; slot `i` feeds
    /// endpoint `i`, so `tick` forwards packets from `queue[id]` to
    /// `wr_tx[1 - id]`.
    wr_tx: [Option<mpsc::Sender<Bytes>>; 2],
    /// Optional filter applied to packets pushed by each endpoint.
    filter_cb: [Option<FilterCbFn>; 2],
}
89
90impl Bridge {
91    pub fn new(
92        loss_chance: u8,
93        filter_cb0: Option<FilterCbFn>,
94        filter_cb1: Option<FilterCbFn>,
95    ) -> (Arc<Bridge>, impl Conn, impl Conn) {
96        let (wr_tx0, rd_rx0) = mpsc::channel(1024);
97        let (wr_tx1, rd_rx1) = mpsc::channel(1024);
98
99        let br = Arc::new(Bridge {
100            wr_tx: [Some(wr_tx0), Some(wr_tx1)],
101            filter_cb: [filter_cb0, filter_cb1],
102            ..Default::default()
103        });
104        let conn0 = BridgeConn {
105            br: Arc::clone(&br),
106            id: 0,
107            rd_rx: Mutex::new(rd_rx0),
108            loss_chance,
109        };
110        let conn1 = BridgeConn {
111            br: Arc::clone(&br),
112            id: 1,
113            rd_rx: Mutex::new(rd_rx1),
114            loss_chance,
115        };
116
117        (br, conn0, conn1)
118    }
119
120    /// Len returns number of queued packets.
121    #[allow(clippy::len_without_is_empty)]
122    pub async fn len(&self, id: usize) -> usize {
123        let q = self.queue[id].lock().await;
124        q.len()
125    }
126
127    pub async fn push(&self, b: &[u8], id: usize) -> Result<usize> {
128        // Push rate should be limited as same as Tick rate.
129        // Otherwise, queue grows too fast on free running Write.
130        tokio::time::sleep(TICK_WAIT).await;
131
132        let d = Bytes::from(b.to_vec());
133        if self.drop_nwrites[id].load(Ordering::SeqCst) > 0 {
134            self.drop_nwrites[id].fetch_sub(1, Ordering::SeqCst);
135        } else if self.reorder_nwrites[id].load(Ordering::SeqCst) > 0 {
136            let mut stack = self.stack[id].lock().await;
137            stack.push_back(d);
138            if self.reorder_nwrites[id].fetch_sub(1, Ordering::SeqCst) == 1 {
139                let ok = inverse(&mut stack);
140                if ok {
141                    let mut queue = self.queue[id].lock().await;
142                    queue.append(&mut stack);
143                }
144            }
145        } else if let Some(filter_cb) = &self.filter_cb[id] {
146            if filter_cb(&d) {
147                let mut queue = self.queue[id].lock().await;
148                queue.push_back(d);
149            }
150        } else {
151            //log::debug!("queue [{}] enter lock", id);
152            let mut queue = self.queue[id].lock().await;
153            queue.push_back(d);
154            //log::debug!("queue [{}] exit lock", id);
155        }
156
157        Ok(b.len())
158    }
159
160    /// Reorder inverses the order of packets currently in the specified queue.
161    pub async fn reorder(&self, id: usize) -> bool {
162        let mut queue = self.queue[id].lock().await;
163        inverse(&mut queue)
164    }
165
166    /// Drop drops the specified number of packets from the given offset index
167    /// of the specified queue.
168    pub async fn drop_offset(&self, id: usize, offset: usize, n: usize) {
169        let mut queue = self.queue[id].lock().await;
170        queue.drain(offset..offset + n);
171    }
172
173    /// drop_next_nwrites drops the next n packets that will be written
174    /// to the specified queue.
175    pub fn drop_next_nwrites(&self, id: usize, n: usize) {
176        self.drop_nwrites[id].store(n, Ordering::SeqCst);
177    }
178
179    /// reorder_next_nwrites drops the next n packets that will be written
180    /// to the specified queue.
181    pub fn reorder_next_nwrites(&self, id: usize, n: usize) {
182        self.reorder_nwrites[id].store(n, Ordering::SeqCst);
183    }
184
185    pub async fn clear(&self) {
186        for id in 0..2 {
187            let mut queue = self.queue[id].lock().await;
188            queue.clear();
189        }
190    }
191
192    /// Tick attempts to hand a packet from the queue for each directions, to readers,
193    /// if there are waiting on the queue. If there's no reader, it will return
194    /// immediately.
195    pub async fn tick(&self) -> usize {
196        let mut n = 0;
197
198        for id in 0..2 {
199            let mut queue = self.queue[id].lock().await;
200            if let Some(d) = queue.pop_front() {
201                n += 1;
202                if let Some(wr_tx) = &self.wr_tx[1 - id] {
203                    let _ = wr_tx.send(d).await;
204                }
205            }
206        }
207
208        n
209    }
210
211    /// Process repeats tick() calls until no more outstanding packet in the queues.
212    pub async fn process(&self) {
213        loop {
214            tokio::time::sleep(TICK_WAIT).await;
215            self.tick().await;
216            if self.len(0).await == 0 && self.len(1).await == 0 {
217                break;
218            }
219        }
220    }
221}
222
223pub(crate) fn inverse(s: &mut VecDeque<Bytes>) -> bool {
224    if s.len() < 2 {
225        return false;
226    }
227
228    let (mut i, mut j) = (0, s.len() - 1);
229    while i < j {
230        s.swap(i, j);
231        i += 1;
232        j -= 1;
233    }
234
235    true
236}