1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
use std::marker::PhantomData;
use wasm_bindgen::{throw_val, JsValue};
use wasm_bindgen_futures::JsFuture;
use crate::util::promise_to_void_future;
use super::{sys, IntoStream, ReadableStream};
/// A [`ReadableStreamDefaultReader`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader)
/// that can be used to read chunks from a [`ReadableStream`](ReadableStream).
///
/// This is returned by the [`get_reader`](ReadableStream::get_reader) method.
///
/// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
#[derive(Debug)]
pub struct ReadableStreamDefaultReader<'stream> {
/// The underlying JavaScript reader object; every method on this type delegates to it.
raw: sys::ReadableStreamDefaultReader,
/// Zero-sized marker carrying the `'stream` lifetime as if this reader held an exclusive
/// borrow of the `ReadableStream`, without actually storing a reference to it.
_stream: PhantomData<&'stream mut ReadableStream>,
}
impl<'stream> ReadableStreamDefaultReader<'stream> {
pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
Ok(Self {
raw: stream.as_raw().get_reader()?,
_stream: PhantomData,
})
}
/// Acquires a reference to the underlying [JavaScript reader](sys::ReadableStreamDefaultReader).
#[inline]
pub fn as_raw(&self) -> &sys::ReadableStreamDefaultReader {
&self.raw
}
/// Waits for the stream to become closed.
///
/// This returns an error if the stream ever errors, or if the reader's lock is
/// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
/// closing.
pub async fn closed(&self) -> Result<(), JsValue> {
promise_to_void_future(self.as_raw().closed()).await
}
/// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
/// signaling a loss of interest in the stream by a consumer.
///
/// Equivalent to [`ReadableStream.cancel`](ReadableStream::cancel).
pub async fn cancel(&mut self) -> Result<(), JsValue> {
promise_to_void_future(self.as_raw().cancel()).await
}
/// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
/// signaling a loss of interest in the stream by a consumer.
///
/// Equivalent to [`ReadableStream.cancel_with_reason`](ReadableStream::cancel_with_reason).
pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
}
/// Reads the next chunk from the stream's internal queue.
///
/// * If a next `chunk` becomes available, this returns `Ok(Some(chunk))`.
/// * If the stream closes and no more chunks are available, this returns `Ok(None)`.
/// * If the stream encounters an `error`, this returns `Err(error)`.
pub async fn read(&mut self) -> Result<Option<JsValue>, JsValue> {
let promise = self.as_raw().read();
let js_value = JsFuture::from(promise).await?;
let result = sys::ReadableStreamDefaultReadResult::from(js_value);
if result.is_done() {
Ok(None)
} else {
Ok(Some(result.value()))
}
}
/// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
/// corresponding stream.
///
/// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
/// the Streams standard allows the lock to be released even when there are still pending read
/// requests. Such requests will automatically become rejected, and this function will always
/// succeed.
///
/// However, if the Streams implementation is not yet up-to-date with this change, then
/// releasing the lock while there are pending read requests will **panic**. For a non-panicking
/// variant, use [`try_release_lock`](Self::try_release_lock).
#[inline]
pub fn release_lock(mut self) {
self.release_lock_mut()
}
fn release_lock_mut(&mut self) {
self.as_raw()
.release_lock()
.unwrap_or_else(|error| throw_val(error.into()))
}
/// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
/// corresponding stream.
///
/// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
/// the Streams standard allows the lock to be released even when there are still pending read
/// requests. Such requests will automatically become rejected, and this function will always
/// return `Ok(())`.
///
/// However, if the Streams implementation is not yet up-to-date with this change, then
/// the lock cannot be released while there are pending read requests. Attempting to do so will
/// return an error and leave the reader locked to the stream.
#[inline]
pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
self.as_raw().release_lock().map_err(|error| (error, self))
}
/// Converts this `ReadableStreamDefaultReader` into a [`Stream`].
///
/// This is similar to [`ReadableStream.into_stream`](ReadableStream::into_stream),
/// except that after the returned `Stream` is dropped, the original `ReadableStream` is still
/// usable. This allows reading only a few chunks from the `Stream`, while still allowing
/// another reader to read the remaining chunks later on.
///
/// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
#[inline]
pub fn into_stream(self) -> IntoStream<'stream> {
IntoStream::new(self, false)
}
}
impl Drop for ReadableStreamDefaultReader<'_> {
    /// Releases the reader's lock on its stream when the reader goes out of scope.
    ///
    /// This delegates to `release_lock_mut`, so a failure to release the lock is thrown
    /// rather than silently ignored.
    fn drop(&mut self) {
        Self::release_lock_mut(self);
    }
}