tokio_sync/
lock.rs

1//! An asynchronous `Mutex`-like type.
2//!
3//! This module provides [`Lock`], a type that acts similarly to an asynchronous `Mutex`, with one
4//! major difference: the [`LockGuard`] returned by `poll_lock` is not tied to the lifetime of the
5//! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then
6//! release it at some later point in time.
7//!
8//! This allows you to do something along the lines of:
9//!
10//! ```rust,no_run
11//! # #[macro_use]
12//! # extern crate futures;
13//! # extern crate tokio;
14//! # use futures::{future, Poll, Async, Future, Stream};
15//! use tokio::sync::lock::{Lock, LockGuard};
16//! struct MyType<S> {
17//!     lock: Lock<S>,
18//! }
19//!
20//! impl<S> Future for MyType<S>
21//!   where S: Stream<Item = u32> + Send + 'static
22//! {
23//!     type Item = ();
24//!     type Error = ();
25//!
26//!     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
27//!         match self.lock.poll_lock() {
28//!             Async::Ready(mut guard) => {
29//!                 tokio::spawn(future::poll_fn(move || {
30//!                     let item = try_ready!(guard.poll().map_err(|_| ()));
31//!                     println!("item = {:?}", item);
32//!                     Ok(().into())
33//!                 }));
34//!                 Ok(().into())
35//!             },
36//!             Async::NotReady => Ok(Async::NotReady)
37//!         }
38//!     }
39//! }
40//! # fn main() {}
41//! ```
42//!
43//!   [`Lock`]: struct.Lock.html
44//!   [`LockGuard`]: struct.LockGuard.html
45
46use futures::Async;
47use semaphore;
48use std::cell::UnsafeCell;
49use std::fmt;
50use std::ops::{Deref, DerefMut};
51use std::sync::Arc;
52
53/// An asynchronous mutual exclusion primitive useful for protecting shared data
54///
55/// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data
56/// can only be accessed through the RAII guards returned from `poll_lock`, which guarantees that
57/// the data is only ever accessed when the mutex is locked.
58#[derive(Debug)]
59pub struct Lock<T> {
60    inner: Arc<State<T>>,
61    permit: semaphore::Permit,
62}
63
64/// A handle to a held `Lock`.
65///
66/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard
67/// internally keeps a reference-couned pointer to the original `Lock`, so even if the lock goes
68/// away, the guard remains valid.
69///
70/// The lock is automatically released whenever the guard is dropped, at which point `poll_lock`
71/// will succeed yet again.
72#[derive(Debug)]
73pub struct LockGuard<T>(Lock<T>);
74
75// As long as T: Send, it's fine to send and share Lock<T> between threads.
76// If T was not Send, sending and sharing a Lock<T> would be bad, since you can access T through
77// Lock<T>.
78unsafe impl<T> Send for Lock<T> where T: Send {}
79unsafe impl<T> Sync for Lock<T> where T: Send {}
80unsafe impl<T> Sync for LockGuard<T> where T: Send + Sync {}
81
82#[derive(Debug)]
83struct State<T> {
84    c: UnsafeCell<T>,
85    s: semaphore::Semaphore,
86}
87
88#[test]
89fn bounds() {
90    fn check<T: Send>() {}
91    check::<LockGuard<u32>>();
92}
93
94impl<T> Lock<T> {
95    /// Creates a new lock in an unlocked state ready for use.
96    pub fn new(t: T) -> Self {
97        Self {
98            inner: Arc::new(State {
99                c: UnsafeCell::new(t),
100                s: semaphore::Semaphore::new(1),
101            }),
102            permit: semaphore::Permit::new(),
103        }
104    }
105
106    /// Try to acquire the lock.
107    ///
108    /// If the lock is already held, the current task is notified when it is released.
109    pub fn poll_lock(&mut self) -> Async<LockGuard<T>> {
110        if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| {
111            // The semaphore was closed. but, we never explicitly close it, and we have a
112            // handle to it through the Arc, which means that this can never happen.
113            unreachable!()
114        }) {
115            return Async::NotReady;
116        }
117
118        // We want to move the acquired permit into the guard,
119        // and leave an unacquired one in self.
120        let acquired = Self {
121            inner: self.inner.clone(),
122            permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()),
123        };
124        Async::Ready(LockGuard(acquired))
125    }
126}
127
128impl<T> Drop for LockGuard<T> {
129    fn drop(&mut self) {
130        if self.0.permit.is_acquired() {
131            self.0.permit.release(&self.0.inner.s);
132        } else if ::std::thread::panicking() {
133            // A guard _should_ always hold its permit, but if the thread is already panicking,
134            // we don't want to generate a panic-while-panicing, since that's just unhelpful!
135        } else {
136            unreachable!("Permit not held when LockGuard was dropped")
137        }
138    }
139}
140
141impl<T> From<T> for Lock<T> {
142    fn from(s: T) -> Self {
143        Self::new(s)
144    }
145}
146
147impl<T> Clone for Lock<T> {
148    fn clone(&self) -> Self {
149        Self {
150            inner: self.inner.clone(),
151            permit: semaphore::Permit::new(),
152        }
153    }
154}
155
156impl<T> Default for Lock<T>
157where
158    T: Default,
159{
160    fn default() -> Self {
161        Self::new(T::default())
162    }
163}
164
165impl<T> Deref for LockGuard<T> {
166    type Target = T;
167    fn deref(&self) -> &Self::Target {
168        assert!(self.0.permit.is_acquired());
169        unsafe { &*self.0.inner.c.get() }
170    }
171}
172
173impl<T> DerefMut for LockGuard<T> {
174    fn deref_mut(&mut self) -> &mut Self::Target {
175        assert!(self.0.permit.is_acquired());
176        unsafe { &mut *self.0.inner.c.get() }
177    }
178}
179
180impl<T: fmt::Display> fmt::Display for LockGuard<T> {
181    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
182        fmt::Display::fmt(&**self, f)
183    }
184}