sc_utils/
status_sinks.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
20use futures::{lock::Mutex, prelude::*};
21use futures_timer::Delay;
22use std::{
23	pin::Pin,
24	task::{Context, Poll},
25	time::Duration,
26};
27
28/// Holds a list of `UnboundedSender`s, each associated with a certain time period. Every time the
29/// period elapses, we push an element on the sender.
30///
31/// Senders are removed only when they are closed.
32pub struct StatusSinks<T> {
33	/// Should only be locked by `next`.
34	inner: Mutex<Inner<T>>,
35	/// Sending side of `Inner::entries_rx`.
36	entries_tx: TracingUnboundedSender<YieldAfter<T>>,
37}
38
39struct Inner<T> {
40	/// The actual entries of the list.
41	entries: stream::FuturesUnordered<YieldAfter<T>>,
42	/// Receives new entries and puts them in `entries`.
43	entries_rx: TracingUnboundedReceiver<YieldAfter<T>>,
44}
45
46struct YieldAfter<T> {
47	delay: Delay,
48	interval: Duration,
49	sender: Option<TracingUnboundedSender<T>>,
50}
51
52impl<T> Default for StatusSinks<T> {
53	fn default() -> Self {
54		Self::new()
55	}
56}
57
58impl<T> StatusSinks<T> {
59	/// Builds a new empty collection.
60	pub fn new() -> StatusSinks<T> {
61		let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries", 100_000);
62
63		StatusSinks {
64			inner: Mutex::new(Inner { entries: stream::FuturesUnordered::new(), entries_rx }),
65			entries_tx,
66		}
67	}
68
69	/// Adds a sender to the collection.
70	///
71	/// The `interval` is the time period between two pushes on the sender.
72	pub fn push(&self, interval: Duration, sender: TracingUnboundedSender<T>) {
73		let _ = self.entries_tx.unbounded_send(YieldAfter {
74			delay: Delay::new(interval),
75			interval,
76			sender: Some(sender),
77		});
78	}
79
80	/// Waits until one of the sinks is ready, then returns an object that can be used to send
81	/// an element on said sink.
82	///
83	/// If the object isn't used to send an element, the slot is skipped.
84	pub async fn next(&self) -> ReadySinkEvent<'_, T> {
85		// This is only ever locked by `next`, which means that one `next` at a time can run.
86		let mut inner = self.inner.lock().await;
87		let inner = &mut *inner;
88
89		loop {
90			// Future that produces the next ready entry in `entries`, or doesn't produce anything
91			// if the list is empty.
92			let next_ready_entry = {
93				let entries = &mut inner.entries;
94				async move {
95					if let Some(v) = entries.next().await {
96						v
97					} else {
98						loop {
99							futures::pending!()
100						}
101					}
102				}
103			};
104
105			futures::select! {
106				new_entry = inner.entries_rx.next() => {
107					if let Some(new_entry) = new_entry {
108						inner.entries.push(new_entry);
109					}
110				},
111				(sender, interval) = next_ready_entry.fuse() => {
112					return ReadySinkEvent {
113						sinks: self,
114						sender: Some(sender),
115						interval,
116					}
117				}
118			}
119		}
120	}
121}
122
123/// One of the sinks is ready.
124#[must_use]
125pub struct ReadySinkEvent<'a, T> {
126	sinks: &'a StatusSinks<T>,
127	sender: Option<TracingUnboundedSender<T>>,
128	interval: Duration,
129}
130
131impl<'a, T> ReadySinkEvent<'a, T> {
132	/// Sends an element on the sender.
133	pub fn send(mut self, element: T) {
134		if let Some(sender) = self.sender.take() {
135			if sender.unbounded_send(element).is_ok() {
136				let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
137					// Note that since there's a small delay between the moment a task is
138					// woken up and the moment it is polled, the period is actually not
139					// `interval` but `interval + <delay>`. We ignore this problem in
140					// practice.
141					delay: Delay::new(self.interval),
142					interval: self.interval,
143					sender: Some(sender),
144				});
145			}
146		}
147	}
148}
149
150impl<'a, T> Drop for ReadySinkEvent<'a, T> {
151	fn drop(&mut self) {
152		if let Some(sender) = self.sender.take() {
153			if sender.is_closed() {
154				return
155			}
156
157			let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
158				delay: Delay::new(self.interval),
159				interval: self.interval,
160				sender: Some(sender),
161			});
162		}
163	}
164}
165
166impl<T> futures::Future for YieldAfter<T> {
167	type Output = (TracingUnboundedSender<T>, Duration);
168
169	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
170		let this = Pin::into_inner(self);
171
172		match Pin::new(&mut this.delay).poll(cx) {
173			Poll::Pending => Poll::Pending,
174			Poll::Ready(()) => {
175				let sender = this
176					.sender
177					.take()
178					.expect("sender is always Some unless the future is finished; qed");
179				Poll::Ready((sender, this.interval))
180			},
181		}
182	}
183}
184
185#[cfg(test)]
186mod tests {
187	use super::StatusSinks;
188	use crate::mpsc::tracing_unbounded;
189	use futures::prelude::*;
190	use std::time::Duration;
191
192	#[test]
193	fn works() {
194		// We're not testing that the `StatusSink` properly enforces an order in the intervals, as
195		// this easily causes test failures on busy CPUs.
196
197		let status_sinks = StatusSinks::new();
198
199		let (tx, rx) = tracing_unbounded("test", 100_000);
200		status_sinks.push(Duration::from_millis(100), tx);
201
202		let mut val_order = 5;
203
204		futures::executor::block_on(futures::future::select(
205			Box::pin(async move {
206				loop {
207					let ev = status_sinks.next().await;
208					val_order += 1;
209					ev.send(val_order);
210				}
211			}),
212			Box::pin(async {
213				let items: Vec<i32> = rx.take(3).collect().await;
214				assert_eq!(items, [6, 7, 8]);
215			}),
216		));
217	}
218}