1use tokio::sync::mpsc;
2use util::marshal::*;
3use util::Buffer;
4
5use crate::error::{Error, Result};
6
7pub const SRTP_BUFFER_SIZE: usize = 1000 * 1000;
9
10pub const SRTCP_BUFFER_SIZE: usize = 100 * 1000;
12
13#[derive(Debug)]
15pub struct Stream {
16 ssrc: u32,
17 tx: mpsc::Sender<u32>,
18 pub(crate) buffer: Buffer,
19 is_rtp: bool,
20}
21
22impl Stream {
23 pub fn new(ssrc: u32, tx: mpsc::Sender<u32>, is_rtp: bool) -> Self {
25 Stream {
26 ssrc,
27 tx,
28 buffer: Buffer::new(
30 0,
31 if is_rtp {
32 SRTP_BUFFER_SIZE
33 } else {
34 SRTCP_BUFFER_SIZE
35 },
36 ),
37 is_rtp,
38 }
39 }
40
41 pub fn get_ssrc(&self) -> u32 {
43 self.ssrc
44 }
45
46 pub fn is_rtp_stream(&self) -> bool {
48 self.is_rtp
49 }
50
51 pub async fn read(&self, buf: &mut [u8]) -> Result<usize> {
53 Ok(self.buffer.read(buf, None).await?)
54 }
55
56 pub async fn read_rtp(&self, buf: &mut [u8]) -> Result<rtp::packet::Packet> {
58 if !self.is_rtp {
59 return Err(Error::InvalidRtpStream);
60 }
61
62 let n = self.buffer.read(buf, None).await?;
63 let mut b = &buf[..n];
64 let pkt = rtp::packet::Packet::unmarshal(&mut b)?;
65
66 Ok(pkt)
67 }
68
69 pub async fn read_rtcp(
71 &self,
72 buf: &mut [u8],
73 ) -> Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
74 if self.is_rtp {
75 return Err(Error::InvalidRtcpStream);
76 }
77
78 let n = self.buffer.read(buf, None).await?;
79 let mut b = &buf[..n];
80 let pkt = rtcp::packet::unmarshal(&mut b)?;
81
82 Ok(pkt)
83 }
84
85 pub async fn close(&self) -> Result<()> {
87 self.buffer.close().await;
88 let _ = self.tx.send(self.ssrc).await;
89 Ok(())
90 }
91}