async_std/sync/
condvar.rs

1use std::fmt;
2use std::pin::Pin;
3use std::time::Duration;
4
5use super::MutexGuard;
6use crate::future::{timeout, Future};
7use crate::sync::WakerSet;
8use crate::task::{Context, Poll};
9
10#[derive(Debug, PartialEq, Eq, Copy, Clone)]
11pub struct WaitTimeoutResult(bool);
12
13/// A type indicating whether a timed wait on a condition variable returned due to a time out or
14/// not
15impl WaitTimeoutResult {
16    /// Returns `true` if the wait was known to have timed out.
17    pub fn timed_out(self) -> bool {
18        self.0
19    }
20}
21
22/// A Condition Variable
23///
24/// This type is an async version of [`std::sync::Condvar`].
25///
26/// [`std::sync::Condvar`]: https://doc.rust-lang.org/std/sync/struct.Condvar.html
27///
28/// # Examples
29///
30/// ```
31/// # async_std::task::block_on(async {
32/// #
33/// use std::sync::Arc;
34///
35/// use async_std::sync::{Mutex, Condvar};
36/// use async_std::task;
37///
38/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
39/// let pair2 = pair.clone();
40///
41/// // Inside of our lock, spawn a new thread, and then wait for it to start.
42/// task::spawn(async move {
43///     let (lock, cvar) = &*pair2;
44///     let mut started = lock.lock().await;
45///     *started = true;
46///     // We notify the condvar that the value has changed.
47///     cvar.notify_one();
48/// });
49///
50/// // Wait for the thread to start up.
51/// let (lock, cvar) = &*pair;
52/// let mut started = lock.lock().await;
53/// while !*started {
54///     started = cvar.wait(started).await;
55/// }
56///
57/// # })
58/// ```
59pub struct Condvar {
60    wakers: WakerSet,
61}
62
63unsafe impl Send for Condvar {}
64unsafe impl Sync for Condvar {}
65
66impl Default for Condvar {
67    fn default() -> Self {
68        Condvar::new()
69    }
70}
71
72impl Condvar {
73    /// Creates a new condition variable
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use async_std::sync::Condvar;
79    ///
80    /// let cvar = Condvar::new();
81    /// ```
82    pub fn new() -> Self {
83        Condvar {
84            wakers: WakerSet::new(),
85        }
86    }
87
88    /// Blocks the current task until this condition variable receives a notification.
89    ///
90    /// Unlike the std equivalent, this does not check that a single mutex is used at runtime.
91    /// However, as a best practice avoid using with multiple mutexes.
92    ///
93    /// # Examples
94    ///
95    /// ```
96    /// # async_std::task::block_on(async {
97    /// use std::sync::Arc;
98    ///
99    /// use async_std::sync::{Mutex, Condvar};
100    /// use async_std::task;
101    ///
102    /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
103    /// let pair2 = pair.clone();
104    ///
105    /// task::spawn(async move {
106    ///     let (lock, cvar) = &*pair2;
107    ///     let mut started = lock.lock().await;
108    ///     *started = true;
109    ///     // We notify the condvar that the value has changed.
110    ///     cvar.notify_one();
111    /// });
112    ///
113    /// // Wait for the thread to start up.
114    /// let (lock, cvar) = &*pair;
115    /// let mut started = lock.lock().await;
116    /// while !*started {
117    ///     started = cvar.wait(started).await;
118    /// }
119    /// # })
120    /// ```
121    #[allow(clippy::needless_lifetimes)]
122    pub async fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
123        let mutex = MutexGuard::source(&guard);
124
125        self.await_notify(guard).await;
126
127        mutex.lock().await
128    }
129
130    fn await_notify<'a, T>(&self, guard: MutexGuard<'a, T>) -> AwaitNotify<'_, 'a, T> {
131        AwaitNotify {
132            cond: self,
133            guard: Some(guard),
134            key: None,
135        }
136    }
137
138    /// Blocks the current task until this condition variable receives a notification and the
139    /// required condition is met. Spurious wakeups are ignored and this function will only
140    /// return once the condition has been met.
141    ///
142    /// # Examples
143    ///
144    /// ```
145    /// # async_std::task::block_on(async {
146    /// #
147    /// use std::sync::Arc;
148    ///
149    /// use async_std::sync::{Mutex, Condvar};
150    /// use async_std::task;
151    ///
152    /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
153    /// let pair2 = pair.clone();
154    ///
155    /// task::spawn(async move {
156    ///     let (lock, cvar) = &*pair2;
157    ///     let mut started = lock.lock().await;
158    ///     *started = true;
159    ///     // We notify the condvar that the value has changed.
160    ///     cvar.notify_one();
161    /// });
162    ///
163    /// // Wait for the thread to start up.
164    /// let (lock, cvar) = &*pair;
165    /// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
166    /// let _guard = cvar.wait_until(lock.lock().await, |started| { *started }).await;
167    /// #
168    /// # })
169    /// ```
170    #[allow(clippy::needless_lifetimes)]
171    pub async fn wait_until<'a, T, F>(
172        &self,
173        mut guard: MutexGuard<'a, T>,
174        mut condition: F,
175    ) -> MutexGuard<'a, T>
176    where
177        F: FnMut(&mut T) -> bool,
178    {
179        while !condition(&mut *guard) {
180            guard = self.wait(guard).await;
181        }
182        guard
183    }
184
185    /// Waits on this condition variable for a notification, timing out after a specified duration.
186    ///
187    /// For these reasons `Condvar::wait_timeout_until` is recommended in most cases.
188    ///
189    /// # Examples
190    ///
191    /// ```
192    /// # async_std::task::block_on(async {
193    /// #
194    /// use std::sync::Arc;
195    /// use std::time::Duration;
196    ///
197    /// use async_std::sync::{Mutex, Condvar};
198    /// use async_std::task;
199    ///
200    /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
201    /// let pair2 = pair.clone();
202    ///
203    /// task::spawn(async move {
204    ///   let (lock, cvar) = &*pair2;
205    ///   let mut started = lock.lock().await;
206    ///   *started = true;
207    ///   // We notify the condvar that the value has changed.
208    ///   cvar.notify_one();
209    /// });
210    ///
211    /// // wait for the thread to start up
212    /// let (lock, cvar) = &*pair;
213    /// let mut started = lock.lock().await;
214    /// loop {
215    ///   let result = cvar.wait_timeout(started, Duration::from_millis(10)).await;
216    ///   started = result.0;
217    ///   if *started == true {
218    ///       // We received the notification and the value has been updated, we can leave.
219    ///       break
220    ///   }
221    /// }
222    /// #
223    /// # })
224    /// ```
225    #[allow(clippy::needless_lifetimes)]
226    pub async fn wait_timeout<'a, T>(
227        &self,
228        guard: MutexGuard<'a, T>,
229        dur: Duration,
230    ) -> (MutexGuard<'a, T>, WaitTimeoutResult) {
231        let mutex = MutexGuard::source(&guard);
232        match timeout(dur, self.wait(guard)).await {
233            Ok(guard) => (guard, WaitTimeoutResult(false)),
234            Err(_) => (mutex.lock().await, WaitTimeoutResult(true)),
235        }
236    }
237
238    /// Waits on this condition variable for a notification, timing out after a specified duration.
239    /// Spurious wakes will not cause this function to return.
240    ///
241    /// # Examples
242    /// ```
243    /// # async_std::task::block_on(async {
244    /// use std::sync::Arc;
245    /// use std::time::Duration;
246    ///
247    /// use async_std::sync::{Mutex, Condvar};
248    /// use async_std::task;
249    ///
250    /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
251    /// let pair2 = pair.clone();
252    ///
253    /// task::spawn(async move {
254    ///     let (lock, cvar) = &*pair2;
255    ///     let mut started = lock.lock().await;
256    ///     *started = true;
257    ///     // We notify the condvar that the value has changed.
258    ///     cvar.notify_one();
259    /// });
260    ///
261    /// // wait for the thread to start up
262    /// let (lock, cvar) = &*pair;
263    /// let result = cvar.wait_timeout_until(
264    ///     lock.lock().await,
265    ///     Duration::from_millis(100),
266    ///     |&mut started| started,
267    /// ).await;
268    /// if result.1.timed_out() {
269    ///     // timed-out without the condition ever evaluating to true.
270    /// }
271    /// // access the locked mutex via result.0
272    /// # });
273    /// ```
274    #[allow(clippy::needless_lifetimes)]
275    pub async fn wait_timeout_until<'a, T, F>(
276        &self,
277        guard: MutexGuard<'a, T>,
278        dur: Duration,
279        condition: F,
280    ) -> (MutexGuard<'a, T>, WaitTimeoutResult)
281    where
282        F: FnMut(&mut T) -> bool,
283    {
284        let mutex = MutexGuard::source(&guard);
285        match timeout(dur, self.wait_until(guard, condition)).await {
286            Ok(guard) => (guard, WaitTimeoutResult(false)),
287            Err(_) => (mutex.lock().await, WaitTimeoutResult(true)),
288        }
289    }
290
291    /// Wakes up one blocked task on this condvar.
292    ///
293    /// # Examples
294    ///
295    /// ```
296    /// # fn main() { async_std::task::block_on(async {
297    /// use std::sync::Arc;
298    ///
299    /// use async_std::sync::{Mutex, Condvar};
300    /// use async_std::task;
301    ///
302    /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
303    /// let pair2 = pair.clone();
304    ///
305    /// task::spawn(async move {
306    ///     let (lock, cvar) = &*pair2;
307    ///     let mut started = lock.lock().await;
308    ///     *started = true;
309    ///     // We notify the condvar that the value has changed.
310    ///     cvar.notify_one();
311    /// });
312    ///
313    /// // Wait for the thread to start up.
314    /// let (lock, cvar) = &*pair;
315    /// let mut started = lock.lock().await;
316    /// while !*started {
317    ///     started = cvar.wait(started).await;
318    /// }
319    /// # }) }
320    /// ```
321    pub fn notify_one(&self) {
322        self.wakers.notify_one();
323    }
324
325    /// Wakes up all blocked tasks on this condvar.
326    ///
327    /// # Examples
328    /// ```
329    /// # fn main() { async_std::task::block_on(async {
330    /// #
331    /// use std::sync::Arc;
332    ///
333    /// use async_std::sync::{Mutex, Condvar};
334    /// use async_std::task;
335    ///
336    /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
337    /// let pair2 = pair.clone();
338    ///
339    /// task::spawn(async move {
340    ///     let (lock, cvar) = &*pair2;
341    ///     let mut started = lock.lock().await;
342    ///     *started = true;
343    ///     // We notify the condvar that the value has changed.
344    ///     cvar.notify_all();
345    /// });
346    ///
347    /// // Wait for the thread to start up.
348    /// let (lock, cvar) = &*pair;
349    /// let mut started = lock.lock().await;
350    /// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
351    /// while !*started {
352    ///     started = cvar.wait(started).await;
353    /// }
354    /// #
355    /// # }) }
356    /// ```
357    pub fn notify_all(&self) {
358        self.wakers.notify_all();
359    }
360}
361
362impl fmt::Debug for Condvar {
363    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
364        f.pad("Condvar { .. }")
365    }
366}
367
368/// A future that waits for another task to notify the condition variable.
369///
370/// This is an internal future that `wait` and `wait_until` await on.
371struct AwaitNotify<'a, 'b, T> {
372    /// The condition variable that we are waiting on
373    cond: &'a Condvar,
374    /// The lock used with `cond`.
375    /// This will be released the first time the future is polled,
376    /// after registering the context to be notified.
377    guard: Option<MutexGuard<'b, T>>,
378    /// A key into the conditions variable's `WakerSet`.
379    /// This is set to the index of the `Waker` for the context each time
380    /// the future is polled and not completed.
381    key: Option<usize>,
382}
383
384impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> {
385    type Output = ();
386
387    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
388        match self.guard.take() {
389            Some(_) => {
390                self.key = Some(self.cond.wakers.insert(cx));
391                // the guard is dropped when we return, which frees the lock
392                Poll::Pending
393            }
394            None => {
395                if let Some(key) = self.key {
396                    if self.cond.wakers.remove_if_notified(key, cx) {
397                        self.key = None;
398                        Poll::Ready(())
399                    } else {
400                        Poll::Pending
401                    }
402                } else {
403                    // This should only happen if it is polled twice after receiving a notification
404                    Poll::Ready(())
405                }
406            }
407        }
408    }
409}
410
411impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> {
412    fn drop(&mut self) {
413        if let Some(key) = self.key {
414            self.cond.wakers.cancel(key);
415        }
416    }
417}