1use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
20use futures::{lock::Mutex, prelude::*};
21use futures_timer::Delay;
22use std::{
23 pin::Pin,
24 task::{Context, Poll},
25 time::Duration,
26};
27
28pub struct StatusSinks<T> {
33 inner: Mutex<Inner<T>>,
35 entries_tx: TracingUnboundedSender<YieldAfter<T>>,
37}
38
39struct Inner<T> {
40 entries: stream::FuturesUnordered<YieldAfter<T>>,
42 entries_rx: TracingUnboundedReceiver<YieldAfter<T>>,
44}
45
46struct YieldAfter<T> {
47 delay: Delay,
48 interval: Duration,
49 sender: Option<TracingUnboundedSender<T>>,
50}
51
52impl<T> Default for StatusSinks<T> {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl<T> StatusSinks<T> {
59 pub fn new() -> StatusSinks<T> {
61 let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries", 100_000);
62
63 StatusSinks {
64 inner: Mutex::new(Inner { entries: stream::FuturesUnordered::new(), entries_rx }),
65 entries_tx,
66 }
67 }
68
69 pub fn push(&self, interval: Duration, sender: TracingUnboundedSender<T>) {
73 let _ = self.entries_tx.unbounded_send(YieldAfter {
74 delay: Delay::new(interval),
75 interval,
76 sender: Some(sender),
77 });
78 }
79
80 pub async fn next(&self) -> ReadySinkEvent<'_, T> {
85 let mut inner = self.inner.lock().await;
87 let inner = &mut *inner;
88
89 loop {
90 let next_ready_entry = {
93 let entries = &mut inner.entries;
94 async move {
95 if let Some(v) = entries.next().await {
96 v
97 } else {
98 loop {
99 futures::pending!()
100 }
101 }
102 }
103 };
104
105 futures::select! {
106 new_entry = inner.entries_rx.next() => {
107 if let Some(new_entry) = new_entry {
108 inner.entries.push(new_entry);
109 }
110 },
111 (sender, interval) = next_ready_entry.fuse() => {
112 return ReadySinkEvent {
113 sinks: self,
114 sender: Some(sender),
115 interval,
116 }
117 }
118 }
119 }
120 }
121}
122
123#[must_use]
125pub struct ReadySinkEvent<'a, T> {
126 sinks: &'a StatusSinks<T>,
127 sender: Option<TracingUnboundedSender<T>>,
128 interval: Duration,
129}
130
131impl<'a, T> ReadySinkEvent<'a, T> {
132 pub fn send(mut self, element: T) {
134 if let Some(sender) = self.sender.take() {
135 if sender.unbounded_send(element).is_ok() {
136 let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
137 delay: Delay::new(self.interval),
142 interval: self.interval,
143 sender: Some(sender),
144 });
145 }
146 }
147 }
148}
149
150impl<'a, T> Drop for ReadySinkEvent<'a, T> {
151 fn drop(&mut self) {
152 if let Some(sender) = self.sender.take() {
153 if sender.is_closed() {
154 return
155 }
156
157 let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
158 delay: Delay::new(self.interval),
159 interval: self.interval,
160 sender: Some(sender),
161 });
162 }
163 }
164}
165
166impl<T> futures::Future for YieldAfter<T> {
167 type Output = (TracingUnboundedSender<T>, Duration);
168
169 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
170 let this = Pin::into_inner(self);
171
172 match Pin::new(&mut this.delay).poll(cx) {
173 Poll::Pending => Poll::Pending,
174 Poll::Ready(()) => {
175 let sender = this
176 .sender
177 .take()
178 .expect("sender is always Some unless the future is finished; qed");
179 Poll::Ready((sender, this.interval))
180 },
181 }
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::StatusSinks;
    use crate::mpsc::tracing_unbounded;
    use futures::prelude::*;
    use std::time::Duration;

    /// Registers a single sender and checks that the elements sent through successive
    /// ready events arrive on the receiver in the order they were sent.
    ///
    /// Only the values are checked, not the time elapsed between them.
    #[test]
    fn works() {
        let status_sinks = StatusSinks::new();

        let (tx, rx) = tracing_unbounded("test", 100_000);
        status_sinks.push(Duration::from_millis(100), tx);

        let mut val_order = 5;

        // Never finishes on its own: keeps sending increasing values on every ready event.
        let producer = async move {
            loop {
                let ev = status_sinks.next().await;
                val_order += 1;
                ev.send(val_order);
            }
        };

        // Finishes after three elements, which ends the `select` below.
        let consumer = async {
            let items: Vec<i32> = rx.take(3).collect().await;
            assert_eq!(items, [6, 7, 8]);
        };

        futures::executor::block_on(futures::future::select(
            Box::pin(producer),
            Box::pin(consumer),
        ));
    }
}