1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
use crate::error::Result;
use crate::Either;
use crate::PgConnection;
use hkdf::Hkdf;
use once_cell::sync::OnceCell;
use sha2::Sha256;
use std::ops::{Deref, DerefMut};

/// A mutex-like type utilizing [Postgres advisory locks].
///
/// Advisory locks are a mechanism provided by Postgres to have mutually exclusive or shared
/// locks tracked in the database with application-defined semantics, as opposed to the standard
/// row-level or table-level locks which may not fit all use-cases.
///
/// This API provides a convenient wrapper for generating and storing the integer keys that
/// advisory locks use, as well as RAII guards for releasing advisory locks when they fall out
/// of scope.
///
/// This API only handles session-scoped advisory locks (explicitly locked and unlocked, or
/// automatically released when a connection is closed).
///
/// Transaction-scoped locks are not covered by this API. They can still be used by beginning a
/// transaction and calling the appropriate lock functions
/// (e.g. `SELECT pg_advisory_xact_lock()`) manually. Such locks cannot be explicitly released;
/// they are automatically released when the transaction ends (is committed or rolled back).
///
/// Session-level locks can be acquired either inside or outside a transaction and are not
/// tied to transaction semantics; a lock acquired inside a transaction is still held when that
/// transaction is committed or rolled back, until explicitly released or the connection is closed.
///
/// Locks can be acquired in either shared or exclusive modes, which can be thought of as read locks
/// and write locks, respectively. Multiple shared locks are allowed for the same key, but a single
/// exclusive lock prevents any other lock being taken for a given key until it is released.
///
/// [Postgres advisory locks]: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
#[derive(Debug, Clone)]
pub struct PgAdvisoryLock {
    /// The key identifying this lock, in one of the two Postgres advisory lock keyspaces.
    key: PgAdvisoryLockKey,
    /// The query to execute to release this lock.
    ///
    /// Built lazily from `key` the first time it is needed, then reused. This is the query
    /// that a dropped guard queues on its connection.
    release_query: OnceCell<String>,
}

/// A key type natively used by Postgres advisory locks.
///
/// Currently, Postgres advisory locks have two different key spaces: one keyed by a single
/// 64-bit integer, and one keyed by a pair of two 32-bit integers. The Postgres docs
/// specify that these key spaces "do not overlap":
///
/// <https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS>
///
/// The documentation for the `pg_locks` system view explains further how advisory locks
/// are treated in Postgres:
///
/// <https://www.postgresql.org/docs/current/view-pg-locks.html>
///
/// Keys implement `Hash` in addition to `Eq`, so they can be used directly as keys of a
/// `HashMap` or members of a `HashSet`. Keys from different keyspaces never compare equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PgAdvisoryLockKey {
    /// The keyspace designated by a single 64-bit integer.
    ///
    /// When [PgAdvisoryLock] is constructed with [::new()][PgAdvisoryLock::new()],
    /// this is the keyspace used.
    BigInt(i64),
    /// The keyspace designated by two 32-bit integers.
    IntPair(i32, i32),
}

/// A wrapper for `PgConnection` (or a similar type) that represents a held Postgres advisory lock.
///
/// Can be acquired by [`PgAdvisoryLock::acquire()`] or [`PgAdvisoryLock::try_acquire()`].
/// Released on-drop or via [`Self::release_now()`].
///
/// ### Note: Release-on-drop is not immediate!
/// On drop, this guard queues a `pg_advisory_unlock()` call on the connection which will be
/// flushed to the server the next time it is used, or when it is returned to
/// a [`PgPool`][crate::PgPool] in the case of
/// [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
///
/// This means the lock is not actually released as soon as the guard is dropped. To ensure the
/// lock is eagerly released, you can call [`.release_now().await`][Self::release_now()].
///
/// ### Note: Keep the guard alive for as long as the lock is needed
/// Discarding the guard right after acquiring it (e.g. `lock.acquire(&mut conn).await?;`)
/// queues the unlock call straight away, so the lock is given up the next time the connection
/// is used. The type is therefore marked `#[must_use]`; bind the guard to a variable, or call
/// [`Self::leak()`] to keep the lock held without a guard.
#[must_use = "dropping the guard queues the advisory lock for release; \
              bind it to a variable or call `.leak()`"]
