gloo_worker/reactor/
scope.rs

1use std::convert::Infallible;
2use std::fmt;
3use std::pin::Pin;
4
5use futures::stream::{FusedStream, Stream};
6use futures::task::{Context, Poll};
7use futures::Sink;
8
9/// A handle to communicate with bridges.
10pub struct ReactorScope<I, O> {
11    input_stream: Pin<Box<dyn FusedStream<Item = I>>>,
12    output_sink: Pin<Box<dyn Sink<O, Error = Infallible>>>,
13}
14
15impl<I, O> fmt::Debug for ReactorScope<I, O> {
16    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17        f.debug_struct("ReactorScope<_>").finish()
18    }
19}
20
21impl<I, O> Stream for ReactorScope<I, O> {
22    type Item = I;
23
24    #[inline(always)]
25    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26        Pin::new(&mut self.input_stream).poll_next(cx)
27    }
28
29    #[inline(always)]
30    fn size_hint(&self) -> (usize, Option<usize>) {
31        self.input_stream.size_hint()
32    }
33}
34
35impl<I, O> FusedStream for ReactorScope<I, O> {
36    #[inline(always)]
37    fn is_terminated(&self) -> bool {
38        self.input_stream.is_terminated()
39    }
40}
41
42/// A helper trait to extract the input and output type from a [ReactorStream].
43pub trait ReactorScoped: Stream + FusedStream {
44    /// The Input Message.
45    type Input;
46    /// The Output Message.
47    type Output;
48
49    /// Creates a ReactorReceiver.
50    fn new<IS, OS>(input_stream: IS, output_sink: OS) -> Self
51    where
52        IS: Stream<Item = Self::Input> + FusedStream + 'static,
53        OS: Sink<Self::Output, Error = Infallible> + 'static;
54}
55
56impl<I, O> ReactorScoped for ReactorScope<I, O> {
57    type Input = I;
58    type Output = O;
59
60    #[inline]
61    fn new<IS, OS>(input_stream: IS, output_sink: OS) -> Self
62    where
63        IS: Stream<Item = Self::Input> + FusedStream + 'static,
64        OS: Sink<Self::Output, Error = Infallible> + 'static,
65    {
66        Self {
67            input_stream: Box::pin(input_stream),
68            output_sink: Box::pin(output_sink),
69        }
70    }
71}
72
73impl<I, O> Sink<O> for ReactorScope<I, O> {
74    type Error = Infallible;
75
76    fn start_send(mut self: Pin<&mut Self>, item: O) -> Result<(), Self::Error> {
77        Pin::new(&mut self.output_sink).start_send(item)
78    }
79
80    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81        Pin::new(&mut self.output_sink).poll_close(cx)
82    }
83
84    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85        Pin::new(&mut self.output_sink).poll_flush(cx)
86    }
87
88    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
89        Pin::new(&mut self.output_sink).poll_flush(cx)
90    }
91}