pub struct PgListener { /* private fields */ }
Available on crate feature postgres only.
A stream of asynchronous notifications from Postgres.
This listener will auto-reconnect. If the active connection ever dies, the listener will detect that event, create a new connection, re-subscribe to all of the originally specified channels, and resume operations as normal.
Implementations
impl PgListener
pub async fn connect(url: &str) -> Result<PgListener, Error>
pub async fn connect_with(pool: &Pool<Postgres>) -> Result<PgListener, Error>
pub fn ignore_pool_close_event(&mut self, val: bool)
Set whether or not to ignore Pool::close_event(). Defaults to false.
By default, when Pool::close() is called on the pool this listener is using while Self::recv() or Self::try_recv() is waiting for a message, the wait is cancelled and Err(PoolClosed) is returned.
This is because Pool::close() will wait until all connections are returned and closed, including the one being used by this listener.
Otherwise, pool.close().await would have to wait until PgListener encountered a need to acquire a new connection (timeout, error, etc.) and dropped the one it was currently holding, at which point .recv() or .try_recv() would return Err(PoolClosed) on the attempt to acquire a new connection anyway.
However, if you want PgListener to ignore the close event and continue waiting for a message as long as it can, set this to true.
Does nothing if this was constructed with PgListener::connect(), as that creates an internal pool just for the new instance of PgListener which cannot be closed manually.
pub fn eager_reconnect(&mut self, val: bool)
Set whether a lost connection in try_recv() should be re-established before it returns Ok(None), or on the next call to try_recv().
By default, this is true and the connection is re-established before returning Ok(None).
If this is set to false then notifications will continue to be lost until the next call to try_recv(). If your recovery logic uses a different database connection then notifications that occur after it completes may be lost without any way to tell that they have been.
pub async fn listen(&mut self, channel: &str) -> Result<(), Error>
Starts listening for notifications on a channel. The channel name is quoted here to ensure case sensitivity.
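Quoting matters because PostgreSQL folds unquoted identifiers to lower case, so an unquoted LISTEN MyChannel would subscribe to mychannel. The sketch below shows the general identifier-quoting technique using only the standard library. The names quote_channel and listen_statement are hypothetical, and the exact statement sqlx sends is an assumption rather than something taken from its source.

```rust
/// Hypothetical helper (not part of sqlx's public API): quote a channel
/// name as a PostgreSQL identifier so that its case is preserved.
fn quote_channel(name: &str) -> String {
    // Double any embedded double quotes, then wrap the name in double quotes.
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Hypothetical helper: build the text of a LISTEN statement for `channel`.
/// Assumption: the listener issues something of this shape.
fn listen_statement(channel: &str) -> String {
    format!("LISTEN {}", quote_channel(channel))
}

fn main() {
    // Mixed case survives because the identifier is quoted.
    println!("{}", listen_statement("MyChannel"));
    // An embedded double quote is escaped by doubling it.
    println!("{}", listen_statement("odd\"name"));
}
```

A practical consequence is that the sending side has to use the same exact spelling, including case, for the notification to reach this listener.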
pub async fn listen_all(&mut self, channels: impl IntoIterator<Item = &str>) -> Result<(), Error>
Starts listening for notifications on all of the given channels.
pub async fn unlisten(&mut self, channel: &str) -> Result<(), Error>
Stops listening for notifications on a channel. The channel name is quoted here to ensure case sensitivity.
pub async fn unlisten_all(&mut self) -> Result<(), Error>
Stops listening for notifications on all channels.
pub async fn recv(&mut self) -> Result<PgNotification, Error>
Receives the next notification available from any of the subscribed channels.
If the connection to PostgreSQL is lost, it is automatically reconnected on the next call to recv(), and should be entirely transparent (as long as it was just an intermittent network failure or long-lived connection reaper).
As notifications are transient, any received while the connection was lost will not be returned. If you’d prefer the reconnection to be explicit and have a chance to do something before, please see try_recv.
Example
let mut listener = PgListener::connect("postgres:// ...").await?;
loop {
    // ask for next notification, re-connecting (transparently) if needed
    let notification = listener.recv().await?;
    // handle notification, do something interesting
}
pub async fn try_recv(&mut self) -> Result<Option<PgNotification>, Error>
Receives the next notification available from any of the subscribed channels.
If the connection to PostgreSQL is lost, None is returned, and the connection is reconnected either immediately, or on the next call to try_recv(), depending on the value of eager_reconnect.
Example
loop {
    // start handling notifications, connecting if needed
    while let Some(notification) = listener.try_recv().await? {
        // handle notification
    }
    // connection lost, do something interesting
}
pub fn next_buffered(&mut self) -> Option<PgNotification>
Receives the next notification that already exists in the connection buffer, if any.
This is similar to try_recv, except it will not wait if the connection has not yet received a notification.
This is helpful if you want to retrieve all buffered notifications and process them in batches.
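The batching pattern described above can be sketched as a small draining loop. This is a minimal illustration under stated assumptions, not the library's own code: drain_batch is a hypothetical helper, and a VecDeque stands in for the connection buffer so the example runs without a database.

```rust
use std::collections::VecDeque;

/// Hypothetical helper: pull up to `max` already-available items from
/// `next`, stopping as soon as it reports that nothing more is buffered.
fn drain_batch<T>(mut next: impl FnMut() -> Option<T>, max: usize) -> Vec<T> {
    let mut batch = Vec::new();
    while batch.len() < max {
        match next() {
            Some(item) => batch.push(item),
            // Nothing buffered right now: return what we have, never wait.
            None => break,
        }
    }
    batch
}

fn main() {
    // Stand-in for the listener's buffer of pending notifications.
    let mut buffered = VecDeque::from(vec!["a", "b", "c"]);

    // Each call takes at most two items and returns immediately.
    let first = drain_batch(|| buffered.pop_front(), 2);
    let rest = drain_batch(|| buffered.pop_front(), 2);

    println!("{:?} then {:?}", first, rest);
}
```

With a real listener the closure would presumably be `|| listener.next_buffered()`, called after an awaited recv() has returned the first notification of the batch.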
pub fn into_stream(self) -> impl Stream<Item = Result<PgNotification, Error>> + Unpin
Consume this listener, returning a Stream of notifications.
The backing connection will be automatically reconnected should it be lost.
This has the same potential drawbacks as recv.
Trait Implementations
impl<'c> Acquire<'c> for &'c mut PgListener
type Database = Postgres
type Connection = &'c mut PgConnection
fn acquire(self) -> Pin<Box<dyn Future<Output = Result<<&'c mut PgListener as Acquire<'c>>::Connection, Error>> + Send + 'c>>
fn begin(self) -> Pin<Box<dyn Future<Output = Result<Transaction<'c, <&'c mut PgListener as Acquire<'c>>::Database>, Error>> + Send + 'c>>
impl Debug for PgListener
impl Drop for PgListener
impl<'c> Executor<'c> for &'c mut PgListener
type Database = Postgres
fn fetch_many<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Stream<Item = Result<Either<PgQueryResult, PgRow>, Error>> + Send + 'e>>
fn fetch_optional<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Future<Output = Result<Option<PgRow>, Error>> + Send + 'e>>
fn prepare_with<'e, 'q>(self, query: &'q str, parameters: &'e [PgTypeInfo]) -> Pin<Box<dyn Future<Output = Result<PgStatement<'q>, Error>> + Send + 'e>>
where
    'q: 'e,
    'c: 'e,
fn execute<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Future<Output = Result<<Self::Database as Database>::QueryResult, Error>> + Send + 'e>>
fn execute_many<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Stream<Item = Result<<Self::Database as Database>::QueryResult, Error>> + Send + 'e>>
fn fetch<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Stream<Item = Result<<Self::Database as Database>::Row, Error>> + Send + 'e>>
fn fetch_all<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Future<Output = Result<Vec<<Self::Database as Database>::Row>, Error>> + Send + 'e>>
Auto Trait Implementations
impl Freeze for PgListener
impl !RefUnwindSafe for PgListener
impl Send for PgListener
impl Sync for PgListener
impl Unpin for PgListener
impl !UnwindSafe for PgListener
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.