futures_rx/subject/
shareable_subject.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use futures::{stream::Fuse, Stream, StreamExt};

use crate::Observable;

use super::Subject;

pub(crate) struct ShareableSubject<S: Stream, Sub: Subject<Item = S::Item>> {
    stream: Pin<Box<Fuse<S>>>,
    subject: Sub,
}

impl<S: Stream, Sub: Subject<Item = S::Item>> ShareableSubject<S, Sub> {
    pub(crate) fn new(stream: S, subject: Sub) -> Self {
        Self {
            stream: Box::pin(stream.fuse()),
            subject,
        }
    }

    pub(crate) fn subscribe(&mut self) -> Observable<S::Item> {
        self.subject.subscribe()
    }

    pub(crate) fn poll_next(&mut self, cx: &mut Context<'_>) {
        match self.stream.poll_next_unpin(cx) {
            Poll::Ready(Some(value)) => self.subject.next(value),
            Poll::Ready(None) => self.subject.close(),
            Poll::Pending => {}
        }
    }
}