1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
use bytes::{Buf, BufMut, Bytes};

/// A WebTransport Session, able to accept/create streams and send/recv datagrams.
///
/// This is a newtype wrapper around `web_transport_quinn::Session`.
///
/// The session can be cloned to create multiple handles.
/// The session will be closed on drop.
// NOTE(review): the close-on-drop behavior is provided by the wrapped type, not by
// anything in this file; confirm how it interacts with cloned handles.
#[derive(Clone)]
pub struct Session(web_transport_quinn::Session);

impl Session {
    /// Wait for the peer to open a new unidirectional stream.
    ///
    /// Returns the receiving half of that stream, or a [`SessionError`] reported by the
    /// underlying session.
    pub async fn accept_uni(&mut self) -> Result<RecvStream, SessionError> {
        let recv = self.0.accept_uni().await?;
        Ok(RecvStream(recv))
    }

    /// Wait for the peer to open a new bidirectional stream.
    ///
    /// Returns both halves of the stream as `(send, recv)`.
    pub async fn accept_bi(&mut self) -> Result<(SendStream, RecvStream), SessionError> {
        let (send, recv) = self.0.accept_bi().await?;
        Ok((SendStream(send), RecvStream(recv)))
    }

    /// Open a new bidirectional stream.
    ///
    /// This may block when there are too many concurrent streams.
    /// Returns both halves of the stream as `(send, recv)`.
    pub async fn open_bi(&mut self) -> Result<(SendStream, RecvStream), SessionError> {
        let (send, recv) = self.0.open_bi().await?;
        Ok((SendStream(send), RecvStream(recv)))
    }

    /// Open a new unidirectional stream.
    ///
    /// This may block when there are too many concurrent streams.
    pub async fn open_uni(&mut self) -> Result<SendStream, SessionError> {
        let send = self.0.open_uni().await?;
        Ok(SendStream(send))
    }

    /// Send a datagram over the network.
    ///
    /// Delivery is not guaranteed. QUIC datagrams may be dropped for any reason, including:
    /// - Network congestion.
    /// - Random packet loss.
    /// - Payload is larger than `max_datagram_size()`.
    /// - Peer is not receiving datagrams.
    /// - Peer has too many outstanding datagrams.
    /// - Other causes not listed here.
    pub async fn send_datagram(&mut self, payload: Bytes) -> Result<(), SessionError> {
        // The wrapped call is synchronous; this method is only `async` so that its
        // signature matches the wasm implementation.
        self.0.send_datagram(payload)
    }

    /// The maximum size of a datagram that can be sent.
    ///
    /// The value is taken directly from the wrapped session; nothing is awaited here.
    pub async fn max_datagram_size(&self) -> usize {
        self.0.max_datagram_size()
    }

    /// Wait for the next datagram from the peer and return its payload.
    pub async fn recv_datagram(&mut self) -> Result<Bytes, SessionError> {
        let datagram = self.0.read_datagram().await?;
        Ok(datagram)
    }

    /// Close the connection immediately with a code and reason.
    ///
    /// Consumes this handle. The reason string is passed on as its UTF-8 bytes.
    pub fn close(self, code: u32, reason: &str) {
        let reason = reason.as_bytes();
        self.0.close(code, reason)
    }

    /// Wait until the connection is closed, returning the error that describes why.
    pub async fn closed(&self) -> SessionError {
        self.0.closed().await
    }
}

/// Convert a `web_transport_quinn::Session` into a `web_transport::Session`.
impl From<web_transport_quinn::Session> for Session {
    fn from(session: web_transport_quinn::Session) -> Self {
        Session(session)
    }
}

/// An outgoing stream of bytes to the peer.
///
/// This is a newtype wrapper around `web_transport_quinn::SendStream`.
///
/// QUIC streams have flow control, which means the send rate is limited by the peer's receive window.
/// The stream will be closed with a graceful FIN when dropped.
// NOTE(review): the FIN-on-drop behavior comes from the wrapped type; it is not
// implemented in this file.
pub struct SendStream(web_transport_quinn::SendStream);

impl SendStream {
    /// Write some of the buffer to the stream, potentially blocking on flow control.
    ///
    /// Returns the number of bytes accepted, which may be fewer than `buf.len()`.
    pub async fn write(&mut self, buf: &[u8]) -> Result<usize, WriteError> {
        self.0.write(buf).await
    }

