webrtc_util/conn/
conn_pipe.rs

1use std::io::{Error, ErrorKind};
2use std::str::FromStr;
3
4use tokio::sync::{mpsc, Mutex};
5
6use super::*;
7
8struct Pipe {
9    rd_rx: Mutex<mpsc::Receiver<Vec<u8>>>,
10    wr_tx: Mutex<mpsc::Sender<Vec<u8>>>,
11}
12
13pub fn pipe() -> (impl Conn, impl Conn) {
14    let (cb1_tx, cb1_rx) = mpsc::channel(16);
15    let (cb2_tx, cb2_rx) = mpsc::channel(16);
16
17    let p1 = Pipe {
18        rd_rx: Mutex::new(cb1_rx),
19        wr_tx: Mutex::new(cb2_tx),
20    };
21
22    let p2 = Pipe {
23        rd_rx: Mutex::new(cb2_rx),
24        wr_tx: Mutex::new(cb1_tx),
25    };
26
27    (p1, p2)
28}
29
30#[async_trait]
31impl Conn for Pipe {
32    async fn connect(&self, _addr: SocketAddr) -> Result<()> {
33        Err(Error::new(ErrorKind::Other, "Not applicable").into())
34    }
35
36    async fn recv(&self, b: &mut [u8]) -> Result<usize> {
37        let mut rd_rx = self.rd_rx.lock().await;
38        let v = match rd_rx.recv().await {
39            Some(v) => v,
40            None => return Err(Error::new(ErrorKind::UnexpectedEof, "Unexpected EOF").into()),
41        };
42        let l = std::cmp::min(v.len(), b.len());
43        b[..l].copy_from_slice(&v[..l]);
44        Ok(l)
45    }
46
47    async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
48        let n = self.recv(buf).await?;
49        Ok((n, SocketAddr::from_str("0.0.0.0:0")?))
50    }
51
52    async fn send(&self, b: &[u8]) -> Result<usize> {
53        let wr_tx = self.wr_tx.lock().await;
54        match wr_tx.send(b.to_vec()).await {
55            Ok(_) => {}
56            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string()).into()),
57        };
58        Ok(b.len())
59    }
60
61    async fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize> {
62        Err(Error::new(ErrorKind::Other, "Not applicable").into())
63    }
64
65    fn local_addr(&self) -> Result<SocketAddr> {
66        Err(Error::new(ErrorKind::AddrNotAvailable, "Addr Not Available").into())
67    }
68
69    fn remote_addr(&self) -> Option<SocketAddr> {
70        None
71    }
72
73    async fn close(&self) -> Result<()> {
74        Ok(())
75    }
76
77    fn as_any(&self) -> &(dyn std::any::Any + Send + Sync) {
78        self
79    }
80}