pub struct PgAdvisoryLockGuard<'lock, C: AsMut<PgConnection>> {
    /// The lock that was acquired; supplies the key and the release query.
    lock: &'lock PgAdvisoryLock,
    /// The wrapped connection. Only `None` once the guard has been released, leaked or dropped.
    conn: Option<C>,
}

impl PgAdvisoryLock {
    /// Create a `PgAdvisoryLock` whose integer key is derived from the given string.
    ///
    /// This makes advisory locks easier to use: a human-readable string serves as the key
    /// instead of a manually generated unique integer. The derived integer key is guaranteed
    /// to be stable and in the single 64-bit integer keyspace
    /// (see [`PgAdvisoryLockKey`] for details).
    ///
    /// The key is produced by applying the
    /// [Hash-based Key Derivation Function (HKDF; IETF RFC 5869)][hkdf] to the bytes of the
    /// input string, in a way that makes the calculated integer unlikely to collide with any
    /// similar implementations (although we don't currently know of any).
    /// See the source of this method for details.
    ///
    /// [hkdf]: https://datatracker.ietf.org/doc/html/rfc5869
    ///
    /// ### Example
    /// ```rust
    /// # extern crate sqlx_core as sqlx;
    /// use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockKey};
    ///
    /// let lock = PgAdvisoryLock::new("my first Postgres advisory lock!");
    /// // Negative values are fine because of how Postgres treats advisory lock keys.
    /// // See the documentation for the `pg_locks` system view for details.
    /// assert_eq!(lock.key(), &PgAdvisoryLockKey::BigInt(-5560419505042474287));
    /// ```
    pub fn new(key_string: impl AsRef<str>) -> Self {
        let key_str: &str = key_string.as_ref();

        // Eight bytes of output: exactly enough to fill a single `i64` key.
        let mut output_key_material = [0u8; 8];

        // Why HKDF: it is designed to concentrate the entropy of a variable-length input key
        // into a higher quality but reduced-length output key, using a well-specified and
        // reproducible algorithm.
        //
        // The input key is usually meant to be pseudorandom rather than human readable, but
        // the goal here is not an unguessable value; it is a deterministic value that is as
        // unlikely to already be in use as possible.
        //
        // Why SHA-256: it is already used in the Postgres driver, which should save on codegen
        // and optimization.
        //
        // Why no salt: a salt is intended to be random, but the key must be deterministic.
        Hkdf::<Sha256>::new(None, key_str.as_bytes())
            .expand(
                // The "info" string of the HKDF ties the output exclusively to SQLx, which
                // should avoid collisions with implementations using a similar strategy.
                // If you _want_ this to match some other implementation then you should get
                // the calculated integer key from it and use that directly.
                //
                // Do *not* change this string as it will affect the output!
                b"SQLx (Rust) Postgres advisory lock",
                &mut output_key_material,
            )
            // `Hkdf::expand()` only returns an error if you ask for more than 255 times the
            // digest size. This is specified by RFC 5869 but not elaborated upon:
            // https://datatracker.ietf.org/doc/html/rfc5869#section-2.3
            // Only 8 bytes are requested here, so this error shouldn't be returned.
            .expect("BUG: `output_key_material` should be of acceptable length");

        // For ease of use, this method assumes the caller doesn't care which keyspace is used.
        //
        // Someone who specifically needs the `(int, int)` keyspace most likely already has keys
        // to use and therefore has no use for this method; `with_key()` exists for that case.
        //
        // `from_le_bytes()` was picked mostly because x86 is the most popular architecture for
        // server software, so it should be a no-op there.
        let key = PgAdvisoryLockKey::BigInt(i64::from_le_bytes(output_key_material));

        tracing::trace!(
            ?key,
            key_string = ?key_str,
            "generated key from key string",
        );

        Self::with_key(key)
    }

