1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use core::pin::Pin;
use core::task::{Context, Poll};

use futures_util::io::{AsyncRead, Error};
use futures_util::ready;
use futures_util::FutureExt;
use js_sys::Uint8Array;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;

use crate::util::{checked_cast_to_usize, clamp_to_u32, js_to_io_error};

use super::sys::{ArrayBufferView, ReadableStreamBYOBReadResult};
use super::ReadableStreamBYOBReader;

/// An [`AsyncRead`] for the [`into_async_read`](super::ReadableStream::into_async_read) method.
///
/// This `AsyncRead` holds a reader, and therefore locks the [`ReadableStream`](super::ReadableStream).
/// When this `AsyncRead` is dropped, it also drops its reader which in turn
/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
///
/// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
#[must_use = "readers do nothing unless polled"]
#[derive(Debug)]
pub struct IntoAsyncRead<'reader> {
    reader: Option<ReadableStreamBYOBReader<'reader>>,
    buffer: Option<Uint8Array>,
    fut: Option<JsFuture>,
    cancel_on_drop: bool,
}

impl<'reader> IntoAsyncRead<'reader> {
    #[inline]
    pub(super) fn new(reader: ReadableStreamBYOBReader, cancel_on_drop: bool) -> IntoAsyncRead {
        IntoAsyncRead {
            reader: Some(reader),
            buffer: None,
            fut: None,
            cancel_on_drop,
        }
    }

    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
    /// signaling a loss of interest in the stream by a consumer.
    pub async fn cancel(mut self) -> Result<(), JsValue> {
        match self.reader.take() {
            Some(mut reader) => reader.cancel().await,
            None => Ok(()),
        }
    }

    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
    /// signaling a loss of interest in the stream by a consumer.
    pub async fn cancel_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
        match self.reader.take() {
            Some(mut reader) => reader.cancel_with_reason(reason).await,
            None => Ok(()),
        }
    }

    #[inline]
    fn discard_reader(mut self: Pin<&mut Self>) {
        self.reader = None;
        self.buffer = None;
    }
}

impl<'reader> AsyncRead for IntoAsyncRead<'reader> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Error>> {
        let read_fut = match self.fut.as_mut() {
            Some(fut) => fut,
            None => {
                // No pending read, start reading the next bytes
                let buf_len = clamp_to_u32(buf.len());
                let buffer = match self.buffer.take() {
                    // Re-use the internal buffer if it is large enough,
                    // otherwise allocate a new one
                    Some(buffer) if buffer.byte_length() >= buf_len => buffer,
                    _ => Uint8Array::new_with_length(buf_len),
                };
                // Limit to output buffer size
                let buffer = buffer
                    .subarray(0, buf_len)
                    .unchecked_into::<ArrayBufferView>();
                match &self.reader {
                    Some(reader) => {
                        // Read into internal buffer and store its future
                        let fut = JsFuture::from(reader.as_raw().read(&buffer));
                        self.fut.insert(fut)
                    }
                    None => {
                        // Reader was already dropped
                        return Poll::Ready(Ok(0));
                    }
                }
            }
        };

        // Poll the future for the pending read
        let js_result = ready!(read_fut.poll_unpin(cx));
        self.fut = None;

        // Read completed
        Poll::Ready(match js_result {
            Ok(js_value) => {
                let result = ReadableStreamBYOBReadResult::from(js_value);
                if result.is_done() {
                    // End of stream
                    self.discard_reader();
                    Ok(0)
                } else {
                    // Cannot be canceled, so view must exist
                    let filled_view = result.value().unwrap_throw();
                    // Copy bytes to output buffer
                    let filled_len = checked_cast_to_usize(filled_view.byte_length());
                    debug_assert!(filled_len <= buf.len());
                    filled_view.copy_to(&mut buf[0..filled_len]);
                    // Re-construct internal buffer with the new ArrayBuffer
                    self.buffer = Some(Uint8Array::new(&filled_view.buffer()));
                    Ok(filled_len)
                }
            }
            Err(js_value) => {
                // Error
                self.discard_reader();
                Err(js_to_io_error(js_value))
            }
        })
    }
}

impl<'reader> Drop for IntoAsyncRead<'reader> {
    fn drop(&mut self) {
        if self.cancel_on_drop {
            if let Some(reader) = self.reader.take() {
                let _ = reader.as_raw().cancel().catch(&Closure::once(|_| {}));
            }
        }
    }
}