gloo_worker/reactor/
scope.rs1use std::convert::Infallible;
2use std::fmt;
3use std::pin::Pin;
4
5use futures::stream::{FusedStream, Stream};
6use futures::task::{Context, Poll};
7use futures::Sink;
8
9pub struct ReactorScope<I, O> {
11 input_stream: Pin<Box<dyn FusedStream<Item = I>>>,
12 output_sink: Pin<Box<dyn Sink<O, Error = Infallible>>>,
13}
14
15impl<I, O> fmt::Debug for ReactorScope<I, O> {
16 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17 f.debug_struct("ReactorScope<_>").finish()
18 }
19}
20
21impl<I, O> Stream for ReactorScope<I, O> {
22 type Item = I;
23
24 #[inline(always)]
25 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26 Pin::new(&mut self.input_stream).poll_next(cx)
27 }
28
29 #[inline(always)]
30 fn size_hint(&self) -> (usize, Option<usize>) {
31 self.input_stream.size_hint()
32 }
33}
34
35impl<I, O> FusedStream for ReactorScope<I, O> {
36 #[inline(always)]
37 fn is_terminated(&self) -> bool {
38 self.input_stream.is_terminated()
39 }
40}
41
42pub trait ReactorScoped: Stream + FusedStream {
44 type Input;
46 type Output;
48
49 fn new<IS, OS>(input_stream: IS, output_sink: OS) -> Self
51 where
52 IS: Stream<Item = Self::Input> + FusedStream + 'static,
53 OS: Sink<Self::Output, Error = Infallible> + 'static;
54}
55
56impl<I, O> ReactorScoped for ReactorScope<I, O> {
57 type Input = I;
58 type Output = O;
59
60 #[inline]
61 fn new<IS, OS>(input_stream: IS, output_sink: OS) -> Self
62 where
63 IS: Stream<Item = Self::Input> + FusedStream + 'static,
64 OS: Sink<Self::Output, Error = Infallible> + 'static,
65 {
66 Self {
67 input_stream: Box::pin(input_stream),
68 output_sink: Box::pin(output_sink),
69 }
70 }
71}
72
73impl<I, O> Sink<O> for ReactorScope<I, O> {
74 type Error = Infallible;
75
76 fn start_send(mut self: Pin<&mut Self>, item: O) -> Result<(), Self::Error> {
77 Pin::new(&mut self.output_sink).start_send(item)
78 }
79
80 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81 Pin::new(&mut self.output_sink).poll_close(cx)
82 }
83
84 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85 Pin::new(&mut self.output_sink).poll_flush(cx)
86 }
87
88 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
89 Pin::new(&mut self.output_sink).poll_flush(cx)
90 }
91}