pub struct PgListener { /* private fields */ }
A stream of asynchronous notifications from Postgres.

This listener will auto-reconnect. If the active connection being used ever dies, this listener will detect that event, create a new connection, will re-subscribe to all of the originally specified channels, and will resume operations as normal.



impl PgListener


pub async fn connect(url: &str) -> Result<Self, Error>


pub async fn connect_with(pool: &Pool<Postgres>) -> Result<Self, Error>


pub fn ignore_pool_close_event(&mut self, val: bool)

Set whether or not to ignore Pool::close_event(). Defaults to false.

By default, when Pool::close() is called on the pool this listener is using while Self::recv() or Self::try_recv() are waiting for a message, the wait is cancelled and Err(PoolClosed) is returned.

This is because Pool::close() will wait until all connections are returned and closed, including the one being used by this listener.

Otherwise, pool.close().await would have to wait until PgListener encountered a need to acquire a new connection (timeout, error, etc.) and dropped the one it was currently holding, at which point .recv() or .try_recv() would return Err(PoolClosed) on the attempt to acquire a new connection anyway.

However, if you want PgListener to ignore the close event and continue waiting for a message as long as it can, set this to true.

Does nothing if this was constructed with PgListener::connect(), as that creates an internal pool just for the new instance of PgListener which cannot be closed manually.


pub async fn listen(&mut self, channel: &str) -> Result<(), Error>

Starts listening for notifications on a channel. The channel name is quoted here to ensure case sensitivity.


pub async fn listen_all( &mut self, channels: impl IntoIterator<Item = &str> ) -> Result<(), Error>

Starts listening for notifications on all channels.


pub async fn unlisten(&mut self, channel: &str) -> Result<(), Error>

Stops listening for notifications on a channel. The channel name is quoted here to ensure case sensitivity.


pub async fn unlisten_all(&mut self) -> Result<(), Error>

Stops listening for notifications on all channels.


pub async fn recv(&mut self) -> Result<PgNotification, Error>

Receives the next notification available from any of the subscribed channels.

If the connection to PostgreSQL is lost, it is automatically reconnected on the next call to recv(), and should be entirely transparent (as long as it was just an intermittent network failure or long-lived connection reaper).

As notifications are transient, any received while the connection was lost, will not be returned. If you’d prefer the reconnection to be explicit and have a chance to do something before, please see try_recv.

loop {
    // ask for next notification, re-connecting (transparently) if needed
    let notification = listener.recv().await?;

    // handle notification, do something interesting

pub async fn try_recv(&mut self) -> Result<Option<PgNotification>, Error>

Receives the next notification available from any of the subscribed channels.

If the connection to PostgreSQL is lost, None is returned, and the connection is reconnected on the next call to try_recv().

loop {
    // start handling notifications, connecting if needed
    while let Some(notification) = listener.try_recv().await? {
        // handle notification

    // connection lost, do something interesting

pub fn into_stream( self ) -> impl Stream<Item = Result<PgNotification, Error>> + Unpin

Consume this listener, returning a Stream of notifications.

The backing connection will be automatically reconnected should it be lost.

This has the same potential drawbacks as recv.

Trait Implementations§


impl Debug for PgListener


fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

impl Drop for PgListener


fn drop(&mut self)

Executes the destructor for this type. Read more

impl<'c> Executor<'c> for &'c mut PgListener


type Database = Postgres


fn fetch_many<'e, 'q: 'e, E>( self, query: E ) -> BoxStream<'e, Result<Either<PgQueryResult, PgRow>, Error>>
where E: Execute<'q, Self::Database> + 'q, 'c: 'e,

Execute multiple queries and return the generated results as a stream from each query, in a stream.

fn fetch_optional<'e, 'q: 'e, E>( self, query: E ) -> BoxFuture<'e, Result<Option<PgRow>, Error>>
where E: Execute<'q, Self::Database> + 'q, 'c: 'e,

Execute the query and returns at most one row.

fn prepare_with<'e, 'q: 'e>( self, query: &'q str, parameters: &'e [PgTypeInfo] ) -> BoxFuture<'e, Result<PgStatement<'q>, Error>>
where 'c: 'e,

Prepare the SQL query, with parameter type information, to inspect the type information about its parameters and results. Read more

fn execute<'e, 'q, E>( self, query: E ) -> Pin<Box<dyn Future<Output = Result<<Self::Database as Database>::QueryResult, Error>> + Send + 'e>>
where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>,

Execute the query and return the total number of rows affected.

fn execute_many<'e, 'q, E>( self, query: E ) -> Pin<Box<dyn Stream<Item = Result<<Self::Database as Database>::QueryResult, Error>> + Send + 'e>>
where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>,

Execute multiple queries and return the rows affected from each query, in a stream.

fn fetch<'e, 'q, E>( self, query: E ) -> Pin<Box<dyn Stream<Item = Result<<Self::Database as Database>::Row, Error>> + Send + 'e>>
where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>,

Execute the query and return the generated results as a stream.

fn fetch_all<'e, 'q, E>( self, query: E ) -> Pin<Box<dyn Future<Output = Result<Vec<<Self::Database as Database>::Row>, Error>> + Send + 'e>>
where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>,

Execute the query and return all the generated results, collected into a Vec.

fn fetch_one<'e, 'q, E>( self, query: E ) -> Pin<Box<dyn Future<Output = Result<<Self::Database as Database>::Row, Error>> + Send + 'e>>
where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>,

Execute the query and returns exactly one row.

fn prepare<'e, 'q>( self, query: &'q str ) -> Pin<Box<dyn Future<Output = Result<<Self::Database as HasStatement<'q>>::Statement, Error>> + Send + 'e>>
where 'q: 'e, 'c: 'e,

Prepare the SQL query to inspect the type information of its parameters and results. Read more

