1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
use crate::error::Result;
use crate::Either;
use crate::PgConnection;
use hkdf::Hkdf;
use once_cell::sync::OnceCell;
use sha2::Sha256;
use std::ops::{Deref, DerefMut};
/// A mutex-like type utilizing [Postgres advisory locks].
///
/// Advisory locks are a mechanism provided by Postgres to have mutually exclusive or shared
/// locks tracked in the database with application-defined semantics, as opposed to the standard
/// row-level or table-level locks which may not fit all use-cases.
///
/// This API provides a convenient wrapper for generating and storing the integer keys that
/// advisory locks use, as well as RAII guards for releasing advisory locks when they fall out
/// of scope.
///
/// This API only handles session-scoped advisory locks (explicitly locked and unlocked, or
/// automatically released when a connection is closed).
///
/// It is also possible to use transaction-scoped locks but those can be used by beginning a
/// transaction and calling the appropriate lock functions (e.g. `SELECT pg_advisory_xact_lock()`)
/// manually, and cannot be explicitly released, but are automatically released when a transaction
/// ends (is committed or rolled back).
///
/// Session-level locks can be acquired either inside or outside a transaction and are not
/// tied to transaction semantics; a lock acquired inside a transaction is still held when that
/// transaction is committed or rolled back, until explicitly released or the connection is closed.
///
/// Locks can be acquired in either shared or exclusive modes, which can be thought of as read locks
/// and write locks, respectively. Multiple shared locks are allowed for the same key, but a single
/// exclusive lock prevents any other lock being taken for a given key until it is released.
///
/// [Postgres advisory locks]: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
#[derive(Debug, Clone)]
pub struct PgAdvisoryLock {
key: PgAdvisoryLockKey,
/// The query to execute to release this lock.
release_query: OnceCell<String>,
}
/// A key type natively used by Postgres advisory locks.
///
/// Currently, Postgres advisory locks have two different key spaces: one keyed by a single
/// 64-bit integer, and one keyed by a pair of two 32-bit integers. The Postgres docs
/// specify that these key spaces "do not overlap":
///
/// https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
///
/// The documentation for the `pg_locks` system view explains further how advisory locks
/// are treated in Postgres:
///
/// https://www.postgresql.org/docs/current/view-pg-locks.html
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum PgAdvisoryLockKey {
/// The keyspace designated by a single 64-bit integer.
///
/// When [PgAdvisoryLock] is constructed with [::new()][PgAdvisoryLock::new()],
/// this is the keyspace used.
BigInt(i64),
/// The keyspace designated by two 32-bit integers.
IntPair(i32, i32),
}
/// A wrapper for `PgConnection` (or a similar type) that represents a held Postgres advisory lock.
///
/// Can be acquired by [`PgAdvisoryLock::acquire()`] or [`PgAdvisoryLock::try_acquire()`].
/// Released on-drop or via [`Self::release_now()`].
///
/// ### Note: Release-on-drop is not immediate!
/// On drop, this guard queues a `pg_advisory_unlock()` call on the connection which will be
/// flushed to the server the next time it is used, or when it is returned to
/// a [`PgPool`][crate::PgPool] in the case of
/// [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
///
/// This means the lock is not actually released as soon as the guard is dropped. To ensure the
/// lock is eagerly released, you can call [`.release_now().await`][Self::release_now()].
pub struct PgAdvisoryLockGuard<'lock, C: AsMut<PgConnection>> {
lock: &'lock PgAdvisoryLock,
conn: Option<C>,
}
impl PgAdvisoryLock {
/// Construct a `PgAdvisoryLock` using the given string as a key.
///
/// This is intended to make it easier to use an advisory lock by using a human-readable string
/// for a key as opposed to manually generating a unique integer key. The generated integer key
/// is guaranteed to be stable and in the single 64-bit integer keyspace
/// (see [`PgAdvisoryLockKey`] for details).
///
/// This is done by applying the [Hash-based Key Derivation Function (HKDF; IETF RFC 5869)][hkdf]
/// to the bytes of the input string, but in a way that the calculated integer is unlikely
/// to collide with any similar implementations (although we don't currently know of any).
/// See the source of this method for details.
///
/// [hkdf]: https://datatracker.ietf.org/doc/html/rfc5869
/// ### Example
/// ```rust
/// # extern crate sqlx_core as sqlx;
/// use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockKey};
///
/// let lock = PgAdvisoryLock::new("my first Postgres advisory lock!");
/// // Negative values are fine because of how Postgres treats advisory lock keys.
/// // See the documentation for the `pg_locks` system view for details.
/// assert_eq!(lock.key(), &PgAdvisoryLockKey::BigInt(-5560419505042474287));
/// ```
pub fn new(key_string: impl AsRef<str>) -> Self {
let input_key_material = key_string.as_ref();
// HKDF was chosen because it is designed to concentrate the entropy in a variable-length
// input key and produce a higher quality but reduced-length output key with a
// well-specified and reproducible algorithm.
//
// Granted, the input key is usually meant to be pseudorandom and not human readable,
// but we're not trying to produce an unguessable value by any means; just one that's as
// unlikely to already be in use as possible, but still deterministic.
//
// SHA-256 was chosen as the hash function because it's already used in the Postgres driver,
// which should save on codegen and optimization.
// We don't supply a salt as that is intended to be random, but we want a deterministic key.
let hkdf = Hkdf::<Sha256>::new(None, input_key_material.as_bytes());
let mut output_key_material = [0u8; 8];
// The first string is the "info" string of the HKDF which is intended to tie the output
// exclusively to SQLx. This should avoid collisions with implementations using a similar
// strategy. If you _want_ this to match some other implementation then you should get
// the calculated integer key from it and use that directly.
//
// Do *not* change this string as it will affect the output!
hkdf.expand(
b"SQLx (Rust) Postgres advisory lock",
&mut output_key_material,
)
// `Hkdf::expand()` only returns an error if you ask for more than 255 times the digest size.
// This is specified by RFC 5869 but not elaborated upon:
// https://datatracker.ietf.org/doc/html/rfc5869#section-2.3
// Since we're only asking for 8 bytes, this error shouldn't be returned.
.expect("BUG: `output_key_material` should be of acceptable length");
// For ease of use, this method assumes the user doesn't care which keyspace is used.
//
// It doesn't seem likely that someone would care about using the `(int, int)` keyspace
// specifically unless they already had keys to use, in which case they wouldn't
// care about this method. That's why we also provide `with_key()`.
//
// The choice of `from_le_bytes()` is mostly due to x86 being the most popular
// architecture for server software, so it should be a no-op there.
let key = PgAdvisoryLockKey::BigInt(i64::from_le_bytes(output_key_material));
tracing::trace!(
?key,
key_string = ?input_key_material,
"generated key from key string",
);
Self::with_key(key)
}
/// Construct a `PgAdvisoryLock` with a manually supplied key.
pub fn with_key(key: PgAdvisoryLockKey) -> Self {
Self {
key,
release_query: OnceCell::new(),
}
}
/// Returns the current key.
pub fn key(&self) -> &PgAdvisoryLockKey {
&self.key
}
// Why doesn't this use `Acquire`? Well, I tried it and got really useless errors
// about "cannot project lifetimes to parent scope".
//
// It has something to do with how lifetimes work on the `Acquire` trait, I couldn't
// be bothered to figure it out. Probably another issue with a lack of `async fn` in traits
// or lazy normalization.
/// Acquires an exclusive lock using `pg_advisory_lock()`, waiting until the lock is acquired.
///
/// For a version that returns immediately instead of waiting, see [`Self::try_acquire()`].
///
/// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
/// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
/// any of these.
///
/// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
/// which will be executed the next time the connection is used, or when returned to a
/// [`PgPool`][crate::PgPool] in the case of `PoolConnection<Postgres>`.
///
/// Postgres allows a single connection to acquire a given lock more than once without releasing
/// it first, so in that sense the lock is re-entrant. However, the number of unlock operations
/// must match the number of lock operations for the lock to actually be released.
///
/// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
///
/// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
pub async fn acquire<C: AsMut<PgConnection>>(
&self,
mut conn: C,
) -> Result<PgAdvisoryLockGuard<'_, C>> {
match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query::query("SELECT pg_advisory_lock($1)")
.bind(key)
.execute(conn.as_mut())
.await?;
}
PgAdvisoryLockKey::IntPair(key1, key2) => {
crate::query::query("SELECT pg_advisory_lock($1, $2)")
.bind(key1)
.bind(key2)
.execute(conn.as_mut())
.await?;
}
}
Ok(PgAdvisoryLockGuard::new(self, conn))
}
/// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately
/// if the lock could not be acquired.
///
/// For a version that waits until the lock is acquired, see [`Self::acquire()`].
///
/// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
/// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
/// any of these. The connection is returned if the lock could not be acquired.
///
/// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
/// which will be executed the next time the connection is used, or when returned to a
/// [`PgPool`][crate::PgPool] in the case of `PoolConnection<Postgres>`.
///
/// Postgres allows a single connection to acquire a given lock more than once without releasing
/// it first, so in that sense the lock is re-entrant. However, the number of unlock operations
/// must match the number of lock operations for the lock to actually be released.
///
/// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
///
/// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
pub async fn try_acquire<C: AsMut<PgConnection>>(
&self,
mut conn: C,
) -> Result<Either<PgAdvisoryLockGuard<'_, C>, C>> {
let locked: bool = match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)")
.bind(key)
.fetch_one(conn.as_mut())
.await?
}
PgAdvisoryLockKey::IntPair(key1, key2) => {
crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1, $2)")
.bind(key1)
.bind(key2)
.fetch_one(conn.as_mut())
.await?
}
};
if locked {
Ok(Either::Left(PgAdvisoryLockGuard::new(self, conn)))
} else {
Ok(Either::Right(conn))
}
}
/// Execute `pg_advisory_unlock()` for this lock's key on the given connection.
///
/// This is used by [`PgAdvisoryLockGuard::release_now()`] and is also provided for manually
/// releasing the lock from connections returned by [`PgAdvisoryLockGuard::leak()`].
///
/// An error should only be returned if there is something wrong with the connection,
/// in which case the lock will be automatically released by the connection closing anyway.
///
/// The `boolean` value is that returned by `pg_advisory_lock()`. If it is `false`, it
/// indicates that the lock was not actually held by the given connection and that a warning
/// has been logged by the Postgres server.
pub async fn force_release<C: AsMut<PgConnection>>(&self, mut conn: C) -> Result<(C, bool)> {
let released: bool = match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1)")
.bind(key)
.fetch_one(conn.as_mut())
.await?
}
PgAdvisoryLockKey::IntPair(key1, key2) => {
crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1, $2)")
.bind(key1)
.bind(key2)
.fetch_one(conn.as_mut())
.await?
}
};
Ok((conn, released))
}
fn get_release_query(&self) -> &str {
self.release_query.get_or_init(|| match &self.key {
PgAdvisoryLockKey::BigInt(key) => format!("SELECT pg_advisory_unlock({key})"),
PgAdvisoryLockKey::IntPair(key1, key2) => {
format!("SELECT pg_advisory_unlock({key1}, {key2})")
}
})
}
}
impl PgAdvisoryLockKey {
/// Converts `Self::Bigint(bigint)` to `Some(bigint)` and all else to `None`.
pub fn as_bigint(&self) -> Option<i64> {
if let Self::BigInt(bigint) = self {
Some(*bigint)
} else {
None
}
}
}
const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken";
impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
fn new(lock: &'lock PgAdvisoryLock, conn: C) -> Self {
PgAdvisoryLockGuard {
lock,
conn: Some(conn),
}
}
/// Immediately release the held advisory lock instead of when the connection is next used.
///
/// An error should only be returned if there is something wrong with the connection,
/// in which case the lock will be automatically released by the connection closing anyway.
///
/// If `pg_advisory_unlock()` returns `false`, a warning will be logged, both by SQLx as
/// well as the Postgres server. This would only happen if the lock was released without
/// using this guard, or the connection was swapped using [`std::mem::replace()`].
pub async fn release_now(mut self) -> Result<C> {
let (conn, released) = self
.lock
.force_release(self.conn.take().expect(NONE_ERR))
.await?;
if !released {
tracing::warn!(
lock = ?self.lock.key,
"PgAdvisoryLockGuard: advisory lock was not held by the contained connection",
);
}
Ok(conn)
}
/// Cancel the release of the advisory lock, keeping it held until the connection is closed.
///
/// To manually release the lock later, see [`PgAdvisoryLock::force_release()`].
pub fn leak(mut self) -> C {
self.conn.take().expect(NONE_ERR)
}
}
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<'lock, C> {
type Target = PgConnection;
fn deref(&self) -> &Self::Target {
self.conn.as_ref().expect(NONE_ERR).as_ref()
}
}
/// Mutable access to the underlying connection is provided so it can still be used like normal,
/// even allowing locks to be taken recursively.
///
/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
/// guard attempts to release the lock.
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut
for PgAdvisoryLockGuard<'lock, C>
{
fn deref_mut(&mut self) -> &mut Self::Target {
self.conn.as_mut().expect(NONE_ERR).as_mut()
}
}
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
for PgAdvisoryLockGuard<'lock, C>
{
fn as_ref(&self) -> &PgConnection {
self.conn.as_ref().expect(NONE_ERR).as_ref()
}
}
/// Mutable access to the underlying connection is provided so it can still be used like normal,
/// even allowing locks to be taken recursively.
///
/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
/// guard attempts to release the lock.
impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<'lock, C> {
fn as_mut(&mut self) -> &mut PgConnection {
self.conn.as_mut().expect(NONE_ERR).as_mut()
}
}
/// Queues a `pg_advisory_unlock()` call on the wrapped connection which will be flushed
/// to the server the next time it is used, or when it is returned to [`PgPool`][crate::PgPool]
/// in the case of [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
impl<'lock, C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<'lock, C> {
fn drop(&mut self) {
if let Some(mut conn) = self.conn.take() {
// Queue a simple query message to execute next time the connection is used.
// The `async fn` versions can safely use the prepared statement protocol,
// but this is the safest way to queue a query to execute on the next opportunity.
conn.as_mut()
.queue_simple_query(self.lock.get_release_query());
}
}
}