async_std/sync/condvar.rs
1use std::fmt;
2use std::pin::Pin;
3use std::time::Duration;
4
5use super::MutexGuard;
6use crate::future::{timeout, Future};
7use crate::sync::WakerSet;
8use crate::task::{Context, Poll};
9
/// A type indicating whether a timed wait on a condition variable returned due to a time out or
/// not.
///
/// Values of this type are returned, next to the re-acquired guard, by
/// `Condvar::wait_timeout` and `Condvar::wait_timeout_until`.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct WaitTimeoutResult(bool);

impl WaitTimeoutResult {
    /// Returns `true` if the wait was known to have timed out.
    pub fn timed_out(self) -> bool {
        // The wrapped flag is the whole state of this type.
        let WaitTimeoutResult(expired) = self;
        expired
    }
}
21
/// A condition variable.
///
/// This type is an async version of [`std::sync::Condvar`].
///
/// A task passes its `MutexGuard` to one of the `wait*` methods. The mutex is unlocked while
/// the task waits and a guard for the same mutex is handed back when the method returns.
/// Other tasks wake waiters up with `notify_one` or `notify_all`.
///
/// [`std::sync::Condvar`]: https://doc.rust-lang.org/std/sync/struct.Condvar.html
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use std::sync::Arc;
///
/// use async_std::sync::{Mutex, Condvar};
/// use async_std::task;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// // Spawn a new task that flips the flag, then wait for it to do so.
/// task::spawn(async move {
///     let (lock, cvar) = &*pair2;
///     let mut started = lock.lock().await;
///     *started = true;
///     // We notify the condvar that the value has changed.
///     cvar.notify_one();
/// });
///
/// // Wait for the task to start up.
/// let (lock, cvar) = &*pair;
/// let mut started = lock.lock().await;
/// while !*started {
///     started = cvar.wait(started).await;
/// }
///
/// # })
/// ```
pub struct Condvar {
    /// Wakers of the tasks that are currently waiting on this condition variable.
    wakers: WakerSet,
}
62
// SAFETY(review): `Condvar` consists of a single `WakerSet`, so these impls assert that a
// `WakerSet` may be moved to, and accessed from, several threads at once. `WakerSet` is
// defined outside this file — TODO confirm that it synchronizes its interior state.
unsafe impl Send for Condvar {}
unsafe impl Sync for Condvar {}
65
66impl Default for Condvar {
67 fn default() -> Self {
68 Condvar::new()
69 }
70}
71
72impl Condvar {
73 /// Creates a new condition variable
74 ///
75 /// # Examples
76 ///
77 /// ```
78 /// use async_std::sync::Condvar;
79 ///
80 /// let cvar = Condvar::new();
81 /// ```
82 pub fn new() -> Self {
83 Condvar {
84 wakers: WakerSet::new(),
85 }
86 }
87
88 /// Blocks the current task until this condition variable receives a notification.
89 ///
90 /// Unlike the std equivalent, this does not check that a single mutex is used at runtime.
91 /// However, as a best practice avoid using with multiple mutexes.
92 ///
93 /// # Examples
94 ///
95 /// ```
96 /// # async_std::task::block_on(async {
97 /// use std::sync::Arc;
98 ///
99 /// use async_std::sync::{Mutex, Condvar};
100 /// use async_std::task;
101 ///
102 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
103 /// let pair2 = pair.clone();
104 ///
105 /// task::spawn(async move {
106 /// let (lock, cvar) = &*pair2;
107 /// let mut started = lock.lock().await;
108 /// *started = true;
109 /// // We notify the condvar that the value has changed.
110 /// cvar.notify_one();
111 /// });
112 ///
113 /// // Wait for the thread to start up.
114 /// let (lock, cvar) = &*pair;
115 /// let mut started = lock.lock().await;
116 /// while !*started {
117 /// started = cvar.wait(started).await;
118 /// }
119 /// # })
120 /// ```
121 #[allow(clippy::needless_lifetimes)]
122 pub async fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
123 let mutex = MutexGuard::source(&guard);
124
125 self.await_notify(guard).await;
126
127 mutex.lock().await
128 }
129
130 fn await_notify<'a, T>(&self, guard: MutexGuard<'a, T>) -> AwaitNotify<'_, 'a, T> {
131 AwaitNotify {
132 cond: self,
133 guard: Some(guard),
134 key: None,
135 }
136 }
137
138 /// Blocks the current task until this condition variable receives a notification and the
139 /// required condition is met. Spurious wakeups are ignored and this function will only
140 /// return once the condition has been met.
141 ///
142 /// # Examples
143 ///
144 /// ```
145 /// # async_std::task::block_on(async {
146 /// #
147 /// use std::sync::Arc;
148 ///
149 /// use async_std::sync::{Mutex, Condvar};
150 /// use async_std::task;
151 ///
152 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
153 /// let pair2 = pair.clone();
154 ///
155 /// task::spawn(async move {
156 /// let (lock, cvar) = &*pair2;
157 /// let mut started = lock.lock().await;
158 /// *started = true;
159 /// // We notify the condvar that the value has changed.
160 /// cvar.notify_one();
161 /// });
162 ///
163 /// // Wait for the thread to start up.
164 /// let (lock, cvar) = &*pair;
165 /// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
166 /// let _guard = cvar.wait_until(lock.lock().await, |started| { *started }).await;
167 /// #
168 /// # })
169 /// ```
170 #[allow(clippy::needless_lifetimes)]
171 pub async fn wait_until<'a, T, F>(
172 &self,
173 mut guard: MutexGuard<'a, T>,
174 mut condition: F,
175 ) -> MutexGuard<'a, T>
176 where
177 F: FnMut(&mut T) -> bool,
178 {
179 while !condition(&mut *guard) {
180 guard = self.wait(guard).await;
181 }
182 guard
183 }
184
185 /// Waits on this condition variable for a notification, timing out after a specified duration.
186 ///
187 /// For these reasons `Condvar::wait_timeout_until` is recommended in most cases.
188 ///
189 /// # Examples
190 ///
191 /// ```
192 /// # async_std::task::block_on(async {
193 /// #
194 /// use std::sync::Arc;
195 /// use std::time::Duration;
196 ///
197 /// use async_std::sync::{Mutex, Condvar};
198 /// use async_std::task;
199 ///
200 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
201 /// let pair2 = pair.clone();
202 ///
203 /// task::spawn(async move {
204 /// let (lock, cvar) = &*pair2;
205 /// let mut started = lock.lock().await;
206 /// *started = true;
207 /// // We notify the condvar that the value has changed.
208 /// cvar.notify_one();
209 /// });
210 ///
211 /// // wait for the thread to start up
212 /// let (lock, cvar) = &*pair;
213 /// let mut started = lock.lock().await;
214 /// loop {
215 /// let result = cvar.wait_timeout(started, Duration::from_millis(10)).await;
216 /// started = result.0;
217 /// if *started == true {
218 /// // We received the notification and the value has been updated, we can leave.
219 /// break
220 /// }
221 /// }
222 /// #
223 /// # })
224 /// ```
225 #[allow(clippy::needless_lifetimes)]
226 pub async fn wait_timeout<'a, T>(
227 &self,
228 guard: MutexGuard<'a, T>,
229 dur: Duration,
230 ) -> (MutexGuard<'a, T>, WaitTimeoutResult) {
231 let mutex = MutexGuard::source(&guard);
232 match timeout(dur, self.wait(guard)).await {
233 Ok(guard) => (guard, WaitTimeoutResult(false)),
234 Err(_) => (mutex.lock().await, WaitTimeoutResult(true)),
235 }
236 }
237
238 /// Waits on this condition variable for a notification, timing out after a specified duration.
239 /// Spurious wakes will not cause this function to return.
240 ///
241 /// # Examples
242 /// ```
243 /// # async_std::task::block_on(async {
244 /// use std::sync::Arc;
245 /// use std::time::Duration;
246 ///
247 /// use async_std::sync::{Mutex, Condvar};
248 /// use async_std::task;
249 ///
250 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
251 /// let pair2 = pair.clone();
252 ///
253 /// task::spawn(async move {
254 /// let (lock, cvar) = &*pair2;
255 /// let mut started = lock.lock().await;
256 /// *started = true;
257 /// // We notify the condvar that the value has changed.
258 /// cvar.notify_one();
259 /// });
260 ///
261 /// // wait for the thread to start up
262 /// let (lock, cvar) = &*pair;
263 /// let result = cvar.wait_timeout_until(
264 /// lock.lock().await,
265 /// Duration::from_millis(100),
266 /// |&mut started| started,
267 /// ).await;
268 /// if result.1.timed_out() {
269 /// // timed-out without the condition ever evaluating to true.
270 /// }
271 /// // access the locked mutex via result.0
272 /// # });
273 /// ```
274 #[allow(clippy::needless_lifetimes)]
275 pub async fn wait_timeout_until<'a, T, F>(
276 &self,
277 guard: MutexGuard<'a, T>,
278 dur: Duration,
279 condition: F,
280 ) -> (MutexGuard<'a, T>, WaitTimeoutResult)
281 where
282 F: FnMut(&mut T) -> bool,
283 {
284 let mutex = MutexGuard::source(&guard);
285 match timeout(dur, self.wait_until(guard, condition)).await {
286 Ok(guard) => (guard, WaitTimeoutResult(false)),
287 Err(_) => (mutex.lock().await, WaitTimeoutResult(true)),
288 }
289 }
290
291 /// Wakes up one blocked task on this condvar.
292 ///
293 /// # Examples
294 ///
295 /// ```
296 /// # fn main() { async_std::task::block_on(async {
297 /// use std::sync::Arc;
298 ///
299 /// use async_std::sync::{Mutex, Condvar};
300 /// use async_std::task;
301 ///
302 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
303 /// let pair2 = pair.clone();
304 ///
305 /// task::spawn(async move {
306 /// let (lock, cvar) = &*pair2;
307 /// let mut started = lock.lock().await;
308 /// *started = true;
309 /// // We notify the condvar that the value has changed.
310 /// cvar.notify_one();
311 /// });
312 ///
313 /// // Wait for the thread to start up.
314 /// let (lock, cvar) = &*pair;
315 /// let mut started = lock.lock().await;
316 /// while !*started {
317 /// started = cvar.wait(started).await;
318 /// }
319 /// # }) }
320 /// ```
321 pub fn notify_one(&self) {
322 self.wakers.notify_one();
323 }
324
325 /// Wakes up all blocked tasks on this condvar.
326 ///
327 /// # Examples
328 /// ```
329 /// # fn main() { async_std::task::block_on(async {
330 /// #
331 /// use std::sync::Arc;
332 ///
333 /// use async_std::sync::{Mutex, Condvar};
334 /// use async_std::task;
335 ///
336 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
337 /// let pair2 = pair.clone();
338 ///
339 /// task::spawn(async move {
340 /// let (lock, cvar) = &*pair2;
341 /// let mut started = lock.lock().await;
342 /// *started = true;
343 /// // We notify the condvar that the value has changed.
344 /// cvar.notify_all();
345 /// });
346 ///
347 /// // Wait for the thread to start up.
348 /// let (lock, cvar) = &*pair;
349 /// let mut started = lock.lock().await;
350 /// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
351 /// while !*started {
352 /// started = cvar.wait(started).await;
353 /// }
354 /// #
355 /// # }) }
356 /// ```
357 pub fn notify_all(&self) {
358 self.wakers.notify_all();
359 }
360}
361
362impl fmt::Debug for Condvar {
363 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
364 f.pad("Condvar { .. }")
365 }
366}
367
/// A future that waits for another task to notify the condition variable.
///
/// This is an internal future that `wait` and `wait_until` await on.
/// It resolves to `()`; re-locking the mutex is left to the caller.
struct AwaitNotify<'a, 'b, T> {
    /// The condition variable that we are waiting on
    cond: &'a Condvar,
    /// The lock used with `cond`.
    /// This will be released the first time the future is polled,
    /// after registering the context to be notified.
    guard: Option<MutexGuard<'b, T>>,
    /// A key into the condition variable's `WakerSet`.
    /// This is set on the first poll, when the context's waker is inserted into the set,
    /// and reset to `None` once a notification has been observed.
    /// While it is `Some`, dropping the future cancels the registration.
    key: Option<usize>,
}
383
384impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> {
385 type Output = ();
386
387 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
388 match self.guard.take() {
389 Some(_) => {
390 self.key = Some(self.cond.wakers.insert(cx));
391 // the guard is dropped when we return, which frees the lock
392 Poll::Pending
393 }
394 None => {
395 if let Some(key) = self.key {
396 if self.cond.wakers.remove_if_notified(key, cx) {
397 self.key = None;
398 Poll::Ready(())
399 } else {
400 Poll::Pending
401 }
402 } else {
403 // This should only happen if it is polled twice after receiving a notification
404 Poll::Ready(())
405 }
406 }
407 }
408 }
409}
410
411impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> {
412 fn drop(&mut self) {
413 if let Some(key) = self.key {
414 self.cond.wakers.cancel(key);
415 }
416 }
417}