1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
use std::marker::PhantomData;

use js_sys::Uint8Array;
use wasm_bindgen::{throw_val, JsCast, JsValue};
use wasm_bindgen_futures::JsFuture;

use crate::util::{checked_cast_to_usize, clamp_to_u32, promise_to_void_future};

use super::{sys, IntoAsyncRead, ReadableStream};

/// A [`ReadableStreamBYOBReader`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamBYOBReader)
/// that can be used to read chunks from a [`ReadableStream`](ReadableStream).
///
/// This is returned by the [`get_byob_reader`](ReadableStream::get_byob_reader) method.
///
/// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
#[derive(Debug)]
pub struct ReadableStreamBYOBReader<'stream> {
    raw: sys::ReadableStreamBYOBReader,
    _stream: PhantomData<&'stream mut ReadableStream>,
}

impl<'stream> ReadableStreamBYOBReader<'stream> {
    pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
        Ok(Self {
            raw: stream.as_raw().get_reader_with_options(
                sys::ReadableStreamGetReaderOptions::new(sys::ReadableStreamReaderMode::BYOB),
            )?,
            _stream: PhantomData,
        })
    }

    /// Acquires a reference to the underlying [JavaScript reader](sys::ReadableStreamBYOBReader).
    #[inline]
    pub fn as_raw(&self) -> &sys::ReadableStreamBYOBReader {
        &self.raw
    }

    /// Waits for the stream to become closed.
    ///
    /// This returns an error if the stream ever errors, or if the reader's lock is
    /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
    /// closing.
    pub async fn closed(&self) -> Result<(), JsValue> {
        promise_to_void_future(self.as_raw().closed()).await
    }

    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
    /// signaling a loss of interest in the stream by a consumer.
    ///
    /// Equivalent to [`ReadableStream.cancel`](ReadableStream::cancel).
    pub async fn cancel(&mut self) -> Result<(), JsValue> {
        promise_to_void_future(self.as_raw().cancel()).await
    }

    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
    /// signaling a loss of interest in the stream by a consumer.
    ///
    /// Equivalent to [`ReadableStream.cancel_with_reason`](ReadableStream::cancel_with_reason).
    pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
        promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
    }

    /// Reads the next chunk from the stream's internal queue into `dst`,
    /// and returns the number of bytes read.
    ///
    /// * If some bytes were read into `dst`, this returns `Ok(bytes_read)`.
    /// * If the stream closes and no more bytes are available, this returns `Ok(0)`.
    /// * If the stream cancels, this returns `Ok(0)`.
    /// * If the stream encounters an `error`, this returns `Err(error)`.
    ///
    /// This always allocated a new temporary `Uint8Array` with the same size as `dst` to hold
    /// the result before copying to `dst`. We cannot pass a view on the backing WebAssembly memory
    /// directly, because:
    /// * `reader.read(view)` needs to transfer `view.buffer`, but `WebAssembly.Memory` buffers
    ///    are non-transferable.
    /// * `view.buffer` can be invalidated if the WebAssembly memory grows while `read(view)`
    ///    is still in progress.
    ///
    /// Therefore, it is necessary to use a separate buffer living in the JavaScript heap.
    /// To avoid repeated allocations for repeated reads,
    /// use [`read_with_buffer`](Self::read_with_buffer).
    pub async fn read(&mut self, dst: &mut [u8]) -> Result<usize, JsValue> {
        let buffer = Uint8Array::new_with_length(clamp_to_u32(dst.len()));
        let (bytes_read, _) = self.read_with_buffer(dst, buffer).await?;
        Ok(bytes_read)
    }

    /// Reads the next chunk from the stream's internal queue into `dst`,
    /// and returns the number of bytes read.
    ///
    /// The given `buffer` is used to store the bytes before they are copied to `dst`.
    /// This buffer is returned back together with the result, so it can be re-used for subsequent
    /// reads without extra allocations. Note that the underlying `ArrayBuffer` is transferred
    /// in the process, so any other views on the original buffer will become unusable.
    ///
    /// * If some bytes were read into `dst`, this returns `Ok((bytes_read, Some(buffer)))`.
    /// * If the stream closes and no more bytes are available, this returns `Ok((0, Some(buffer)))`.
    /// * If the stream cancels, this returns `Ok((0, None))`. In this case, the given buffer is
    ///   not returned.
    /// * If the stream encounters an `error`, this returns `Err(error)`.
    pub async fn read_with_buffer(
        &mut self,
        dst: &mut [u8],
        buffer: Uint8Array,
    ) -> Result<(usize, Option<Uint8Array>), JsValue> {
        // Save the original buffer's byte offset and length.
        let buffer_offset = buffer.byte_offset();
        let buffer_len = buffer.byte_length();
        // Limit view to destination slice's length.
        let dst_len = clamp_to_u32(dst.len());
        let view = buffer
            .subarray(0, dst_len)
            .unchecked_into::<sys::ArrayBufferView>();
        // Read into view. This transfers `buffer.buffer()`.
        let promise = self.as_raw().read(&view);
        let js_value = JsFuture::from(promise).await?;
        let result = sys::ReadableStreamBYOBReadResult::from(js_value);
        let filled_view = match result.value() {
            Some(view) => view,
            None => {
                // No new view was returned. The stream must have been canceled.
                assert!(result.is_done());
                return Ok((0, None));
            }
        };
        let filled_len = checked_cast_to_usize(filled_view.byte_length());
        debug_assert!(filled_len <= dst.len());
        // Re-construct the original Uint8Array with the new ArrayBuffer.
        let new_buffer = Uint8Array::new_with_byte_offset_and_length(
            &filled_view.buffer(),
            buffer_offset,
            buffer_len,
        );
        if result.is_done() {
            debug_assert_eq!(filled_len, 0);
        } else {
            filled_view.copy_to(&mut dst[0..filled_len]);
        }
        Ok((filled_len, Some(new_buffer)))
    }

    /// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
    /// corresponding stream.
    ///
    /// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
    /// the Streams standard allows the lock to be released even when there are still pending read
    /// requests. Such requests will automatically become rejected, and this function will always
    /// succeed.
    ///
    /// However, if the Streams implementation is not yet up-to-date with this change, then
    /// releasing the lock while there are pending read requests will **panic**. For a non-panicking
    /// variant, use [`try_release_lock`](Self::try_release_lock).
    #[inline]
    pub fn release_lock(mut self) {
        self.release_lock_mut()
    }

    fn release_lock_mut(&mut self) {
        self.as_raw()
            .release_lock()
            .unwrap_or_else(|error| throw_val(error.into()))
    }

    /// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
    /// corresponding stream.
    ///
    /// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
    /// the Streams standard allows the lock to be released even when there are still pending read
    /// requests. Such requests will automatically become rejected, and this function will always
    /// return `Ok(())`.
    ///
    /// However, if the Streams implementation is not yet up-to-date with this change, then
    /// the lock cannot be released while there are pending read requests. Attempting to do so will
    /// return an error and leave the reader locked to the stream.
    #[inline]
    pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
        self.as_raw().release_lock().map_err(|error| (error, self))
    }

    /// Converts this `ReadableStreamBYOBReader` into an [`AsyncRead`].
    ///
    /// This is similar to [`ReadableStream.into_async_read`](ReadableStream::into_async_read),
    /// except that after the returned `AsyncRead` is dropped, the original `ReadableStream` is
    /// still usable. This allows reading only a few bytes from the `AsyncRead`, while still
    /// allowing another reader to read the remaining bytes later on.
    ///
    /// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
    #[inline]
    pub fn into_async_read(self) -> IntoAsyncRead<'stream> {
        IntoAsyncRead::new(self, false)
    }
}

impl Drop for ReadableStreamBYOBReader<'_> {
    fn drop(&mut self) {
        self.release_lock_mut();
    }
}