webrtc_util/conn/conn_bridge.rs

use std::collections::VecDeque;
use std::io::{Error, ErrorKind};
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use bytes::Bytes;
use portable_atomic::AtomicUsize;
use tokio::sync::{mpsc, Mutex};
use tokio::time::Duration;

use super::*;

const TICK_WAIT: Duration = Duration::from_micros(10);
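
/// One endpoint of a [`Bridge`]: packets forwarded by the bridge arrive over
/// `rd_rx`, writes are pushed into the bridge's queue for side `id`, and
/// `loss_chance` (0-100) is the percentage of sends that are silently dropped.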
struct BridgeConn {
    br: Arc<Bridge>,
    id: usize,
    rd_rx: Mutex<mpsc::Receiver<Bytes>>,
    loss_chance: u8,
}
#[async_trait]
impl Conn for BridgeConn {
    async fn connect(&self, _addr: SocketAddr) -> Result<()> {
        Err(Error::new(ErrorKind::Other, "Not applicable").into())
    }

    async fn recv(&self, b: &mut [u8]) -> Result<usize> {
        let mut rd_rx = self.rd_rx.lock().await;
        let v = match rd_rx.recv().await {
            Some(v) => v,
            None => return Err(Error::new(ErrorKind::UnexpectedEof, "Unexpected EOF").into()),
        };
        let l = std::cmp::min(v.len(), b.len());
        b[..l].copy_from_slice(&v[..l]);
        Ok(l)
    }

    async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
        let n = self.recv(buf).await?;
        Ok((n, SocketAddr::from_str("0.0.0.0:0")?))
    }

    async fn send(&self, b: &[u8]) -> Result<usize> {
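        // Simulated packet loss: report success without handing the packet to the bridge.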
        if rand::random::<u8>() % 100 < self.loss_chance {
            return Ok(b.len());
        }

        self.br.push(b, self.id).await
    }

    async fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize> {
        Err(Error::new(ErrorKind::Other, "Not applicable").into())
    }

    fn local_addr(&self) -> Result<SocketAddr> {
        Err(Error::new(ErrorKind::AddrNotAvailable, "Addr Not Available").into())
    }

    fn remote_addr(&self) -> Option<SocketAddr> {
        None
    }

    async fn close(&self) -> Result<()> {
        Ok(())
    }

    fn as_any(&self) -> &(dyn std::any::Any + Send + Sync) {
        self
    }
}
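
/// Per-direction packet filter: return `true` to keep a pushed packet, `false` to discard it.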
pub type FilterCbFn = Box<dyn Fn(&Bytes) -> bool + Send + Sync>;
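
/// An in-memory link between two [`Conn`] endpoints, intended for tests.
/// Packets written on one side are held in a per-direction queue and are only
/// delivered to the other side when [`Bridge::tick`] or [`Bridge::process`]
/// runs, so traffic can be dropped, reordered, filtered, or inspected
/// deterministically.
///
/// A minimal usage sketch (marked `ignore`; assumes an async test context):
///
/// ```ignore
/// let (br, conn0, conn1) = Bridge::new(0, None, None);
/// conn0.send(b"ping").await?; // queued inside the bridge
/// br.process().await;         // delivered to the other endpoint
/// let mut buf = [0u8; 16];
/// let n = conn1.recv(&mut buf).await?;
/// assert_eq!(&buf[..n], b"ping");
/// ```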
#[derive(Default)]
pub struct Bridge {
    drop_nwrites: [AtomicUsize; 2],
    reorder_nwrites: [AtomicUsize; 2],

    stack: [Mutex<VecDeque<Bytes>>; 2],
    queue: [Mutex<VecDeque<Bytes>>; 2],

    wr_tx: [Option<mpsc::Sender<Bytes>>; 2],
    filter_cb: [Option<FilterCbFn>; 2],
}

impl Bridge {
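    /// Creates a bridge and its two connected endpoints. `loss_chance` (0-100)
    /// applies to both endpoints; `filter_cb0`/`filter_cb1` optionally filter
    /// packets pushed by endpoint 0 and endpoint 1 respectively.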
    pub fn new(
        loss_chance: u8,
        filter_cb0: Option<FilterCbFn>,
        filter_cb1: Option<FilterCbFn>,
    ) -> (Arc<Bridge>, impl Conn, impl Conn) {
        let (wr_tx0, rd_rx0) = mpsc::channel(1024);
        let (wr_tx1, rd_rx1) = mpsc::channel(1024);

        let br = Arc::new(Bridge {
            wr_tx: [Some(wr_tx0), Some(wr_tx1)],
            filter_cb: [filter_cb0, filter_cb1],
            ..Default::default()
        });
        let conn0 = BridgeConn {
            br: Arc::clone(&br),
            id: 0,
            rd_rx: Mutex::new(rd_rx0),
            loss_chance,
        };
        let conn1 = BridgeConn {
            br: Arc::clone(&br),
            id: 1,
            rd_rx: Mutex::new(rd_rx1),
            loss_chance,
        };

        (br, conn0, conn1)
    }
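
    /// Number of packets written by endpoint `id` that are still queued for
    /// delivery to the other endpoint.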
    #[allow(clippy::len_without_is_empty)]
    pub async fn len(&self, id: usize) -> usize {
        let q = self.queue[id].lock().await;
        q.len()
    }
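
    /// Queues `b` as a packet written by endpoint `id`, applying any pending
    /// drop, reorder, or filter rules before it becomes visible to [`Bridge::tick`].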
    pub async fn push(&self, b: &[u8], id: usize) -> Result<usize> {
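        // Pace pushes at roughly the tick rate so a free-running writer cannot grow the queue unboundedly.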
        tokio::time::sleep(TICK_WAIT).await;

        let d = Bytes::from(b.to_vec());
        if self.drop_nwrites[id].load(Ordering::SeqCst) > 0 {
            self.drop_nwrites[id].fetch_sub(1, Ordering::SeqCst);
        } else if self.reorder_nwrites[id].load(Ordering::SeqCst) > 0 {
            let mut stack = self.stack[id].lock().await;
            stack.push_back(d);
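            // Once the last of the buffered writes has arrived, flush them to the queue in reverse order.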
            if self.reorder_nwrites[id].fetch_sub(1, Ordering::SeqCst) == 1 {
                let ok = inverse(&mut stack);
                if ok {
                    let mut queue = self.queue[id].lock().await;
                    queue.append(&mut stack);
                }
            }
        } else if let Some(filter_cb) = &self.filter_cb[id] {
            if filter_cb(&d) {
                let mut queue = self.queue[id].lock().await;
                queue.push_back(d);
            }
        } else {
            let mut queue = self.queue[id].lock().await;
            queue.push_back(d);
        }

        Ok(b.len())
    }
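
    /// Reverses the packets currently queued for endpoint `id`; returns `false`
    /// if there are fewer than two packets to reorder.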
    pub async fn reorder(&self, id: usize) -> bool {
        let mut queue = self.queue[id].lock().await;
        inverse(&mut queue)
    }
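
    /// Removes `n` queued packets for endpoint `id`, starting at `offset`.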
    pub async fn drop_offset(&self, id: usize, offset: usize, n: usize) {
        let mut queue = self.queue[id].lock().await;
        queue.drain(offset..offset + n);
    }
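
    /// Drops the next `n` packets written by endpoint `id`.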
    pub fn drop_next_nwrites(&self, id: usize, n: usize) {
        self.drop_nwrites[id].store(n, Ordering::SeqCst);
    }
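
    /// Buffers the next `n` packets written by endpoint `id` and enqueues them
    /// in reverse order once all `n` have arrived.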
    pub fn reorder_next_nwrites(&self, id: usize, n: usize) {
        self.reorder_nwrites[id].store(n, Ordering::SeqCst);
    }
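
    /// Discards all queued packets in both directions.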
    pub async fn clear(&self) {
        for id in 0..2 {
            let mut queue = self.queue[id].lock().await;
            queue.clear();
        }
    }
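
    /// Delivers at most one queued packet per direction to the opposite
    /// endpoint and returns the number of packets delivered.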
    pub async fn tick(&self) -> usize {
        let mut n = 0;

        for id in 0..2 {
            let mut queue = self.queue[id].lock().await;
            if let Some(d) = queue.pop_front() {
                n += 1;
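                // Packets queued by endpoint `id` are delivered to the other endpoint (1 - id).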
                if let Some(wr_tx) = &self.wr_tx[1 - id] {
                    let _ = wr_tx.send(d).await;
                }
            }
        }

        n
    }
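
    /// Repeatedly ticks, sleeping `TICK_WAIT` between passes, until both
    /// queues are empty.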
    pub async fn process(&self) {
        loop {
            tokio::time::sleep(TICK_WAIT).await;
            self.tick().await;
            if self.len(0).await == 0 && self.len(1).await == 0 {
                break;
            }
        }
    }
}
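
/// Reverses `s` in place. Returns `false` (leaving `s` untouched) when it
/// holds fewer than two elements, `true` otherwise.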
pub(crate) fn inverse(s: &mut VecDeque<Bytes>) -> bool {
    if s.len() < 2 {
        return false;
    }

    let (mut i, mut j) = (0, s.len() - 1);
    while i < j {
        s.swap(i, j);
        i += 1;
        j -= 1;
    }

    true
}