sqlx_postgres/
advisory_lock.rs

1use crate::error::Result;
2use crate::Either;
3use crate::PgConnection;
4use hkdf::Hkdf;
5use once_cell::sync::OnceCell;
6use sha2::Sha256;
7use std::ops::{Deref, DerefMut};
8
9/// A mutex-like type utilizing [Postgres advisory locks].
10///
11/// Advisory locks are a mechanism provided by Postgres to have mutually exclusive or shared
12/// locks tracked in the database with application-defined semantics, as opposed to the standard
13/// row-level or table-level locks which may not fit all use-cases.
14///
15/// This API provides a convenient wrapper for generating and storing the integer keys that
16/// advisory locks use, as well as RAII guards for releasing advisory locks when they fall out
17/// of scope.
18///
19/// This API only handles session-scoped advisory locks (explicitly locked and unlocked, or
20/// automatically released when a connection is closed).
21///
22/// It is also possible to use transaction-scoped locks but those can be used by beginning a
23/// transaction and calling the appropriate lock functions (e.g. `SELECT pg_advisory_xact_lock()`)
24/// manually, and cannot be explicitly released, but are automatically released when a transaction
25/// ends (is committed or rolled back).
26///
27/// Session-level locks can be acquired either inside or outside a transaction and are not
28/// tied to transaction semantics; a lock acquired inside a transaction is still held when that
29/// transaction is committed or rolled back, until explicitly released or the connection is closed.
30///
31/// Locks can be acquired in either shared or exclusive modes, which can be thought of as read locks
32/// and write locks, respectively. Multiple shared locks are allowed for the same key, but a single
33/// exclusive lock prevents any other lock being taken for a given key until it is released.
34///
35/// [Postgres advisory locks]: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
36#[derive(Debug, Clone)]
37pub struct PgAdvisoryLock {
38    key: PgAdvisoryLockKey,
39    /// The query to execute to release this lock.
40    release_query: OnceCell<String>,
41}
42
43/// A key type natively used by Postgres advisory locks.
44///
45/// Currently, Postgres advisory locks have two different key spaces: one keyed by a single
46/// 64-bit integer, and one keyed by a pair of two 32-bit integers. The Postgres docs
47/// specify that these key spaces "do not overlap":
48///
49/// <https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS>
50///
51/// The documentation for the `pg_locks` system view explains further how advisory locks
52/// are treated in Postgres:
53///
54/// <https://www.postgresql.org/docs/current/view-pg-locks.html>
55#[derive(Debug, Clone, PartialEq, Eq)]
56#[non_exhaustive]
57pub enum PgAdvisoryLockKey {
58    /// The keyspace designated by a single 64-bit integer.
59    ///
60    /// When [PgAdvisoryLock] is constructed with [::new()][PgAdvisoryLock::new()],
61    /// this is the keyspace used.
62    BigInt(i64),
63    /// The keyspace designated by two 32-bit integers.
64    IntPair(i32, i32),
65}
66
67/// A wrapper for `PgConnection` (or a similar type) that represents a held Postgres advisory lock.
68///
69/// Can be acquired by [`PgAdvisoryLock::acquire()`] or [`PgAdvisoryLock::try_acquire()`].
70/// Released on-drop or via [`Self::release_now()`].
71///
72/// ### Note: Release-on-drop is not immediate!
73/// On drop, this guard queues a `pg_advisory_unlock()` call on the connection which will be
74/// flushed to the server the next time it is used, or when it is returned to
75/// a [`PgPool`][crate::PgPool] in the case of
76/// [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
77///
78/// This means the lock is not actually released as soon as the guard is dropped. To ensure the
79/// lock is eagerly released, you can call [`.release_now().await`][Self::release_now()].
80pub struct PgAdvisoryLockGuard<'lock, C: AsMut<PgConnection>> {
81    lock: &'lock PgAdvisoryLock,
82    conn: Option<C>,
83}
84
85impl PgAdvisoryLock {
86    /// Construct a `PgAdvisoryLock` using the given string as a key.
87    ///
88    /// This is intended to make it easier to use an advisory lock by using a human-readable string
89    /// for a key as opposed to manually generating a unique integer key. The generated integer key
90    /// is guaranteed to be stable and in the single 64-bit integer keyspace
91    /// (see [`PgAdvisoryLockKey`] for details).
92    ///
93    /// This is done by applying the [Hash-based Key Derivation Function (HKDF; IETF RFC 5869)][hkdf]
94    /// to the bytes of the input string, but in a way that the calculated integer is unlikely
95    /// to collide with any similar implementations (although we don't currently know of any).
96    /// See the source of this method for details.
97    ///
98    /// [hkdf]: https://datatracker.ietf.org/doc/html/rfc5869
99    /// ### Example
100    /// ```rust
101    /// use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockKey};
102    ///
103    /// let lock = PgAdvisoryLock::new("my first Postgres advisory lock!");
104    /// // Negative values are fine because of how Postgres treats advisory lock keys.
105    /// // See the documentation for the `pg_locks` system view for details.
106    /// assert_eq!(lock.key(), &PgAdvisoryLockKey::BigInt(-5560419505042474287));
107    /// ```
108    pub fn new(key_string: impl AsRef<str>) -> Self {
109        let input_key_material = key_string.as_ref();
110
111        // HKDF was chosen because it is designed to concentrate the entropy in a variable-length
112        // input key and produce a higher quality but reduced-length output key with a
113        // well-specified and reproducible algorithm.
114        //
115        // Granted, the input key is usually meant to be pseudorandom and not human readable,
116        // but we're not trying to produce an unguessable value by any means; just one that's as
117        // unlikely to already be in use as possible, but still deterministic.
118        //
119        // SHA-256 was chosen as the hash function because it's already used in the Postgres driver,
120        // which should save on codegen and optimization.
121
122        // We don't supply a salt as that is intended to be random, but we want a deterministic key.
123        let hkdf = Hkdf::<Sha256>::new(None, input_key_material.as_bytes());
124
125        let mut output_key_material = [0u8; 8];
126
127        // The first string is the "info" string of the HKDF which is intended to tie the output
128        // exclusively to SQLx. This should avoid collisions with implementations using a similar
129        // strategy. If you _want_ this to match some other implementation then you should get
130        // the calculated integer key from it and use that directly.
131        //
132        // Do *not* change this string as it will affect the output!
133        hkdf.expand(
134            b"SQLx (Rust) Postgres advisory lock",
135            &mut output_key_material,
136        )
137        // `Hkdf::expand()` only returns an error if you ask for more than 255 times the digest size.
138        // This is specified by RFC 5869 but not elaborated upon:
139        // https://datatracker.ietf.org/doc/html/rfc5869#section-2.3
140        // Since we're only asking for 8 bytes, this error shouldn't be returned.
141        .expect("BUG: `output_key_material` should be of acceptable length");
142
143        // For ease of use, this method assumes the user doesn't care which keyspace is used.
144        //
145        // It doesn't seem likely that someone would care about using the `(int, int)` keyspace
146        // specifically unless they already had keys to use, in which case they wouldn't
147        // care about this method. That's why we also provide `with_key()`.
148        //
149        // The choice of `from_le_bytes()` is mostly due to x86 being the most popular
150        // architecture for server software, so it should be a no-op there.
151        let key = PgAdvisoryLockKey::BigInt(i64::from_le_bytes(output_key_material));
152
153        tracing::trace!(
154            ?key,
155            key_string = ?input_key_material,
156            "generated key from key string",
157        );
158
159        Self::with_key(key)
160    }
161
162    /// Construct a `PgAdvisoryLock` with a manually supplied key.
163    pub fn with_key(key: PgAdvisoryLockKey) -> Self {
164        Self {
165            key,
166            release_query: OnceCell::new(),
167        }
168    }
169
170    /// Returns the current key.
171    pub fn key(&self) -> &PgAdvisoryLockKey {
172        &self.key
173    }
174
175    // Why doesn't this use `Acquire`? Well, I tried it and got really useless errors
176    // about "cannot project lifetimes to parent scope".
177    //
178    // It has something to do with how lifetimes work on the `Acquire` trait, I couldn't
179    // be bothered to figure it out. Probably another issue with a lack of `async fn` in traits
180    // or lazy normalization.
181
182    /// Acquires an exclusive lock using `pg_advisory_lock()`, waiting until the lock is acquired.
183    ///
184    /// For a version that returns immediately instead of waiting, see [`Self::try_acquire()`].
185    ///
186    /// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
187    /// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
188    /// any of these.
189    ///
190    /// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
191    /// which will be executed the next time the connection is used, or when returned to a
192    /// [`PgPool`][crate::PgPool] in the case of `PoolConnection<Postgres>`.
193    ///
194    /// Postgres allows a single connection to acquire a given lock more than once without releasing
195    /// it first, so in that sense the lock is re-entrant. However, the number of unlock operations
196    /// must match the number of lock operations for the lock to actually be released.
197    ///
198    /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
199    ///
200    /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
201    pub async fn acquire<C: AsMut<PgConnection>>(
202        &self,
203        mut conn: C,
204    ) -> Result<PgAdvisoryLockGuard<'_, C>> {
205        match &self.key {
206            PgAdvisoryLockKey::BigInt(key) => {
207                crate::query::query("SELECT pg_advisory_lock($1)")
208                    .bind(key)
209                    .execute(conn.as_mut())
210                    .await?;
211            }
212            PgAdvisoryLockKey::IntPair(key1, key2) => {
213                crate::query::query("SELECT pg_advisory_lock($1, $2)")
214                    .bind(key1)
215                    .bind(key2)
216                    .execute(conn.as_mut())
217                    .await?;
218            }
219        }
220
221        Ok(PgAdvisoryLockGuard::new(self, conn))
222    }
223
224    /// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately
225    /// if the lock could not be acquired.
226    ///
227    /// For a version that waits until the lock is acquired, see [`Self::acquire()`].
228    ///
229    /// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
230    /// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
231    /// any of these. The connection is returned if the lock could not be acquired.
232    ///
233    /// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
234    /// which will be executed the next time the connection is used, or when returned to a
235    /// [`PgPool`][crate::PgPool] in the case of `PoolConnection<Postgres>`.
236    ///
237    /// Postgres allows a single connection to acquire a given lock more than once without releasing
238    /// it first, so in that sense the lock is re-entrant. However, the number of unlock operations
239    /// must match the number of lock operations for the lock to actually be released.
240    ///
241    /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
242    ///
243    /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
244    pub async fn try_acquire<C: AsMut<PgConnection>>(
245        &self,
246        mut conn: C,
247    ) -> Result<Either<PgAdvisoryLockGuard<'_, C>, C>> {
248        let locked: bool = match &self.key {
249            PgAdvisoryLockKey::BigInt(key) => {
250                crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)")
251                    .bind(key)
252                    .fetch_one(conn.as_mut())
253                    .await?
254            }
255            PgAdvisoryLockKey::IntPair(key1, key2) => {
256                crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1, $2)")
257                    .bind(key1)
258                    .bind(key2)
259                    .fetch_one(conn.as_mut())
260                    .await?
261            }
262        };
263
264        if locked {
265            Ok(Either::Left(PgAdvisoryLockGuard::new(self, conn)))
266        } else {
267            Ok(Either::Right(conn))
268        }
269    }
270
271    /// Execute `pg_advisory_unlock()` for this lock's key on the given connection.
272    ///
273    /// This is used by [`PgAdvisoryLockGuard::release_now()`] and is also provided for manually
274    /// releasing the lock from connections returned by [`PgAdvisoryLockGuard::leak()`].
275    ///
276    /// An error should only be returned if there is something wrong with the connection,
277    /// in which case the lock will be automatically released by the connection closing anyway.
278    ///
279    /// The `boolean` value is that returned by `pg_advisory_lock()`. If it is `false`, it
280    /// indicates that the lock was not actually held by the given connection and that a warning
281    /// has been logged by the Postgres server.
282    pub async fn force_release<C: AsMut<PgConnection>>(&self, mut conn: C) -> Result<(C, bool)> {
283        let released: bool = match &self.key {
284            PgAdvisoryLockKey::BigInt(key) => {
285                crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1)")
286                    .bind(key)
287                    .fetch_one(conn.as_mut())
288                    .await?
289            }
290            PgAdvisoryLockKey::IntPair(key1, key2) => {
291                crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1, $2)")
292                    .bind(key1)
293                    .bind(key2)
294                    .fetch_one(conn.as_mut())
295                    .await?
296            }
297        };
298
299        Ok((conn, released))
300    }
301
302    fn get_release_query(&self) -> &str {
303        self.release_query.get_or_init(|| match &self.key {
304            PgAdvisoryLockKey::BigInt(key) => format!("SELECT pg_advisory_unlock({key})"),
305            PgAdvisoryLockKey::IntPair(key1, key2) => {
306                format!("SELECT pg_advisory_unlock({key1}, {key2})")
307            }
308        })
309    }
310}
311
312impl PgAdvisoryLockKey {
313    /// Converts `Self::Bigint(bigint)` to `Some(bigint)` and all else to `None`.
314    pub fn as_bigint(&self) -> Option<i64> {
315        if let Self::BigInt(bigint) = self {
316            Some(*bigint)
317        } else {
318            None
319        }
320    }
321}
322
323const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken";
324
325impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
326    fn new(lock: &'lock PgAdvisoryLock, conn: C) -> Self {
327        PgAdvisoryLockGuard {
328            lock,
329            conn: Some(conn),
330        }
331    }
332
333    /// Immediately release the held advisory lock instead of when the connection is next used.
334    ///
335    /// An error should only be returned if there is something wrong with the connection,
336    /// in which case the lock will be automatically released by the connection closing anyway.
337    ///
338    /// If `pg_advisory_unlock()` returns `false`, a warning will be logged, both by SQLx as
339    /// well as the Postgres server. This would only happen if the lock was released without
340    /// using this guard, or the connection was swapped using [`std::mem::replace()`].
341    pub async fn release_now(mut self) -> Result<C> {
342        let (conn, released) = self
343            .lock
344            .force_release(self.conn.take().expect(NONE_ERR))
345            .await?;
346
347        if !released {
348            tracing::warn!(
349                lock = ?self.lock.key,
350                "PgAdvisoryLockGuard: advisory lock was not held by the contained connection",
351            );
352        }
353
354        Ok(conn)
355    }
356
357    /// Cancel the release of the advisory lock, keeping it held until the connection is closed.
358    ///
359    /// To manually release the lock later, see [`PgAdvisoryLock::force_release()`].
360    pub fn leak(mut self) -> C {
361        self.conn.take().expect(NONE_ERR)
362    }
363}
364
365impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<'lock, C> {
366    type Target = PgConnection;
367
368    fn deref(&self) -> &Self::Target {
369        self.conn.as_ref().expect(NONE_ERR).as_ref()
370    }
371}
372
373/// Mutable access to the underlying connection is provided so it can still be used like normal,
374/// even allowing locks to be taken recursively.
375///
376/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
377/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
378/// guard attempts to release the lock.
379impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut
380    for PgAdvisoryLockGuard<'lock, C>
381{
382    fn deref_mut(&mut self) -> &mut Self::Target {
383        self.conn.as_mut().expect(NONE_ERR).as_mut()
384    }
385}
386
387impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
388    for PgAdvisoryLockGuard<'lock, C>
389{
390    fn as_ref(&self) -> &PgConnection {
391        self.conn.as_ref().expect(NONE_ERR).as_ref()
392    }
393}
394
395/// Mutable access to the underlying connection is provided so it can still be used like normal,
396/// even allowing locks to be taken recursively.
397///
398/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
399/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
400/// guard attempts to release the lock.
401impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<'lock, C> {
402    fn as_mut(&mut self) -> &mut PgConnection {
403        self.conn.as_mut().expect(NONE_ERR).as_mut()
404    }
405}
406
407/// Queues a `pg_advisory_unlock()` call on the wrapped connection which will be flushed
408/// to the server the next time it is used, or when it is returned to [`PgPool`][crate::PgPool]
409/// in the case of [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
410impl<'lock, C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<'lock, C> {
411    fn drop(&mut self) {
412        if let Some(mut conn) = self.conn.take() {
413            // Queue a simple query message to execute next time the connection is used.
414            // The `async fn` versions can safely use the prepared statement protocol,
415            // but this is the safest way to queue a query to execute on the next opportunity.
416            conn.as_mut()
417                .queue_simple_query(self.lock.get_release_query())
418                .expect("BUG: PgAdvisoryLock::get_release_query() somehow too long for protocol");
419        }
420    }
421}