webrtc_srtp/
stream.rs

1use tokio::sync::mpsc;
2use util::marshal::*;
3use util::Buffer;
4
5use crate::error::{Error, Result};
6
/// Size limit for the buffer of an RTP stream: 1 MB (decimal, 1,000,000 bytes).
pub const SRTP_BUFFER_SIZE: usize = 1_000_000;
9
/// Size limit for the buffer of an RTCP stream: 100 KB (decimal, 100,000 bytes).
pub const SRTCP_BUFFER_SIZE: usize = 100_000;
12
13/// Stream handles decryption for a single RTP/RTCP SSRC
14#[derive(Debug)]
15pub struct Stream {
16    ssrc: u32,
17    tx: mpsc::Sender<u32>,
18    pub(crate) buffer: Buffer,
19    is_rtp: bool,
20}
21
22impl Stream {
23    /// Create a new stream
24    pub fn new(ssrc: u32, tx: mpsc::Sender<u32>, is_rtp: bool) -> Self {
25        Stream {
26            ssrc,
27            tx,
28            // Create a buffer with a 1MB limit
29            buffer: Buffer::new(
30                0,
31                if is_rtp {
32                    SRTP_BUFFER_SIZE
33                } else {
34                    SRTCP_BUFFER_SIZE
35                },
36            ),
37            is_rtp,
38        }
39    }
40
41    /// GetSSRC returns the SSRC we are demuxing for
42    pub fn get_ssrc(&self) -> u32 {
43        self.ssrc
44    }
45
46    /// Check if RTP is a stream.
47    pub fn is_rtp_stream(&self) -> bool {
48        self.is_rtp
49    }
50
51    /// Read reads and decrypts full RTP packet from the nextConn
52    pub async fn read(&self, buf: &mut [u8]) -> Result<usize> {
53        Ok(self.buffer.read(buf, None).await?)
54    }
55
56    /// ReadRTP reads and decrypts full RTP packet and its header from the nextConn
57    pub async fn read_rtp(&self, buf: &mut [u8]) -> Result<rtp::packet::Packet> {
58        if !self.is_rtp {
59            return Err(Error::InvalidRtpStream);
60        }
61
62        let n = self.buffer.read(buf, None).await?;
63        let mut b = &buf[..n];
64        let pkt = rtp::packet::Packet::unmarshal(&mut b)?;
65
66        Ok(pkt)
67    }
68
69    /// read_rtcp reads and decrypts full RTP packet and its header from the nextConn
70    pub async fn read_rtcp(
71        &self,
72        buf: &mut [u8],
73    ) -> Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
74        if self.is_rtp {
75            return Err(Error::InvalidRtcpStream);
76        }
77
78        let n = self.buffer.read(buf, None).await?;
79        let mut b = &buf[..n];
80        let pkt = rtcp::packet::unmarshal(&mut b)?;
81
82        Ok(pkt)
83    }
84
85    /// Close removes the ReadStream from the session and cleans up any associated state
86    pub async fn close(&self) -> Result<()> {
87        self.buffer.close().await;
88        let _ = self.tx.send(self.ssrc).await;
89        Ok(())
90    }
91}