wasm_bindgen_futures/
stream.rs1use crate::JsFuture;
9use core::future::Future;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12use futures_core::stream::Stream;
13use js_sys::{AsyncIterator, IteratorNext};
14use wasm_bindgen::prelude::*;
15
16pub struct JsStream {
18 iter: AsyncIterator,
19 next: Option<JsFuture>,
20 done: bool,
21}
22
23impl JsStream {
24 fn next_future(&self) -> Result<JsFuture, JsValue> {
25 self.iter.next().map(JsFuture::from)
26 }
27}
28
29impl From<AsyncIterator> for JsStream {
30 fn from(iter: AsyncIterator) -> Self {
31 JsStream {
32 iter,
33 next: None,
34 done: false,
35 }
36 }
37}
38
39impl Stream for JsStream {
40 type Item = Result<JsValue, JsValue>;
41
42 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
43 if self.done {
44 return Poll::Ready(None);
45 }
46
47 let future = match self.next.as_mut() {
48 Some(val) => val,
49 None => match self.next_future() {
50 Ok(val) => {
51 self.next = Some(val);
52 self.next.as_mut().unwrap()
53 }
54 Err(e) => {
55 self.done = true;
56 return Poll::Ready(Some(Err(e)));
57 }
58 },
59 };
60
61 match Pin::new(future).poll(cx) {
62 Poll::Ready(res) => match res {
63 Ok(iter_next) => {
64 let next = iter_next.unchecked_into::<IteratorNext>();
65 if next.done() {
66 self.done = true;
67 Poll::Ready(None)
68 } else {
69 self.next.take();
70 Poll::Ready(Some(Ok(next.value())))
71 }
72 }
73 Err(e) => {
74 self.done = true;
75 Poll::Ready(Some(Err(e)))
76 }
77 },
78 Poll::Pending => Poll::Pending,
79 }
80 }
81}