sp_utils/
status_sinks.rs

1// This file is part of Substrate.
2
3// Copyright (C) 2019-2021 Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
19use futures::{prelude::*, lock::Mutex};
20use futures_timer::Delay;
21use std::{pin::Pin, task::{Poll, Context}, time::Duration};
22
23/// Holds a list of `UnboundedSender`s, each associated with a certain time period. Every time the
24/// period elapses, we push an element on the sender.
25///
26/// Senders are removed only when they are closed.
27pub struct StatusSinks<T> {
28	/// Should only be locked by `next`.
29	inner: Mutex<Inner<T>>,
30	/// Sending side of `Inner::entries_rx`.
31	entries_tx: TracingUnboundedSender<YieldAfter<T>>,
32}
33
34struct Inner<T> {
35	/// The actual entries of the list.
36	entries: stream::FuturesUnordered<YieldAfter<T>>,
37	/// Receives new entries and puts them in `entries`.
38	entries_rx: TracingUnboundedReceiver<YieldAfter<T>>,
39}
40
41struct YieldAfter<T> {
42	delay: Delay,
43	interval: Duration,
44	sender: Option<TracingUnboundedSender<T>>,
45}
46
47impl <T> Default for StatusSinks<T> {
48	fn default() -> Self {
49		Self::new()
50	}
51}
52
53impl<T> StatusSinks<T> {
54	/// Builds a new empty collection.
55	pub fn new() -> StatusSinks<T> {
56		let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries");
57
58		StatusSinks {
59			inner: Mutex::new(Inner {
60				entries: stream::FuturesUnordered::new(),
61				entries_rx,
62			}),
63			entries_tx,
64		}
65	}
66
67	/// Adds a sender to the collection.
68	///
69	/// The `interval` is the time period between two pushes on the sender.
70	pub fn push(&self, interval: Duration, sender: TracingUnboundedSender<T>) {
71		let _ = self.entries_tx.unbounded_send(YieldAfter {
72			delay: Delay::new(interval),
73			interval,
74			sender: Some(sender),
75		});
76	}
77
78	/// Waits until one of the sinks is ready, then returns an object that can be used to send
79	/// an element on said sink.
80	///
81	/// If the object isn't used to send an element, the slot is skipped.
82	pub async fn next(&self) -> ReadySinkEvent<'_, T> {
83		// This is only ever locked by `next`, which means that one `next` at a time can run.
84		let mut inner = self.inner.lock().await;
85		let inner = &mut *inner;
86
87		loop {
88			// Future that produces the next ready entry in `entries`, or doesn't produce anything if
89			// the list is empty.
90			let next_ready_entry = {
91				let entries = &mut inner.entries;
92				async move {
93					if let Some(v) = entries.next().await {
94						v
95					} else {
96						loop {
97							futures::pending!()
98						}
99					}
100				}
101			};
102
103			futures::select!{
104				new_entry = inner.entries_rx.next() => {
105					if let Some(new_entry) = new_entry {
106						inner.entries.push(new_entry);
107					}
108				},
109				(sender, interval) = next_ready_entry.fuse() => {
110					return ReadySinkEvent {
111						sinks: self,
112						sender: Some(sender),
113						interval,
114					}
115				}
116			}
117		}
118	}
119}
120
121/// One of the sinks is ready.
122#[must_use]
123pub struct ReadySinkEvent<'a, T> {
124	sinks: &'a StatusSinks<T>,
125	sender: Option<TracingUnboundedSender<T>>,
126	interval: Duration,
127}
128
129impl<'a, T> ReadySinkEvent<'a, T> {
130	/// Sends an element on the sender.
131	pub fn send(mut self, element: T) {
132		if let Some(sender) = self.sender.take() {
133			if sender.unbounded_send(element).is_ok() {
134				let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
135					// Note that since there's a small delay between the moment a task is
136					// woken up and the moment it is polled, the period is actually not
137					// `interval` but `interval + <delay>`. We ignore this problem in
138					// practice.
139					delay: Delay::new(self.interval),
140					interval: self.interval,
141					sender: Some(sender),
142				});
143			}
144		}
145	}
146}
147
148impl<'a, T> Drop for ReadySinkEvent<'a, T> {
149	fn drop(&mut self) {
150		if let Some(sender) = self.sender.take() {
151			if sender.is_closed() {
152				return;
153			}
154
155			let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
156				delay: Delay::new(self.interval),
157				interval: self.interval,
158				sender: Some(sender),
159			});
160		}
161	}
162}
163
164impl<T> futures::Future for YieldAfter<T> {
165	type Output = (TracingUnboundedSender<T>, Duration);
166
167	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
168		let this = Pin::into_inner(self);
169
170		match Pin::new(&mut this.delay).poll(cx) {
171			Poll::Pending => Poll::Pending,
172			Poll::Ready(()) => {
173				let sender = this.sender.take()
174					.expect("sender is always Some unless the future is finished; qed");
175				Poll::Ready((sender, this.interval))
176			}
177		}
178	}
179}
180
#[cfg(test)]
mod tests {
	use super::StatusSinks;
	use crate::mpsc::tracing_unbounded;
	use futures::prelude::*;
	use std::time::Duration;

	#[test]
	fn works() {
		// Whether `StatusSinks` properly orders the intervals is intentionally left untested,
		// as such a check easily fails on busy CPUs.

		let sinks = StatusSinks::new();

		let (sender, receiver) = tracing_unbounded("test");
		sinks.push(Duration::from_millis(100), sender);

		let mut counter = 5;

		// Sends an ever-increasing value each time the sink becomes ready; never finishes.
		let producer = Box::pin(async move {
			loop {
				let event = sinks.next().await;
				counter += 1;
				event.send(counter);
			}
		});

		// Finishes once three values have been received.
		let consumer = Box::pin(async {
			let received: Vec<i32> = receiver.take(3).collect().await;
			assert_eq!(received, [6, 7, 8]);
		});

		futures::executor::block_on(futures::future::select(producer, consumer));
	}
}