tokio_sync/
watch.rs

1//! A single-producer, multi-consumer channel that only retains the *last* sent
2//! value.
3//!
4//! This channel is useful for watching for changes to a value from multiple
5//! points in the code base, for example, changes to configuration values.
6//!
7//! # Usage
8//!
9//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
10//! and sender halves of the channel. The channel is created with an initial
11//! value. `Receiver::poll` will always be ready upon creation and will yield
12//! either this initial value or the latest value that has been sent by
13//! `Sender`.
14//!
15//! Calls to [`Receiver::poll`] and [`Receiver::poll_ref`] will always yield
16//! the latest value.
17//!
18//! # Examples
19//!
20//! ```
21//! # extern crate futures;
22//! extern crate tokio;
23//!
24//! use tokio::prelude::*;
25//! use tokio::sync::watch;
26//!
27//! # tokio::run(futures::future::lazy(|| {
28//! let (mut tx, rx) = watch::channel("hello");
29//!
30//! tokio::spawn(rx.for_each(|value| {
31//!     println!("received = {:?}", value);
32//!     Ok(())
33//! }).map_err(|_| ()));
34//!
35//! tx.broadcast("world").unwrap();
36//! # Ok(())
37//! # }));
38//! ```
39//!
40//! # Closing
41//!
42//! [`Sender::poll_close`] allows the producer to detect when all [`Sender`]
43//! handles have been dropped. This indicates that there is no further interest
44//! in the values being produced and work can be stopped.
45//!
46//! # Thread safety
47//!
48//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
49//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
50//! handles may be moved to separate threads and also used concurrently.
51//!
52//! [`Sender`]: struct.Sender.html
53//! [`Receiver`]: struct.Receiver.html
54//! [`channel`]: fn.channel.html
55//! [`Sender::poll_close`]: struct.Sender.html#method.poll_close
56//! [`Receiver::poll`]: struct.Receiver.html#method.poll
57//! [`Receiver::poll_ref`]: struct.Receiver.html#method.poll_ref
58
59use fnv::FnvHashMap;
60use futures::task::AtomicTask;
61use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
62
63use std::ops;
64use std::sync::atomic::AtomicUsize;
65use std::sync::atomic::Ordering::SeqCst;
66use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
67
68/// Receives values from the associated `Sender`.
69///
70/// Instances are created by the [`channel`](fn.channel.html) function.
71#[derive(Debug)]
72pub struct Receiver<T> {
73    /// Pointer to the shared state
74    shared: Arc<Shared<T>>,
75
76    /// Pointer to the watcher's internal state
77    inner: Arc<WatchInner>,
78
79    /// Watcher ID.
80    id: u64,
81
82    /// Last observed version
83    ver: usize,
84}
85
86/// Sends values to the associated `Receiver`.
87///
88/// Instances are created by the [`channel`](fn.channel.html) function.
89#[derive(Debug)]
90pub struct Sender<T> {
91    shared: Weak<Shared<T>>,
92}
93
94/// Returns a reference to the inner value
95///
96/// Outstanding borrows hold a read lock on the inner value. This means that
97/// long lived borrows could cause the produce half to block. It is recommended
98/// to keep the borrow as short lived as possible.
99#[derive(Debug)]
100pub struct Ref<'a, T: 'a> {
101    inner: RwLockReadGuard<'a, T>,
102}
103
104pub mod error {
105    //! Watch error types
106
107    use std::fmt;
108
109    /// Error produced when receiving a value fails.
110    #[derive(Debug)]
111    pub struct RecvError {
112        pub(crate) _p: (),
113    }
114
115    /// Error produced when sending a value fails.
116    #[derive(Debug)]
117    pub struct SendError<T> {
118        pub(crate) inner: T,
119    }
120
121    // ===== impl RecvError =====
122
123    impl fmt::Display for RecvError {
124        fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
125            use std::error::Error;
126            write!(fmt, "{}", self.description())
127        }
128    }
129
130    impl ::std::error::Error for RecvError {
131        fn description(&self) -> &str {
132            "channel closed"
133        }
134    }
135
136    // ===== impl SendError =====
137
138    impl<T: fmt::Debug> fmt::Display for SendError<T> {
139        fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
140            use std::error::Error;
141            write!(fmt, "{}", self.description())
142        }
143    }
144
145    impl<T: fmt::Debug> ::std::error::Error for SendError<T> {
146        fn description(&self) -> &str {
147            "channel closed"
148        }
149    }
150}
151
152#[derive(Debug)]
153struct Shared<T> {
154    /// The most recent value
155    value: RwLock<T>,
156
157    /// The current version
158    ///
159    /// The lowest bit represents a "closed" state. The rest of the bits
160    /// represent the current version.
161    version: AtomicUsize,
162
163    /// All watchers
164    watchers: Mutex<Watchers>,
165
166    /// Task to notify when all watchers drop
167    cancel: AtomicTask,
168}
169
170#[derive(Debug)]
171struct Watchers {
172    next_id: u64,
173    watchers: FnvHashMap<u64, Arc<WatchInner>>,
174}
175
176#[derive(Debug)]
177struct WatchInner {
178    task: AtomicTask,
179}
180
181const CLOSED: usize = 1;
182
183/// Create a new watch channel, returning the "send" and "receive" handles.
184///
185/// All values sent by `Sender` will become visible to the `Receiver` handles.
186/// Only the last value sent is made available to the `Receiver` half. All
187/// intermediate values are dropped.
188///
189/// # Examples
190///
191/// ```
192/// # extern crate futures;
193/// extern crate tokio;
194///
195/// use tokio::prelude::*;
196/// use tokio::sync::watch;
197///
198/// # tokio::run(futures::future::lazy(|| {
199/// let (mut tx, rx) = watch::channel("hello");
200///
201/// tokio::spawn(rx.for_each(|value| {
202///     println!("received = {:?}", value);
203///     Ok(())
204/// }).map_err(|_| ()));
205///
206/// tx.broadcast("world").unwrap();
207/// # Ok(())
208/// # }));
209/// ```
210pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
211    const INIT_ID: u64 = 0;
212
213    let inner = Arc::new(WatchInner::new());
214
215    // Insert the watcher
216    let mut watchers = FnvHashMap::with_capacity_and_hasher(0, Default::default());
217    watchers.insert(INIT_ID, inner.clone());
218
219    let shared = Arc::new(Shared {
220        value: RwLock::new(init),
221        version: AtomicUsize::new(2),
222        watchers: Mutex::new(Watchers {
223            next_id: INIT_ID + 1,
224            watchers,
225        }),
226        cancel: AtomicTask::new(),
227    });
228
229    let tx = Sender {
230        shared: Arc::downgrade(&shared),
231    };
232
233    let rx = Receiver {
234        shared,
235        inner,
236        id: INIT_ID,
237        ver: 0,
238    };
239
240    (tx, rx)
241}
242
243impl<T> Receiver<T> {
244    /// Returns a reference to the most recently sent value
245    ///
246    /// Outstanding borrows hold a read lock. This means that long lived borrows
247    /// could cause the send half to block. It is recommended to keep the borrow
248    /// as short lived as possible.
249    ///
250    /// # Examples
251    ///
252    /// ```
253    /// # extern crate tokio;
254    /// # use tokio::sync::watch;
255    /// let (_, rx) = watch::channel("hello");
256    /// assert_eq!(*rx.get_ref(), "hello");
257    /// ```
258    pub fn get_ref(&self) -> Ref<T> {
259        let inner = self.shared.value.read().unwrap();
260        Ref { inner }
261    }
262
263    /// Attempts to receive the latest value sent via the channel.
264    ///
265    /// If a new, unobserved, value has been sent, a reference to it is
266    /// returned. If no new value has been sent, then `NotReady` is returned and
267    /// the current task is notified once a new value is sent.
268    ///
269    /// Only the **most recent** value is returned. If the receiver is falling
270    /// behind the sender, intermediate values are dropped.
271    pub fn poll_ref(&mut self) -> Poll<Option<Ref<T>>, error::RecvError> {
272        // Make sure the task is up to date
273        self.inner.task.register();
274
275        let state = self.shared.version.load(SeqCst);
276        let version = state & !CLOSED;
277
278        if version != self.ver {
279            // Track the latest version
280            self.ver = version;
281
282            let inner = self.shared.value.read().unwrap();
283
284            return Ok(Some(Ref { inner }).into());
285        }
286
287        if CLOSED == state & CLOSED {
288            // The `Store` handle has been dropped.
289            return Ok(None.into());
290        }
291
292        Ok(Async::NotReady)
293    }
294}
295
296impl<T: Clone> Stream for Receiver<T> {
297    type Item = T;
298    type Error = error::RecvError;
299
300    fn poll(&mut self) -> Poll<Option<T>, error::RecvError> {
301        let item = try_ready!(self.poll_ref());
302        Ok(Async::Ready(item.map(|v_ref| v_ref.clone())))
303    }
304}
305
306impl<T> Clone for Receiver<T> {
307    fn clone(&self) -> Self {
308        let inner = Arc::new(WatchInner::new());
309        let shared = self.shared.clone();
310
311        let id = {
312            let mut watchers = shared.watchers.lock().unwrap();
313            let id = watchers.next_id;
314
315            watchers.next_id += 1;
316            watchers.watchers.insert(id, inner.clone());
317
318            id
319        };
320
321        let ver = self.ver;
322
323        Receiver {
324            shared: shared,
325            inner,
326            id,
327            ver,
328        }
329    }
330}
331
332impl<T> Drop for Receiver<T> {
333    fn drop(&mut self) {
334        let mut watchers = self.shared.watchers.lock().unwrap();
335        watchers.watchers.remove(&self.id);
336    }
337}
338
339impl WatchInner {
340    fn new() -> Self {
341        WatchInner {
342            task: AtomicTask::new(),
343        }
344    }
345}
346
347impl<T> Sender<T> {
348    /// Broadcast a new value via the channel, notifying all receivers.
349    pub fn broadcast(&mut self, value: T) -> Result<(), error::SendError<T>> {
350        let shared = match self.shared.upgrade() {
351            Some(shared) => shared,
352            // All `Watch` handles have been canceled
353            None => return Err(error::SendError { inner: value }),
354        };
355
356        // Replace the value
357        {
358            let mut lock = shared.value.write().unwrap();
359            *lock = value;
360        }
361
362        // Update the version. 2 is used so that the CLOSED bit is not set.
363        shared.version.fetch_add(2, SeqCst);
364
365        // Notify all watchers
366        notify_all(&*shared);
367
368        // Return the old value
369        Ok(())
370    }
371
372    /// Returns `Ready` when all receivers have dropped.
373    ///
374    /// This allows the producer to get notified when interest in the produced
375    /// values is canceled and immediately stop doing work.
376    pub fn poll_close(&mut self) -> Poll<(), ()> {
377        match self.shared.upgrade() {
378            Some(shared) => {
379                shared.cancel.register();
380                Ok(Async::NotReady)
381            }
382            None => Ok(Async::Ready(())),
383        }
384    }
385}
386
387impl<T> Sink for Sender<T> {
388    type SinkItem = T;
389    type SinkError = error::SendError<T>;
390
391    fn start_send(&mut self, item: T) -> StartSend<T, error::SendError<T>> {
392        let _ = self.broadcast(item)?;
393        Ok(AsyncSink::Ready)
394    }
395
396    fn poll_complete(&mut self) -> Poll<(), error::SendError<T>> {
397        Ok(().into())
398    }
399}
400
401/// Notify all watchers of a change
402fn notify_all<T>(shared: &Shared<T>) {
403    let watchers = shared.watchers.lock().unwrap();
404
405    for watcher in watchers.watchers.values() {
406        // Notify the task
407        watcher.task.notify();
408    }
409}
410
411impl<T> Drop for Sender<T> {
412    fn drop(&mut self) {
413        if let Some(shared) = self.shared.upgrade() {
414            shared.version.fetch_or(CLOSED, SeqCst);
415            notify_all(&*shared);
416        }
417    }
418}
419
420// ===== impl Ref =====
421
422impl<'a, T: 'a> ops::Deref for Ref<'a, T> {
423    type Target = T;
424
425    fn deref(&self) -> &T {
426        self.inner.deref()
427    }
428}
429
430// ===== impl Shared =====
431
432impl<T> Drop for Shared<T> {
433    fn drop(&mut self) {
434        self.cancel.notify();
435    }
436}