interceptor/
lib.rs

1#![warn(rust_2018_idioms)]
2#![allow(dead_code)]
3
4use std::collections::HashMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use error::Result;
11use stream_info::StreamInfo;
12
13pub mod chain;
14mod error;
15pub mod mock;
16pub mod nack;
17pub mod noop;
18pub mod registry;
19pub mod report;
20pub mod stats;
21pub mod stream_info;
22pub mod stream_reader;
23pub mod twcc;
24
25pub use error::Error;
26
/// Key/value map (`usize` keys to `usize` values) that is handed to, and returned from,
/// the reader and writer traits in this crate alongside the packets themselves.
pub type Attributes = HashMap<usize, usize>;
29
30/// InterceptorBuilder provides an interface for constructing interceptors
31pub trait InterceptorBuilder {
32    fn build(&self, id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>>;
33}
34
35/// Interceptor can be used to add functionality to you PeerConnections by modifying any incoming/outgoing rtp/rtcp
36/// packets, or sending your own packets as needed.
37#[async_trait]
38pub trait Interceptor {
39    /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
40    /// change in the future. The returned method will be called once per packet batch.
41    async fn bind_rtcp_reader(
42        &self,
43        reader: Arc<dyn RTCPReader + Send + Sync>,
44    ) -> Arc<dyn RTCPReader + Send + Sync>;
45
46    /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
47    /// will be called once per packet batch.
48    async fn bind_rtcp_writer(
49        &self,
50        writer: Arc<dyn RTCPWriter + Send + Sync>,
51    ) -> Arc<dyn RTCPWriter + Send + Sync>;
52
53    /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
54    /// will be called once per rtp packet.
55    async fn bind_local_stream(
56        &self,
57        info: &StreamInfo,
58        writer: Arc<dyn RTPWriter + Send + Sync>,
59    ) -> Arc<dyn RTPWriter + Send + Sync>;
60
61    /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
62    async fn unbind_local_stream(&self, info: &StreamInfo);
63
64    /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
65    /// will be called once per rtp packet.
66    async fn bind_remote_stream(
67        &self,
68        info: &StreamInfo,
69        reader: Arc<dyn RTPReader + Send + Sync>,
70    ) -> Arc<dyn RTPReader + Send + Sync>;
71
72    /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
73    async fn unbind_remote_stream(&self, info: &StreamInfo);
74
75    async fn close(&self) -> Result<()>;
76}
77
78/// RTPWriter is used by Interceptor.bind_local_stream.
79#[async_trait]
80pub trait RTPWriter {
81    /// write a rtp packet
82    async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize>;
83}
84
85pub type RTPWriterBoxFn = Box<
86    dyn (Fn(
87            &rtp::packet::Packet,
88            &Attributes,
89        ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + Sync>>)
90        + Send
91        + Sync,
92>;
93pub struct RTPWriterFn(pub RTPWriterBoxFn);
94
95#[async_trait]
96impl RTPWriter for RTPWriterFn {
97    /// write a rtp packet
98    async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize> {
99        self.0(pkt, attributes).await
100    }
101}
102
103/// RTPReader is used by Interceptor.bind_remote_stream.
104#[async_trait]
105pub trait RTPReader {
106    /// read a rtp packet
107    async fn read(
108        &self,
109        buf: &mut [u8],
110        attributes: &Attributes,
111    ) -> Result<(rtp::packet::Packet, Attributes)>;
112}
113
114pub type RTPReaderBoxFn = Box<
115    dyn (Fn(
116            &mut [u8],
117            &Attributes,
118        )
119            -> Pin<Box<dyn Future<Output = Result<(rtp::packet::Packet, Attributes)>> + Send + Sync>>)
120        + Send
121        + Sync,
122>;
123pub struct RTPReaderFn(pub RTPReaderBoxFn);
124
125#[async_trait]
126impl RTPReader for RTPReaderFn {
127    /// read a rtp packet
128    async fn read(
129        &self,
130        buf: &mut [u8],
131        attributes: &Attributes,
132    ) -> Result<(rtp::packet::Packet, Attributes)> {
133        self.0(buf, attributes).await
134    }
135}
136
137/// RTCPWriter is used by Interceptor.bind_rtcpwriter.
138#[async_trait]
139pub trait RTCPWriter {
140    /// write a batch of rtcp packets
141    async fn write(
142        &self,
143        pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
144        attributes: &Attributes,
145    ) -> Result<usize>;
146}
147
148pub type RTCPWriterBoxFn = Box<
149    dyn (Fn(
150            &[Box<dyn rtcp::packet::Packet + Send + Sync>],
151            &Attributes,
152        ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + Sync>>)
153        + Send
154        + Sync,
155>;
156
157pub struct RTCPWriterFn(pub RTCPWriterBoxFn);
158
159#[async_trait]
160impl RTCPWriter for RTCPWriterFn {
161    /// write a batch of rtcp packets
162    async fn write(
163        &self,
164        pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
165        attributes: &Attributes,
166    ) -> Result<usize> {
167        self.0(pkts, attributes).await
168    }
169}
170
171/// RTCPReader is used by Interceptor.bind_rtcpreader.
172#[async_trait]
173pub trait RTCPReader {
174    /// read a batch of rtcp packets
175    async fn read(
176        &self,
177        buf: &mut [u8],
178        attributes: &Attributes,
179    ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>;
180}
181
182pub type RTCPReaderBoxFn = Box<
183    dyn (Fn(
184            &mut [u8],
185            &Attributes,
186        ) -> Pin<
187            Box<
188                dyn Future<
189                        Output = Result<(
190                            Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>,
191                            Attributes,
192                        )>,
193                    > + Send
194                    + Sync,
195            >,
196        >) + Send
197        + Sync,
198>;
199
200pub struct RTCPReaderFn(pub RTCPReaderBoxFn);
201
202#[async_trait]
203impl RTCPReader for RTCPReaderFn {
204    /// read a batch of rtcp packets
205    async fn read(
206        &self,
207        buf: &mut [u8],
208        attributes: &Attributes,
209    ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
210        self.0(buf, attributes).await
211    }
212}
213
/// Shared helpers for this crate's tests.
#[cfg(test)]
mod test {
    use std::future::Future;
    use std::time::Duration;

    /// Awaits `future` under a `duration` deadline and returns its output.
    ///
    /// # Panics
    ///
    /// Panics with "should not time out" if the deadline elapses first.
    pub async fn timeout_or_fail<T: Future>(duration: Duration, future: T) -> T::Output {
        let outcome = tokio::time::timeout(duration, future).await;
        outcome.expect("should not time out")
    }
}