broker_tokio/sync/watch.rs
1//! A single-producer, multi-consumer channel that only retains the *last* sent
2//! value.
3//!
4//! This channel is useful for watching for changes to a value from multiple
5//! points in the code base, for example, changes to configuration values.
6//!
7//! # Usage
8//!
9//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are
10//! the producer and sender halves of the channel. The channel is
11//! created with an initial value. [`Receiver::recv`] will always
12//! be ready upon creation and will yield either this initial value or
13//! the latest value that has been sent by `Sender`.
14//!
15//! Calls to [`Receiver::recv`] will always yield the latest value.
16//!
17//! # Examples
18//!
19//! ```
20//! use tokio::sync::watch;
21//!
22//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
23//! let (tx, mut rx) = watch::channel("hello");
24//!
25//! tokio::spawn(async move {
26//! while let Some(value) = rx.recv().await {
27//! println!("received = {:?}", value);
28//! }
29//! });
30//!
31//! tx.broadcast("world")?;
32//! # Ok(())
33//! # }
34//! ```
35//!
36//! # Closing
37//!
38//! [`Sender::closed`] allows the producer to detect when all [`Receiver`]
39//! handles have been dropped. This indicates that there is no further interest
40//! in the values being produced and work can be stopped.
41//!
42//! # Thread safety
43//!
44//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
45//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
46//! handles may be moved to separate threads and also used concurrently.
47//!
48//! [`Sender`]: crate::sync::watch::Sender
49//! [`Receiver`]: crate::sync::watch::Receiver
50//! [`Receiver::recv`]: crate::sync::watch::Receiver::recv
51//! [`channel`]: crate::sync::watch::channel
52//! [`Sender::closed`]: crate::sync::watch::Sender::closed
53
54use crate::future::poll_fn;
55use crate::sync::task::AtomicWaker;
56
57use fnv::FnvHashMap;
58use std::ops;
59use std::sync::atomic::AtomicUsize;
60use std::sync::atomic::Ordering::SeqCst;
61use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
62use std::task::Poll::{Pending, Ready};
63use std::task::{Context, Poll};
64
65/// Receives values from the associated [`Sender`](struct.Sender.html).
66///
67/// Instances are created by the [`channel`](fn.channel.html) function.
68#[derive(Debug)]
69pub struct Receiver<T> {
70 /// Pointer to the shared state
71 shared: Arc<Shared<T>>,
72
73 /// Pointer to the watcher's internal state
74 inner: Arc<WatchInner>,
75
76 /// Watcher ID.
77 id: u64,
78
79 /// Last observed version
80 ver: usize,
81}
82
83/// Sends values to the associated [`Receiver`](struct.Receiver.html).
84///
85/// Instances are created by the [`channel`](fn.channel.html) function.
86#[derive(Debug)]
87pub struct Sender<T> {
88 shared: Weak<Shared<T>>,
89}
90
91/// Returns a reference to the inner value
92///
93/// Outstanding borrows hold a read lock on the inner value. This means that
94/// long lived borrows could cause the produce half to block. It is recommended
95/// to keep the borrow as short lived as possible.
96#[derive(Debug)]
97pub struct Ref<'a, T> {
98 inner: RwLockReadGuard<'a, T>,
99}
100
101pub mod error {
102 //! Watch error types
103
104 use std::fmt;
105
106 /// Error produced when sending a value fails.
107 #[derive(Debug)]
108 pub struct SendError<T> {
109 pub(crate) inner: T,
110 }
111
112 // ===== impl SendError =====
113
114 impl<T: fmt::Debug> fmt::Display for SendError<T> {
115 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
116 write!(fmt, "channel closed")
117 }
118 }
119
120 impl<T: fmt::Debug> std::error::Error for SendError<T> {}
121}
122
123#[derive(Debug)]
124struct Shared<T> {
125 /// The most recent value
126 value: RwLock<T>,
127
128 /// The current version
129 ///
130 /// The lowest bit represents a "closed" state. The rest of the bits
131 /// represent the current version.
132 version: AtomicUsize,
133
134 /// All watchers
135 watchers: Mutex<Watchers>,
136
137 /// Task to notify when all watchers drop
138 cancel: AtomicWaker,
139}
140
141#[derive(Debug)]
142struct Watchers {
143 next_id: u64,
144 watchers: FnvHashMap<u64, Arc<WatchInner>>,
145}
146
147#[derive(Debug)]
148struct WatchInner {
149 waker: AtomicWaker,
150}
151
152const CLOSED: usize = 1;
153
154/// Create a new watch channel, returning the "send" and "receive" handles.
155///
156/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
157/// Only the last value sent is made available to the [`Receiver`] half. All
158/// intermediate values are dropped.
159///
160/// # Examples
161///
162/// ```
163/// use tokio::sync::watch;
164///
165/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
166/// let (tx, mut rx) = watch::channel("hello");
167///
168/// tokio::spawn(async move {
169/// while let Some(value) = rx.recv().await {
170/// println!("received = {:?}", value);
171/// }
172/// });
173///
174/// tx.broadcast("world")?;
175/// # Ok(())
176/// # }
177/// ```
178///
179/// [`Sender`]: struct.Sender.html
180/// [`Receiver`]: struct.Receiver.html
181pub fn channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>) {
182 const INIT_ID: u64 = 0;
183
184 let inner = Arc::new(WatchInner::new());
185
186 // Insert the watcher
187 let mut watchers = FnvHashMap::with_capacity_and_hasher(0, Default::default());
188 watchers.insert(INIT_ID, inner.clone());
189
190 let shared = Arc::new(Shared {
191 value: RwLock::new(init),
192 version: AtomicUsize::new(2),
193 watchers: Mutex::new(Watchers {
194 next_id: INIT_ID + 1,
195 watchers,
196 }),
197 cancel: AtomicWaker::new(),
198 });
199
200 let tx = Sender {
201 shared: Arc::downgrade(&shared),
202 };
203
204 let rx = Receiver {
205 shared,
206 inner,
207 id: INIT_ID,
208 ver: 0,
209 };
210
211 (tx, rx)
212}
213
214impl<T> Receiver<T> {
215 /// Returns a reference to the most recently sent value
216 ///
217 /// Outstanding borrows hold a read lock. This means that long lived borrows
218 /// could cause the send half to block. It is recommended to keep the borrow
219 /// as short lived as possible.
220 ///
221 /// # Examples
222 ///
223 /// ```
224 /// use tokio::sync::watch;
225 ///
226 /// let (_, rx) = watch::channel("hello");
227 /// assert_eq!(*rx.borrow(), "hello");
228 /// ```
229 pub fn borrow(&self) -> Ref<'_, T> {
230 let inner = self.shared.value.read().unwrap();
231 Ref { inner }
232 }
233
234 // TODO: document
235 #[doc(hidden)]
236 pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
237 // Make sure the task is up to date
238 self.inner.waker.register_by_ref(cx.waker());
239
240 let state = self.shared.version.load(SeqCst);
241 let version = state & !CLOSED;
242
243 if version != self.ver {
244 let inner = self.shared.value.read().unwrap();
245 self.ver = version;
246
247 return Ready(Some(Ref { inner }));
248 }
249
250 if CLOSED == state & CLOSED {
251 // The `Store` handle has been dropped.
252 return Ready(None);
253 }
254
255 Pending
256 }
257}
258
259impl<T: Clone> Receiver<T> {
260 /// Attempts to clone the latest value sent via the channel.
261 pub async fn recv(&mut self) -> Option<T> {
262 poll_fn(|cx| {
263 let v_ref = ready!(self.poll_recv_ref(cx));
264 Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
265 })
266 .await
267 }
268}
269
270#[cfg(feature = "stream")]
271impl<T: Clone> crate::stream::Stream for Receiver<T> {
272 type Item = T;
273
274 fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
275 let v_ref = ready!(self.poll_recv_ref(cx));
276
277 Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
278 }
279}
280
281impl<T> Clone for Receiver<T> {
282 fn clone(&self) -> Self {
283 let inner = Arc::new(WatchInner::new());
284 let shared = self.shared.clone();
285
286 let id = {
287 let mut watchers = shared.watchers.lock().unwrap();
288 let id = watchers.next_id;
289
290 watchers.next_id += 1;
291 watchers.watchers.insert(id, inner.clone());
292
293 id
294 };
295
296 let ver = self.ver;
297
298 Receiver {
299 shared,
300 inner,
301 id,
302 ver,
303 }
304 }
305}
306
307impl<T> Drop for Receiver<T> {
308 fn drop(&mut self) {
309 let mut watchers = self.shared.watchers.lock().unwrap();
310 watchers.watchers.remove(&self.id);
311 }
312}
313
314impl WatchInner {
315 fn new() -> Self {
316 WatchInner {
317 waker: AtomicWaker::new(),
318 }
319 }
320}
321
322impl<T> Sender<T> {
323 /// Broadcast a new value via the channel, notifying all receivers.
324 pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
325 let shared = match self.shared.upgrade() {
326 Some(shared) => shared,
327 // All `Watch` handles have been canceled
328 None => return Err(error::SendError { inner: value }),
329 };
330
331 // Replace the value
332 {
333 let mut lock = shared.value.write().unwrap();
334 *lock = value;
335 }
336
337 // Update the version. 2 is used so that the CLOSED bit is not set.
338 shared.version.fetch_add(2, SeqCst);
339
340 // Notify all watchers
341 notify_all(&*shared);
342
343 // Return the old value
344 Ok(())
345 }
346
347 /// Completes when all receivers have dropped.
348 ///
349 /// This allows the producer to get notified when interest in the produced
350 /// values is canceled and immediately stop doing work.
351 pub async fn closed(&mut self) {
352 poll_fn(|cx| self.poll_close(cx)).await
353 }
354
355 fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
356 match self.shared.upgrade() {
357 Some(shared) => {
358 shared.cancel.register_by_ref(cx.waker());
359 Pending
360 }
361 None => Ready(()),
362 }
363 }
364}
365
366/// Notify all watchers of a change
367fn notify_all<T>(shared: &Shared<T>) {
368 let watchers = shared.watchers.lock().unwrap();
369
370 for watcher in watchers.watchers.values() {
371 // Notify the task
372 watcher.waker.wake();
373 }
374}
375
376impl<T> Drop for Sender<T> {
377 fn drop(&mut self) {
378 if let Some(shared) = self.shared.upgrade() {
379 shared.version.fetch_or(CLOSED, SeqCst);
380 notify_all(&*shared);
381 }
382 }
383}
384
385// ===== impl Ref =====
386
387impl<T> ops::Deref for Ref<'_, T> {
388 type Target = T;
389
390 fn deref(&self) -> &T {
391 self.inner.deref()
392 }
393}
394
395// ===== impl Shared =====
396
397impl<T> Drop for Shared<T> {
398 fn drop(&mut self) {
399 self.cancel.wake();
400 }
401}