    /// Create a `PgAdvisoryLock` from a key supplied by the caller.
    pub fn with_key(key: PgAdvisoryLockKey) -> Self {
        // The release query is derived from the key lazily, on first use.
        let release_query = OnceCell::new();

        Self { key, release_query }
    }

    /// Returns the key this lock was constructed with.
    pub fn key(&self) -> &PgAdvisoryLockKey {
        let Self { key, .. } = self;
        key
    }

    // Why doesn't this use `Acquire`? Well, I tried it and got really useless errors
    // about "cannot project lifetimes to parent scope".
    //
    // It has something to do with how lifetimes work on the `Acquire` trait, I couldn't
    // be bothered to figure it out. Probably another issue with a lack of `async fn` in traits
    // or lazy normalization.

    /// Takes an exclusive lock using `pg_advisory_lock()`, waiting until the lock is acquired.
    ///
    /// Use [`Self::try_acquire()`] for a variant that returns immediately instead of waiting.
    ///
    /// The call is executed on `conn`, which may be any connection-like type: `PgConnection`,
    /// `PoolConnection<Postgres>` or `Transaction<Postgres>`, or a mutable reference to any
    /// of these.
    ///
    /// When the returned guard is dropped it queues a `pg_advisory_unlock()` call on the
    /// connection. That call is executed the next time the connection is used, or when it is
    /// returned to a [`PgPool`][crate::PgPool] in the case of `PoolConnection<Postgres>`.
    ///
    /// Postgres allows a single connection to acquire a given lock more than once without
    /// releasing it first, so in that sense the lock is re-entrant. However, the lock is only
    /// actually released once the number of unlock operations matches the number of lock
    /// operations.
    ///
    /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
    ///
    /// # Errors
    /// Returns any error raised while executing the lock query on `conn`.
    ///
    /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
    pub async fn acquire<C: AsMut<PgConnection>>(
        &self,
        mut conn: C,
    ) -> Result<PgAdvisoryLockGuard<'_, C>> {
        // The key fields are plain integers, so they are copied out and bound by value.
        match self.key {
            PgAdvisoryLockKey::IntPair(first, second) => {
                crate::query::query("SELECT pg_advisory_lock($1, $2)")
                    .bind(first)
                    .bind(second)
                    .execute(conn.as_mut())
                    .await?;
            }
            PgAdvisoryLockKey::BigInt(bigint) => {
                crate::query::query("SELECT pg_advisory_lock($1)")
                    .bind(bigint)
                    .execute(conn.as_mut())
                    .await?;
            }
        }

