broker_tokio/sync/
watch.rs

1//! A single-producer, multi-consumer channel that only retains the *last* sent
2//! value.
3//!
4//! This channel is useful for watching for changes to a value from multiple
5//! points in the code base, for example, changes to configuration values.
6//!
7//! # Usage
8//!
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are
//! the producer and consumer halves of the channel. The channel is
11//! created with an initial value. [`Receiver::recv`] will always
12//! be ready upon creation and will yield either this initial value or
13//! the latest value that has been sent by `Sender`.
14//!
15//! Calls to [`Receiver::recv`] will always yield the latest value.
16//!
17//! # Examples
18//!
19//! ```
20//! use tokio::sync::watch;
21//!
22//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
23//!     let (tx, mut rx) = watch::channel("hello");
24//!
25//!     tokio::spawn(async move {
26//!         while let Some(value) = rx.recv().await {
27//!             println!("received = {:?}", value);
28//!         }
29//!     });
30//!
31//!     tx.broadcast("world")?;
32//! # Ok(())
33//! # }
34//! ```
35//!
36//! # Closing
37//!
38//! [`Sender::closed`] allows the producer to detect when all [`Receiver`]
39//! handles have been dropped. This indicates that there is no further interest
40//! in the values being produced and work can be stopped.
41//!
42//! # Thread safety
43//!
44//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
45//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
46//! handles may be moved to separate threads and also used concurrently.
47//!
48//! [`Sender`]: crate::sync::watch::Sender
49//! [`Receiver`]: crate::sync::watch::Receiver
50//! [`Receiver::recv`]: crate::sync::watch::Receiver::recv
51//! [`channel`]: crate::sync::watch::channel
52//! [`Sender::closed`]: crate::sync::watch::Sender::closed
53
54use crate::future::poll_fn;
55use crate::sync::task::AtomicWaker;
56
57use fnv::FnvHashMap;
58use std::ops;
59use std::sync::atomic::AtomicUsize;
60use std::sync::atomic::Ordering::SeqCst;
61use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
62use std::task::Poll::{Pending, Ready};
63use std::task::{Context, Poll};
64
65/// Receives values from the associated [`Sender`](struct.Sender.html).
66///
67/// Instances are created by the [`channel`](fn.channel.html) function.
68#[derive(Debug)]
69pub struct Receiver<T> {
70    /// Pointer to the shared state
71    shared: Arc<Shared<T>>,
72
73    /// Pointer to the watcher's internal state
74    inner: Arc<WatchInner>,
75
76    /// Watcher ID.
77    id: u64,
78
79    /// Last observed version
80    ver: usize,
81}
82
83/// Sends values to the associated [`Receiver`](struct.Receiver.html).
84///
85/// Instances are created by the [`channel`](fn.channel.html) function.
86#[derive(Debug)]
87pub struct Sender<T> {
88    shared: Weak<Shared<T>>,
89}
90
/// A borrowed view of the value currently stored in the channel.
///
/// An outstanding `Ref` holds a read lock on the inner value. This means that
/// long lived borrows could cause the send half to block. It is recommended
/// to keep the borrow as short lived as possible.
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// Read guard on the shared value; the lock is released when this is
    /// dropped.
    inner: RwLockReadGuard<'a, T>,
}
100
pub mod error {
    //! Watch error types

    use std::fmt;

    /// Error produced when sending a value fails.
    ///
    /// The value that could not be sent is kept in `inner`.
    #[derive(Debug)]
    pub struct SendError<T> {
        pub(crate) inner: T,
    }

    // ===== impl SendError =====