    /// Write some of the given buffer to the stream, potentially blocking on flow control.
    ///
    /// Only the buffer's current contiguous chunk is offered to the stream. The buffer is
    /// then advanced past however many bytes were accepted, and that count is returned.
    pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, WriteError> {
        let chunk = buf.chunk();
        let written = self.0.write(chunk).await?;

        // Consume exactly the bytes that were accepted so the caller can retry the rest.
        buf.advance(written);
        Ok(written)
    }

    /// Write the entire chunk of bytes to the stream.
    ///
    /// More efficient for some implementations, as it avoids a copy.
    pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), WriteError> {
        self.0.write_chunk(buf).await
    }

    /// Set the stream's priority.
    ///
    /// Streams with lower values will be sent first, but are not guaranteed to arrive first.
    pub fn set_priority(&mut self, order: i32) {
        // NOTE(review): the ordering direction described above is not verifiable from
        // this file; confirm it against the wrapped implementation.
        // Best effort: any error from the underlying call is deliberately discarded.
        let _ = self.0.set_priority(order);
    }

    /// Send an immediate reset code, closing the stream.
    ///
    /// Consumes the stream.
    pub fn reset(mut self, code: u32) {
        // Best effort: any error from the underlying call is deliberately discarded.
        let _ = self.0.reset(code);
    }
}

/// An incoming stream of bytes from the peer.
///
/// This is a newtype wrapper around `web_transport_quinn::RecvStream`.
///
/// All bytes are flushed in order and the stream is flow controlled.
/// The stream will be closed with STOP_SENDING code=0 when dropped.
// NOTE(review): the STOP_SENDING-on-drop behavior comes from the wrapped type; it is
// not implemented in this file.
pub struct RecvStream(web_transport_quinn::RecvStream);

impl RecvStream {
    /// Read some data into the provided buffer.
    ///
    /// Returns `Some(n)` with the number of bytes read, or `None` when the underlying
    /// stream reports no more data (presumably the end of the stream).
    pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
        self.0.read(buf).await
    }

    /// Read some data into the spare capacity of the provided buffer.
    ///
    /// Returns `true` if a read completed and the buffer was advanced by the number of
    /// bytes read, or `false` if the underlying stream returned `None`.
    pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<bool, ReadError> {
        let dst = buf.chunk_mut();
        // SAFETY: reinterprets the buffer's uninitialized chunk as `&mut [u8]` so it can
        // be handed to `read`. This assumes the chunk type has the same layout as `[u8]`
        // and that `read` only writes to the slice — TODO confirm.
        let dst = unsafe { &mut *(dst as *mut _ as *mut [u8]) };

        match self.0.read(dst).await? {
            Some(size) => {
                // SAFETY: `size` is the byte count `read` reported for `dst`, so that
                // prefix of the chunk is presumed to have been initialized by the read.
                unsafe { buf.advance_mut(size) };
                Ok(true)
            }
            None => Ok(false),
        }
    }

    /// Read the next chunk of data with the provided maximum size.
    ///
    /// More efficient for some implementations, as it avoids a copy.
    /// Returns `None` when the underlying stream yields no chunk.
    pub async fn read_chunk(&mut self, max: usize) -> Result<Option<Bytes>, ReadError> {
        let chunk = self.0.read_chunk(max, true).await?;

        // Only the payload is exposed; the rest of the chunk is dropped.
        Ok(chunk.map(|c| c.bytes))
    }

    /// Send a `STOP_SENDING` QUIC code.
    ///
    /// Consumes the stream.
    pub fn stop(mut self, code: u32) {
        // Best effort: any error from the underlying call is deliberately discarded.
        let _ = self.0.stop(code);
    }
}

/// A [Session] error.
///
/// Alias for `web_transport_quinn::SessionError`.
pub type SessionError = web_transport_quinn::SessionError;

/// A [SendStream] error.
///
/// Alias for `web_transport_quinn::WriteError`.
pub type WriteError = web_transport_quinn::WriteError;

/// A [RecvStream] error.
///
/// Alias for `web_transport_quinn::ReadError`.
pub type ReadError = web_transport_quinn::ReadError;