        // The query only returns once the lock is held, so the connection can be wrapped now.
        Ok(PgAdvisoryLockGuard::new(self, conn))
    }

    /// Attempts to take an exclusive lock using `pg_try_advisory_lock()`, returning immediately
    /// if the lock could not be acquired.
    ///
    /// Use [`Self::acquire()`] for a variant that waits until the lock is acquired.
    ///
    /// The call is executed on `conn`, which may be any connection-like type: `PgConnection`,
    /// `PoolConnection<Postgres>` or `Transaction<Postgres>`, or a mutable reference to any
    /// of these.
    ///
    /// On success the guard is returned as `Either::Left`. If the lock could not be acquired,
    /// the connection is handed back unchanged as `Either::Right`.
    ///
    /// When the returned guard is dropped it queues a `pg_advisory_unlock()` call on the
    /// connection. That call is executed the next time the connection is used, or when it is
    /// returned to a [`PgPool`][crate::PgPool] in the case of `PoolConnection<Postgres>`.
    ///
    /// Postgres allows a single connection to acquire a given lock more than once without
    /// releasing it first, so in that sense the lock is re-entrant. However, the lock is only
    /// actually released once the number of unlock operations matches the number of lock
    /// operations.
    ///
    /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
    ///
    /// # Errors
    /// Returns any error raised while executing the lock query on `conn`.
    ///
    /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
    pub async fn try_acquire<C: AsMut<PgConnection>>(
        &self,
        mut conn: C,
    ) -> Result<Either<PgAdvisoryLockGuard<'_, C>, C>> {
        let acquired: bool = match self.key {
            PgAdvisoryLockKey::IntPair(first, second) => {
                crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1, $2)")
                    .bind(first)
                    .bind(second)
                    .fetch_one(conn.as_mut())
                    .await?
            }
            PgAdvisoryLockKey::BigInt(bigint) => {
                crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)")
                    .bind(bigint)
                    .fetch_one(conn.as_mut())
                    .await?
            }
        };

        if !acquired {
            // Nothing is held, so there is nothing to guard; give the connection back.
            return Ok(Either::Right(conn));
        }

        Ok(Either::Left(PgAdvisoryLockGuard::new(self, conn)))
    }

    /// Runs `pg_advisory_unlock()` for this lock's key on the given connection.
    ///
    /// [`PgAdvisoryLockGuard::release_now()`] is built on this method. It is also exposed so
    /// the lock can be released manually on connections obtained from
    /// [`PgAdvisoryLockGuard::leak()`].
    ///
    /// An error should only be returned if there is something wrong with the connection,
    /// in which case the lock will be automatically released by the connection closing anyway.
    ///
    /// The connection is returned together with the `boolean` value returned by
    /// `pg_advisory_unlock()`. If it is `false`, it indicates that the lock was not actually
    /// held by the given connection and that a warning has been logged by the Postgres server.
    pub async fn force_release<C: AsMut<PgConnection>>(&self, mut conn: C) -> Result<(C, bool)> {
        let was_held: bool = match self.key {
            PgAdvisoryLockKey::IntPair(first, second) => {
                crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1, $2)")
                    .bind(first)
                    .bind(second)
                    .fetch_one(conn.as_mut())
                    .await?
            }
            PgAdvisoryLockKey::BigInt(bigint) => {
                crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1)")
                    .bind(bigint)
                    .fetch_one(conn.as_mut())
                    .await?
            }
        };

        Ok((conn, was_held))
    }

    /// Returns the `pg_advisory_unlock()` statement for this lock's key, with the key values
    /// written inline (no bind parameters), building and caching it on first use.
    fn get_release_query(&self) -> &str {
        let sql: &String = self.release_query.get_or_init(|| match self.key {
            PgAdvisoryLockKey::IntPair(key1, key2) => {
                format!("SELECT pg_advisory_unlock({key1}, {key2})")
            }
            PgAdvisoryLockKey::BigInt(key) => format!("SELECT pg_advisory_unlock({key})"),
        });

        sql.as_str()
    }
}

impl PgAdvisoryLockKey {
    /// Returns the contained integer as `Some(bigint)` if this key is `Self::BigInt(bigint)`,
    /// and `None` for any other kind of key.
    pub fn as_bigint(&self) -> Option<i64> {
        match self {
            Self::BigInt(bigint) => Some(*bigint),
            _ => None,
        }
    }
}

/// Panic message used when a guard's `conn` slot is found empty.
///
/// The slot is only emptied by `release_now()`, `leak()` and `Drop`, each of which consumes or
/// finalizes the guard, so observing an empty slot anywhere else indicates a bug in this module.
const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken";

impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
    /// Wraps `conn`, recording that it holds `lock` and must release it later.
    fn new(lock: &'lock PgAdvisoryLock, conn: C) -> Self {
        let conn = Some(conn);

        Self { lock, conn }
    }

    /// Releases the held advisory lock right away rather than when the connection is next used.
    ///
    /// On success the wrapped connection is handed back to the caller.
    ///
    /// An error should only be returned if there is something wrong with the connection,
    /// in which case the lock will be automatically released by the connection closing anyway.
    ///
    /// If `pg_advisory_unlock()` returns `false`, a warning will be logged, both by SQLx as
    /// well as the Postgres server. This would only happen if the lock was released without
    /// using this guard, or the connection was swapped using [`std::mem::replace()`].
    pub async fn release_now(mut self) -> Result<C> {
        // Emptying the slot first means `Drop` will not queue a second unlock for this guard.
        let held_conn = self.conn.take().expect(NONE_ERR);

        let (conn, released) = self.lock.force_release(held_conn).await?;

        if released {
            return Ok(conn);
        }

        tracing::warn!(
            lock = ?self.lock.key,
            "PgAdvisoryLockGuard: advisory lock was not held by the contained connection",
        );

        Ok(conn)
    }

    /// Returns the wrapped connection without releasing the advisory lock, which then stays
    /// held until the connection is closed.
    ///
    /// To manually release the lock later, see [`PgAdvisoryLock::force_release()`].
    pub fn leak(mut self) -> C {
        // With the slot emptied, dropping `self` at the end of this call queues no unlock.
        let conn = self.conn.take();

        conn.expect(NONE_ERR)
    }
}

/// Shared access to the underlying connection, for connection types that also implement
/// `AsRef<PgConnection>`.
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<'lock, C> {
    type Target = PgConnection;

    fn deref(&self) -> &Self::Target {
        let wrapped: &C = self.conn.as_ref().expect(NONE_ERR);

        <C as AsRef<PgConnection>>::as_ref(wrapped)
    }
}

/// The underlying connection can be borrowed mutably so that it remains usable like normal
/// while the lock is held; this even allows locks to be taken recursively.
///
/// However, swapping in a different connection, e.g. with [`std::mem::replace()`], is a logic
/// error: the PostgreSQL server will log a warning when this guard attempts to release
/// the lock.
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut
    for PgAdvisoryLockGuard<'lock, C>
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        let wrapped: &mut C = self.conn.as_mut().expect(NONE_ERR);

        <C as AsMut<PgConnection>>::as_mut(wrapped)
    }
}

/// Shared access to the underlying connection, for connection types that also implement
/// `AsRef<PgConnection>`.
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
    for PgAdvisoryLockGuard<'lock, C>
{
    fn as_ref(&self) -> &PgConnection {
        let wrapped: &C = self.conn.as_ref().expect(NONE_ERR);

        <C as AsRef<PgConnection>>::as_ref(wrapped)
    }
}

/// The underlying connection can be borrowed mutably so that it remains usable like normal
/// while the lock is held; this even allows locks to be taken recursively.
///
/// However, swapping in a different connection, e.g. with [`std::mem::replace()`], is a logic
/// error: the PostgreSQL server will log a warning when this guard attempts to release
/// the lock.
impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<'lock, C> {
    fn as_mut(&mut self) -> &mut PgConnection {
        let wrapped: &mut C = self.conn.as_mut().expect(NONE_ERR);

        <C as AsMut<PgConnection>>::as_mut(wrapped)
    }
}

/// On drop, a `pg_advisory_unlock()` call is queued on the wrapped connection. It is flushed
/// to the server the next time the connection is used, or when it is returned to
/// [`PgPool`][crate::PgPool] in the case of
/// [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
///
/// Nothing is queued if the connection was already taken out of the guard by
/// `release_now()` or `leak()`.
impl<'lock, C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<'lock, C> {
    fn drop(&mut self) {
        if let Some(mut conn) = self.conn.take() {
            let pg_conn: &mut PgConnection = conn.as_mut();

            // A simple query message is queued so it executes the next time the connection
            // is used. The `async fn` versions can safely use the prepared statement protocol,
            // but this is the safest way to queue a query to execute on the next opportunity.
            pg_conn.queue_simple_query(self.lock.get_release_query());
        }
    }
}