1use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
19use futures::{prelude::*, lock::Mutex};
20use futures_timer::Delay;
21use std::{pin::Pin, task::{Poll, Context}, time::Duration};
22
23pub struct StatusSinks<T> {
28 inner: Mutex<Inner<T>>,
30 entries_tx: TracingUnboundedSender<YieldAfter<T>>,
32}
33
34struct Inner<T> {
35 entries: stream::FuturesUnordered<YieldAfter<T>>,
37 entries_rx: TracingUnboundedReceiver<YieldAfter<T>>,
39}
40
41struct YieldAfter<T> {
42 delay: Delay,
43 interval: Duration,
44 sender: Option<TracingUnboundedSender<T>>,
45}
46
47impl <T> Default for StatusSinks<T> {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl<T> StatusSinks<T> {
54 pub fn new() -> StatusSinks<T> {
56 let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries");
57
58 StatusSinks {
59 inner: Mutex::new(Inner {
60 entries: stream::FuturesUnordered::new(),
61 entries_rx,
62 }),
63 entries_tx,
64 }
65 }
66
67 pub fn push(&self, interval: Duration, sender: TracingUnboundedSender<T>) {
71 let _ = self.entries_tx.unbounded_send(YieldAfter {
72 delay: Delay::new(interval),
73 interval,
74 sender: Some(sender),
75 });
76 }
77
78 pub async fn next(&self) -> ReadySinkEvent<'_, T> {
83 let mut inner = self.inner.lock().await;
85 let inner = &mut *inner;
86
87 loop {
88 let next_ready_entry = {
91 let entries = &mut inner.entries;
92 async move {
93 if let Some(v) = entries.next().await {
94 v
95 } else {
96 loop {
97 futures::pending!()
98 }
99 }
100 }
101 };
102
103 futures::select!{
104 new_entry = inner.entries_rx.next() => {
105 if let Some(new_entry) = new_entry {
106 inner.entries.push(new_entry);
107 }
108 },
109 (sender, interval) = next_ready_entry.fuse() => {
110 return ReadySinkEvent {
111 sinks: self,
112 sender: Some(sender),
113 interval,
114 }
115 }
116 }
117 }
118 }
119}
120
121#[must_use]
123pub struct ReadySinkEvent<'a, T> {
124 sinks: &'a StatusSinks<T>,
125 sender: Option<TracingUnboundedSender<T>>,
126 interval: Duration,
127}
128
129impl<'a, T> ReadySinkEvent<'a, T> {
130 pub fn send(mut self, element: T) {
132 if let Some(sender) = self.sender.take() {
133 if sender.unbounded_send(element).is_ok() {
134 let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
135 delay: Delay::new(self.interval),
140 interval: self.interval,
141 sender: Some(sender),
142 });
143 }
144 }
145 }
146}
147
148impl<'a, T> Drop for ReadySinkEvent<'a, T> {
149 fn drop(&mut self) {
150 if let Some(sender) = self.sender.take() {
151 if sender.is_closed() {
152 return;
153 }
154
155 let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
156 delay: Delay::new(self.interval),
157 interval: self.interval,
158 sender: Some(sender),
159 });
160 }
161 }
162}
163
164impl<T> futures::Future for YieldAfter<T> {
165 type Output = (TracingUnboundedSender<T>, Duration);
166
167 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
168 let this = Pin::into_inner(self);
169
170 match Pin::new(&mut this.delay).poll(cx) {
171 Poll::Pending => Poll::Pending,
172 Poll::Ready(()) => {
173 let sender = this.sender.take()
174 .expect("sender is always Some unless the future is finished; qed");
175 Poll::Ready((sender, this.interval))
176 }
177 }
178 }
179}
180
#[cfg(test)]
mod tests {
    use crate::mpsc::tracing_unbounded;
    use super::StatusSinks;
    use futures::prelude::*;
    use std::time::Duration;

    /// Registers one sender with a 100 ms interval and checks that the first three values
    /// delivered through it arrive in order.
    #[test]
    fn works() {
        let status_sinks = StatusSinks::new();

        let (tx, rx) = tracing_unbounded("test");
        status_sinks.push(Duration::from_millis(100), tx);

        let mut val_order = 5;

        // Never completes on its own: sends an incrementing value each time the sender is ready.
        let producer = Box::pin(async move {
            loop {
                let ready = status_sinks.next().await;
                val_order += 1;
                ready.send(val_order);
            }
        });

        // Completes once three items have been received, which ends the `select` below.
        let consumer = Box::pin(async {
            let items: Vec<i32> = rx.take(3).collect().await;
            assert_eq!(items, [6, 7, 8]);
        });

        futures::executor::block_on(futures::future::select(producer, consumer));
    }
}