    impl<T: fmt::Debug> fmt::Display for SendError<T> {
        /// Writes a fixed message; the carried value is not included.
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            fmt.write_str("channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}
}
122
123#[derive(Debug)]
124struct Shared<T> {
125    /// The most recent value
126    value: RwLock<T>,
127
128    /// The current version
129    ///
130    /// The lowest bit represents a "closed" state. The rest of the bits
131    /// represent the current version.
132    version: AtomicUsize,
133
134    /// All watchers
135    watchers: Mutex<Watchers>,
136
137    /// Task to notify when all watchers drop
138    cancel: AtomicWaker,
139}
140
141#[derive(Debug)]
142struct Watchers {
143    next_id: u64,
144    watchers: FnvHashMap<u64, Arc<WatchInner>>,
145}
146
147#[derive(Debug)]
148struct WatchInner {
149    waker: AtomicWaker,
150}
151
/// Bit flag stored in the lowest bit of the shared version counter; it is set
/// when the `Sender` is dropped.
const CLOSED: usize = 0b1;
153
154/// Create a new watch channel, returning the "send" and "receive" handles.
155///
156/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
157/// Only the last value sent is made available to the [`Receiver`] half. All
158/// intermediate values are dropped.
159///
160/// # Examples
161///
162/// ```
163/// use tokio::sync::watch;
164///
165/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
166///     let (tx, mut rx) = watch::channel("hello");
167///
168///     tokio::spawn(async move {
169///         while let Some(value) = rx.recv().await {
170///             println!("received = {:?}", value);
171///         }
172///     });
173///
174///     tx.broadcast("world")?;
175/// # Ok(())
176/// # }
177/// ```
178///
179/// [`Sender`]: struct.Sender.html
180/// [`Receiver`]: struct.Receiver.html
181pub fn channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>) {
182    const INIT_ID: u64 = 0;
183
184    let inner = Arc::new(WatchInner::new());
185
186    // Insert the watcher
187    let mut watchers = FnvHashMap::with_capacity_and_hasher(0, Default::default());
188    watchers.insert(INIT_ID, inner.clone());
189
190    let shared = Arc::new(Shared {
191        value: RwLock::new(init),
192        version: AtomicUsize::new(2),
193        watchers: Mutex::new(Watchers {
194            next_id: INIT_ID + 1,
195            watchers,
196        }),
197        cancel: AtomicWaker::new(),
198    });
199
200    let tx = Sender {
201        shared: Arc::downgrade(&shared),
202    };
203
204    let rx = Receiver {
205        shared,
206        inner,
207        id: INIT_ID,
208        ver: 0,
209    };
210
211    (tx, rx)
212}
213
214impl<T> Receiver<T> {
215    /// Returns a reference to the most recently sent value
216    ///
217    /// Outstanding borrows hold a read lock. This means that long lived borrows
218    /// could cause the send half to block. It is recommended to keep the borrow
219    /// as short lived as possible.
220    ///
221    /// # Examples
222    ///
223    /// ```
224    /// use tokio::sync::watch;
225    ///
226    /// let (_, rx) = watch::channel("hello");
227    /// assert_eq!(*rx.borrow(), "hello");
228    /// ```
229    pub fn borrow(&self) -> Ref<'_, T> {
230        let inner = self.shared.value.read().unwrap();
231        Ref { inner }
232    }
233
234    // TODO: document
235    #[doc(hidden)]
236    pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
237        // Make sure the task is up to date
238        self.inner.waker.register_by_ref(cx.waker());
239
240        let state = self.shared.version.load(SeqCst);
241        let version = state & !CLOSED;
242
243        if version != self.ver {
244            let inner = self.shared.value.read().unwrap();
245            self.ver = version;
246
247            return Ready(Some(Ref { inner }));
248        }
249
250        if CLOSED == state & CLOSED {
251            // The `Store` handle has been dropped.
252            return Ready(None);
253        }
254
255        Pending
256    }
257}
258
259impl<T: Clone> Receiver<T> {
260    /// Attempts to clone the latest value sent via the channel.
261    pub async fn recv(&mut self) -> Option<T> {
262        poll_fn(|cx| {
263            let v_ref = ready!(self.poll_recv_ref(cx));
264            Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
265        })
266        .await
267    }
268}
269
/// Stream view of a receiver: yields a clone of each value it observes and
/// ends when `poll_recv_ref` reports `None`.
#[cfg(feature = "stream")]
impl<T: Clone> crate::stream::Stream for Receiver<T> {
    type Item = T;

    fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        match self.poll_recv_ref(cx) {
            Poll::Ready(slot) => Poll::Ready(slot.map(|current| T::clone(&*current))),
            Poll::Pending => Poll::Pending,
        }
    }
}
280
281impl<T> Clone for Receiver<T> {
282    fn clone(&self) -> Self {
283        let inner = Arc::new(WatchInner::new());
284        let shared = self.shared.clone();
285
286        let id = {
287            let mut watchers = shared.watchers.lock().unwrap();
288            let id = watchers.next_id;
289
290            watchers.next_id += 1;
291            watchers.watchers.insert(id, inner.clone());
292
293            id
294        };
295
296        let ver = self.ver;
297
298        Receiver {
299            shared,
300            inner,
301            id,
302            ver,
303        }
304    }
305}
306
307impl<T> Drop for Receiver<T> {
308    fn drop(&mut self) {
309        let mut watchers = self.shared.watchers.lock().unwrap();
310        watchers.watchers.remove(&self.id);
311    }
312}
313
314impl WatchInner {
315    fn new() -> Self {
316        WatchInner {
317            waker: AtomicWaker::new(),
318        }
319    }
320}
321
322impl<T> Sender<T> {
323    /// Broadcast a new value via the channel, notifying all receivers.
324    pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
325        let shared = match self.shared.upgrade() {
326            Some(shared) => shared,
327            // All `Watch` handles have been canceled
328            None => return Err(error::SendError { inner: value }),
329        };
330
331        // Replace the value
332        {
333            let mut lock = shared.value.write().unwrap();
334            *lock = value;
335        }
336
337        // Update the version. 2 is used so that the CLOSED bit is not set.
338        shared.version.fetch_add(2, SeqCst);
339
340        // Notify all watchers
341        notify_all(&*shared);
342
343        // Return the old value
344        Ok(())
345    }
346
347    /// Completes when all receivers have dropped.
348    ///
349    /// This allows the producer to get notified when interest in the produced
350    /// values is canceled and immediately stop doing work.
351    pub async fn closed(&mut self) {
352        poll_fn(|cx| self.poll_close(cx)).await
353    }
354
355    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
356        match self.shared.upgrade() {
357            Some(shared) => {
358                shared.cancel.register_by_ref(cx.waker());
359                Pending
360            }
361            None => Ready(()),
362        }
363    }
364}
365
366/// Notify all watchers of a change
367fn notify_all<T>(shared: &Shared<T>) {
368    let watchers = shared.watchers.lock().unwrap();
369
370    for watcher in watchers.watchers.values() {
371        // Notify the task
372        watcher.waker.wake();
373    }
374}
375
376impl<T> Drop for Sender<T> {
377    fn drop(&mut self) {
378        if let Some(shared) = self.shared.upgrade() {
379            shared.version.fetch_or(CLOSED, SeqCst);
380            notify_all(&*shared);
381        }
382    }
383}
384
385// ===== impl Ref =====
386
387impl<T> ops::Deref for Ref<'_, T> {
388    type Target = T;
389
390    fn deref(&self) -> &T {
391        self.inner.deref()
392    }
393}
394
395// ===== impl Shared =====
396
397impl<T> Drop for Shared<T> {
398    fn drop(&mut self) {
399        self.cancel.wake();
400    }
401}