sqlx_postgres/advisory_lock.rs
1use crate::error::Result;
2use crate::Either;
3use crate::PgConnection;
4use hkdf::Hkdf;
5use once_cell::sync::OnceCell;
6use sha2::Sha256;
7use std::ops::{Deref, DerefMut};
8
9/// A mutex-like type utilizing [Postgres advisory locks].
10///
11/// Advisory locks are a mechanism provided by Postgres to have mutually exclusive or shared
12/// locks tracked in the database with application-defined semantics, as opposed to the standard
13/// row-level or table-level locks which may not fit all use-cases.
14///
15/// This API provides a convenient wrapper for generating and storing the integer keys that
16/// advisory locks use, as well as RAII guards for releasing advisory locks when they fall out
17/// of scope.
18///
19/// This API only handles session-scoped advisory locks (explicitly locked and unlocked, or
20/// automatically released when a connection is closed).
21///
22/// It is also possible to use transaction-scoped locks but those can be used by beginning a
23/// transaction and calling the appropriate lock functions (e.g. `SELECT pg_advisory_xact_lock()`)
24/// manually, and cannot be explicitly released, but are automatically released when a transaction
25/// ends (is committed or rolled back).
26///
27/// Session-level locks can be acquired either inside or outside a transaction and are not
28/// tied to transaction semantics; a lock acquired inside a transaction is still held when that
29/// transaction is committed or rolled back, until explicitly released or the connection is closed.
30///
31/// Locks can be acquired in either shared or exclusive modes, which can be thought of as read locks
32/// and write locks, respectively. Multiple shared locks are allowed for the same key, but a single
33/// exclusive lock prevents any other lock being taken for a given key until it is released.
34///
35/// [Postgres advisory locks]: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
36#[derive(Debug, Clone)]
37pub struct PgAdvisoryLock {
38 key: PgAdvisoryLockKey,
39 /// The query to execute to release this lock.
40 release_query: OnceCell<String>,
41}
42
/// The key type that Postgres advisory locks use natively.
///
/// Postgres currently has two separate key spaces for advisory locks: one keyed by a single
/// 64-bit integer, and another keyed by a pair of 32-bit integers. According to the Postgres
/// docs, these key spaces "do not overlap":
///
/// <https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS>
///
/// How Postgres treats advisory locks is explained further in the documentation for the
/// `pg_locks` system view:
///
/// <https://www.postgresql.org/docs/current/view-pg-locks.html>
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum PgAdvisoryLockKey {
    /// A key in the keyspace designated by a single 64-bit integer.
    ///
    /// This is the keyspace used when a [`PgAdvisoryLock`] is constructed with
    /// [`PgAdvisoryLock::new()`].
    BigInt(i64),
    /// A key in the keyspace designated by two 32-bit integers.
    IntPair(i32, i32),
}
66
67/// A wrapper for `PgConnection` (or a similar type) that represents a held Postgres advisory lock.
68///
69/// Can be acquired by [`PgAdvisoryLock::acquire()`] or [`PgAdvisoryLock::try_acquire()`].
70/// Released on-drop or via [`Self::release_now()`].
71///
72/// ### Note: Release-on-drop is not immediate!
73/// On drop, this guard queues a `pg_advisory_unlock()` call on the connection which will be
74/// flushed to the server the next time it is used, or when it is returned to
75/// a [`PgPool`][crate::PgPool] in the case of
76/// [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
77///
78/// This means the lock is not actually released as soon as the guard is dropped. To ensure the
79/// lock is eagerly released, you can call [`.release_now().await`][Self::release_now()].
80pub struct PgAdvisoryLockGuard<'lock, C: AsMut<PgConnection>> {
81 lock: &'lock PgAdvisoryLock,
82 conn: Option<C>,
83}
84
85impl PgAdvisoryLock {
86 /// Construct a `PgAdvisoryLock` using the given string as a key.
87 ///
88 /// This is intended to make it easier to use an advisory lock by using a human-readable string
89 /// for a key as opposed to manually generating a unique integer key. The generated integer key
90 /// is guaranteed to be stable and in the single 64-bit integer keyspace
91 /// (see [`PgAdvisoryLockKey`] for details).
92 ///
93 /// This is done by applying the [Hash-based Key Derivation Function (HKDF; IETF RFC 5869)][hkdf]
94 /// to the bytes of the input string, but in a way that the calculated integer is unlikely
95 /// to collide with any similar implementations (although we don't currently know of any).
96 /// See the source of this method for details.
97 ///
98 /// [hkdf]: https://datatracker.ietf.org/doc/html/rfc5869
99 /// ### Example
100 /// ```rust
101 /// use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockKey};
102 ///
103 /// let lock = PgAdvisoryLock::new("my first Postgres advisory lock!");
104 /// // Negative values are fine because of how Postgres treats advisory lock keys.
105 /// // See the documentation for the `pg_locks` system view for details.
106 /// assert_eq!(lock.key(), &PgAdvisoryLockKey::BigInt(-5560419505042474287));
107 /// ```
108 pub fn new(key_string: impl AsRef<str>) -> Self {
109 let input_key_material = key_string.as_ref();
110
111 // HKDF was chosen because it is designed to concentrate the entropy in a variable-length
112 // input key and produce a higher quality but reduced-length output key with a
113 // well-specified and reproducible algorithm.
114 //
115 // Granted, the input key is usually meant to be pseudorandom and not human readable,
116 // but we're not trying to produce an unguessable value by any means; just one that's as
117 // unlikely to already be in use as possible, but still deterministic.
118 //
119 // SHA-256 was chosen as the hash function because it's already used in the Postgres driver,
120 // which should save on codegen and optimization.
121
122 // We don't supply a salt as that is intended to be random, but we want a deterministic key.
123 let hkdf = Hkdf::<Sha256>::new(None, input_key_material.as_bytes());
124
125 let mut output_key_material = [0u8; 8];
126
127 // The first string is the "info" string of the HKDF which is intended to tie the output
128 // exclusively to SQLx. This should avoid collisions with implementations using a similar
129 // strategy. If you _want_ this to match some other implementation then you should get
130 // the calculated integer key from it and use that directly.
131 //
132 // Do *not* change this string as it will affect the output!
133 hkdf.expand(
134 b"SQLx (Rust) Postgres advisory lock",
135 &mut output_key_material,
136 )
137 // `Hkdf::expand()` only returns an error if you ask for more than 255 times the digest size.
138 // This is specified by RFC 5869 but not elaborated upon:
139 // https://datatracker.ietf.org/doc/html/rfc5869#section-2.3
140 // Since we're only asking for 8 bytes, this error shouldn't be returned.
141 .expect("BUG: `output_key_material` should be of acceptable length");
142
143 // For ease of use, this method assumes the user doesn't care which keyspace is used.
144 //
145 // It doesn't seem likely that someone would care about using the `(int, int)` keyspace
146 // specifically unless they already had keys to use, in which case they wouldn't
147 // care about this method. That's why we also provide `with_key()`.
148 //
149 // The choice of `from_le_bytes()` is mostly due to x86 being the most popular
150 // architecture for server software, so it should be a no-op there.
151 let key = PgAdvisoryLockKey::BigInt(i64::from_le_bytes(output_key_material));
152
153 tracing::trace!(
154 ?key,
155 key_string = ?input_key_material,
156 "generated key from key string",
157 );
158
159 Self::with_key(key)
160 }
161
162 /// Construct a `PgAdvisoryLock` with a manually supplied key.
163 pub fn with_key(key: PgAdvisoryLockKey) -> Self {
164 Self {
165 key,
166 release_query: OnceCell::new(),
167 }
168 }
169
170 /// Returns the current key.
171 pub fn key(&self) -> &PgAdvisoryLockKey {
172 &self.key
173 }
174
175 // Why doesn't this use `Acquire`? Well, I tried it and got really useless errors
176 // about "cannot project lifetimes to parent scope".
177 //
178 // It has something to do with how lifetimes work on the `Acquire` trait, I couldn't
179 // be bothered to figure it out. Probably another issue with a lack of `async fn` in traits
180 // or lazy normalization.
181
182 /// Acquires an exclusive lock using `pg_advisory_lock()`, waiting until the lock is acquired.
183 ///
184 /// For a version that returns immediately instead of waiting, see [`Self::try_acquire()`].
185 ///
186 /// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
187 /// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
188 /// any of these.
189 ///
190 /// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
191 /// which will be executed the next time the connection is used, or when returned to a
192 /// [`PgPool`][crate::PgPool] in the case of `PoolConnection<Postgres>`.
193 ///
194 /// Postgres allows a single connection to acquire a given lock more than once without releasing
195 /// it first, so in that sense the lock is re-entrant. However, the number of unlock operations
196 /// must match the number of lock operations for the lock to actually be released.
197 ///
198 /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
199 ///
200 /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
201 pub async fn acquire<C: AsMut<PgConnection>>(
202 &self,
203 mut conn: C,
204 ) -> Result<PgAdvisoryLockGuard<'_, C>> {
205 match &self.key {
206 PgAdvisoryLockKey::BigInt(key) => {
207 crate::query::query("SELECT pg_advisory_lock($1)")
208 .bind(key)
209 .execute(conn.as_mut())
210 .await?;
211 }
212 PgAdvisoryLockKey::IntPair(key1, key2) => {
213 crate::query::query("SELECT pg_advisory_lock($1, $2)")
214 .bind(key1)
215 .bind(key2)
216 .execute(conn.as_mut())
217 .await?;
218 }
219 }
220
221 Ok(PgAdvisoryLockGuard::new(self, conn))
222 }
223
224 /// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately
225 /// if the lock could not be acquired.
226 ///
227 /// For a version that waits until the lock is acquired, see [`Self::acquire()`].
228 ///
229 /// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
230 /// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
231 /// any of these. The connection is returned if the lock could not be acquired.
232 ///
233 /// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
234 /// which will be executed the next time the connection is used, or when returned to a
235 /// [`PgPool`][crate::PgPool] in the case of `PoolConnection<Postgres>`.
236 ///
237 /// Postgres allows a single connection to acquire a given lock more than once without releasing
238 /// it first, so in that sense the lock is re-entrant. However, the number of unlock operations
239 /// must match the number of lock operations for the lock to actually be released.
240 ///
241 /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
242 ///
243 /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
244 pub async fn try_acquire<C: AsMut<PgConnection>>(
245 &self,
246 mut conn: C,
247 ) -> Result<Either<PgAdvisoryLockGuard<'_, C>, C>> {
248 let locked: bool = match &self.key {
249 PgAdvisoryLockKey::BigInt(key) => {
250 crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)")
251 .bind(key)
252 .fetch_one(conn.as_mut())
253 .await?
254 }
255 PgAdvisoryLockKey::IntPair(key1, key2) => {
256 crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1, $2)")
257 .bind(key1)
258 .bind(key2)
259 .fetch_one(conn.as_mut())
260 .await?
261 }
262 };
263
264 if locked {
265 Ok(Either::Left(PgAdvisoryLockGuard::new(self, conn)))
266 } else {
267 Ok(Either::Right(conn))
268 }
269 }
270
271 /// Execute `pg_advisory_unlock()` for this lock's key on the given connection.
272 ///
273 /// This is used by [`PgAdvisoryLockGuard::release_now()`] and is also provided for manually
274 /// releasing the lock from connections returned by [`PgAdvisoryLockGuard::leak()`].
275 ///
276 /// An error should only be returned if there is something wrong with the connection,
277 /// in which case the lock will be automatically released by the connection closing anyway.
278 ///
279 /// The `boolean` value is that returned by `pg_advisory_lock()`. If it is `false`, it
280 /// indicates that the lock was not actually held by the given connection and that a warning
281 /// has been logged by the Postgres server.
282 pub async fn force_release<C: AsMut<PgConnection>>(&self, mut conn: C) -> Result<(C, bool)> {
283 let released: bool = match &self.key {
284 PgAdvisoryLockKey::BigInt(key) => {
285 crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1)")
286 .bind(key)
287 .fetch_one(conn.as_mut())
288 .await?
289 }
290 PgAdvisoryLockKey::IntPair(key1, key2) => {
291 crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1, $2)")
292 .bind(key1)
293 .bind(key2)
294 .fetch_one(conn.as_mut())
295 .await?
296 }
297 };
298
299 Ok((conn, released))
300 }
301
302 fn get_release_query(&self) -> &str {
303 self.release_query.get_or_init(|| match &self.key {
304 PgAdvisoryLockKey::BigInt(key) => format!("SELECT pg_advisory_unlock({key})"),
305 PgAdvisoryLockKey::IntPair(key1, key2) => {
306 format!("SELECT pg_advisory_unlock({key1}, {key2})")
307 }
308 })
309 }
310}
311
312impl PgAdvisoryLockKey {
313 /// Converts `Self::Bigint(bigint)` to `Some(bigint)` and all else to `None`.
314 pub fn as_bigint(&self) -> Option<i64> {
315 if let Self::BigInt(bigint) = self {
316 Some(*bigint)
317 } else {
318 None
319 }
320 }
321}
322
/// Panic message used whenever a guard's connection slot is unexpectedly empty.
const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken";
324
325impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
326 fn new(lock: &'lock PgAdvisoryLock, conn: C) -> Self {
327 PgAdvisoryLockGuard {
328 lock,
329 conn: Some(conn),
330 }
331 }
332
333 /// Immediately release the held advisory lock instead of when the connection is next used.
334 ///
335 /// An error should only be returned if there is something wrong with the connection,
336 /// in which case the lock will be automatically released by the connection closing anyway.
337 ///
338 /// If `pg_advisory_unlock()` returns `false`, a warning will be logged, both by SQLx as
339 /// well as the Postgres server. This would only happen if the lock was released without
340 /// using this guard, or the connection was swapped using [`std::mem::replace()`].
341 pub async fn release_now(mut self) -> Result<C> {
342 let (conn, released) = self
343 .lock
344 .force_release(self.conn.take().expect(NONE_ERR))
345 .await?;
346
347 if !released {
348 tracing::warn!(
349 lock = ?self.lock.key,
350 "PgAdvisoryLockGuard: advisory lock was not held by the contained connection",
351 );
352 }
353
354 Ok(conn)
355 }
356
357 /// Cancel the release of the advisory lock, keeping it held until the connection is closed.
358 ///
359 /// To manually release the lock later, see [`PgAdvisoryLock::force_release()`].
360 pub fn leak(mut self) -> C {
361 self.conn.take().expect(NONE_ERR)
362 }
363}
364
365impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<'lock, C> {
366 type Target = PgConnection;
367
368 fn deref(&self) -> &Self::Target {
369 self.conn.as_ref().expect(NONE_ERR).as_ref()
370 }
371}
372
373/// Mutable access to the underlying connection is provided so it can still be used like normal,
374/// even allowing locks to be taken recursively.
375///
376/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
377/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
378/// guard attempts to release the lock.
379impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut
380 for PgAdvisoryLockGuard<'lock, C>
381{
382 fn deref_mut(&mut self) -> &mut Self::Target {
383 self.conn.as_mut().expect(NONE_ERR).as_mut()
384 }
385}
386
387impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
388 for PgAdvisoryLockGuard<'lock, C>
389{
390 fn as_ref(&self) -> &PgConnection {
391 self.conn.as_ref().expect(NONE_ERR).as_ref()
392 }
393}
394
395/// Mutable access to the underlying connection is provided so it can still be used like normal,
396/// even allowing locks to be taken recursively.
397///
398/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
399/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
400/// guard attempts to release the lock.
401impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<'lock, C> {
402 fn as_mut(&mut self) -> &mut PgConnection {
403 self.conn.as_mut().expect(NONE_ERR).as_mut()
404 }
405}
406
407/// Queues a `pg_advisory_unlock()` call on the wrapped connection which will be flushed
408/// to the server the next time it is used, or when it is returned to [`PgPool`][crate::PgPool]
409/// in the case of [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
410impl<'lock, C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<'lock, C> {
411 fn drop(&mut self) {
412 if let Some(mut conn) = self.conn.take() {
413 // Queue a simple query message to execute next time the connection is used.
414 // The `async fn` versions can safely use the prepared statement protocol,
415 // but this is the safest way to queue a query to execute on the next opportunity.
416 conn.as_mut()
417 .queue_simple_query(self.lock.get_release_query())
418 .expect("BUG: PgAdvisoryLock::get_release_query() somehow too long for protocol");
419 }
420 }
421}