alloy_rpc_client

Struct PollChannel

Source
pub struct PollChannel<Resp> { /* private fields */ }
Expand description

A channel yielding responses from a poller task.

This stream is backed by a coroutine, and will continue to produce responses until the poller task is dropped. The poller task is dropped when all RpcClient instances are dropped, or when all listening PollChannel are dropped.

The poller task also ignores errors from the server and deserialization errors, and will continue to poll until the client is dropped.

Implementations§

Source§

impl<Resp> PollChannel<Resp>
where Resp: RpcReturn + Clone,

Source

pub fn resubscribe(&self) -> Self

Resubscribe to the poller task.

Source

pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin

Converts the poll channel into a stream.

Source

pub fn into_stream_raw(self) -> BroadcastStream<Resp>

Converts the poll channel into a stream that also yields lag errors.

Methods from Deref<Target = Receiver<Resp>>§

Source

pub fn len(&self) -> usize

Returns the number of messages that were sent into the channel and that this Receiver has yet to receive.

If the returned value from len is larger than the next largest power of 2 of the capacity of the channel any call to recv will return an Err(RecvError::Lagged) and any call to try_recv will return an Err(TryRecvError::Lagged), e.g. if the capacity of the channel is 10, recv will start to return Err(RecvError::Lagged) once len returns values larger than 16.

§Examples
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(16);

    tx.send(10).unwrap();
    tx.send(20).unwrap();

    assert_eq!(rx1.len(), 2);
    assert_eq!(rx1.recv().await.unwrap(), 10);
    assert_eq!(rx1.len(), 1);
    assert_eq!(rx1.recv().await.unwrap(), 20);
    assert_eq!(rx1.len(), 0);
}
Source

pub fn is_empty(&self) -> bool

Returns true if there aren’t any messages in the channel that the Receiver has yet to receive.

§Examples
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(16);

    assert!(rx1.is_empty());

    tx.send(10).unwrap();
    tx.send(20).unwrap();

    assert!(!rx1.is_empty());
    assert_eq!(rx1.recv().await.unwrap(), 10);
    assert_eq!(rx1.recv().await.unwrap(), 20);
    assert!(rx1.is_empty());
}
Source

pub fn same_channel(&self, other: &Receiver<T>) -> bool

Returns true if receivers belong to the same channel.

§Examples
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel::<()>(16);
    let rx2 = tx.subscribe();

    assert!(rx.same_channel(&rx2));

    let (_tx3, rx3) = broadcast::channel::<()>(16);

    assert!(!rx3.same_channel(&rx2));
}
Source

pub fn resubscribe(&self) -> Receiver<T>

Re-subscribes to the channel starting from the current tail element.

This Receiver handle will receive a clone of all values sent after it has resubscribed. This will not include elements that are in the queue of the current receiver. Consider the following example.

§Examples
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
  let (tx, mut rx) = broadcast::channel(2);

  tx.send(1).unwrap();
  let mut rx2 = rx.resubscribe();
  tx.send(2).unwrap();

  assert_eq!(rx2.recv().await.unwrap(), 2);
  assert_eq!(rx.recv().await.unwrap(), 1);
}
Source

pub async fn recv(&mut self) -> Result<T, RecvError>

Receives the next value for this receiver.

Each Receiver handle will receive a clone of all values sent after it has subscribed.

Err(RecvError::Closed) is returned when all Sender halves have dropped, indicating that no further values can be sent on the channel.

If the Receiver handle falls behind, once the channel is full, newly sent values will overwrite old values. At this point, a call to recv will return with Err(RecvError::Lagged) and the Receiver’s internal cursor is updated to point to the oldest value still held by the channel. A subsequent call to recv will return this value unless it has been since overwritten.

§Cancel safety

This method is cancel safe. If recv is used as the event in a tokio::select! statement and some other branch completes first, it is guaranteed that no messages were received on this channel.

§Examples
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    tokio::spawn(async move {
        assert_eq!(rx1.recv().await.unwrap(), 10);
        assert_eq!(rx1.recv().await.unwrap(), 20);
    });

    tokio::spawn(async move {
        assert_eq!(rx2.recv().await.unwrap(), 10);
        assert_eq!(rx2.recv().await.unwrap(), 20);
    });

    tx.send(10).unwrap();
    tx.send(20).unwrap();
}

Handling lag

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(2);

    tx.send(10).unwrap();
    tx.send(20).unwrap();
    tx.send(30).unwrap();

    // The receiver lagged behind
    assert!(rx.recv().await.is_err());

    // At this point, we can abort or continue with lost messages

    assert_eq!(20, rx.recv().await.unwrap());
    assert_eq!(30, rx.recv().await.unwrap());
}
Source

pub fn try_recv(&mut self) -> Result<T, TryRecvError>

Attempts to return a pending value on this receiver without awaiting.

This is useful for a flavor of “optimistic check” before deciding to await on a receiver.

Compared with recv, this function has three failure cases instead of two (one for closed, one for an empty buffer, one for a lagging receiver).

Err(TryRecvError::Closed) is returned when all Sender halves have dropped, indicating that no further values can be sent on the channel.

If the Receiver handle falls behind, once the channel is full, newly sent values will overwrite old values. At this point, a call to recv will return with Err(TryRecvError::Lagged) and the Receiver’s internal cursor is updated to point to the oldest value still held by the channel. A subsequent call to try_recv will return this value unless it has been since overwritten. If there are no values to receive, Err(TryRecvError::Empty) is returned.

§Examples
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(16);

    assert!(rx.try_recv().is_err());

    tx.send(10).unwrap();

    let value = rx.try_recv().unwrap();
    assert_eq!(10, value);
}
Source

pub fn blocking_recv(&mut self) -> Result<T, RecvError>

Blocking receive to call outside of asynchronous contexts.

§Panics

This function panics if called within an asynchronous execution context.

§Examples
use std::thread;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(16);

    let sync_code = thread::spawn(move || {
        assert_eq!(rx.blocking_recv(), Ok(10));
    });

    let _ = tx.send(10);
    sync_code.join().unwrap();
}

Trait Implementations§

Source§

impl<Resp: Debug> Debug for PollChannel<Resp>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<Resp> Deref for PollChannel<Resp>

Source§

type Target = Receiver<Resp>

The resulting type after dereferencing.
Source§

fn deref(&self) -> &Self::Target

Dereferences the value.
Source§

impl<Resp> DerefMut for PollChannel<Resp>

Source§

fn deref_mut(&mut self) -> &mut Self::Target

Mutably dereferences the value.
Source§

impl<Resp> From<Receiver<Resp>> for PollChannel<Resp>

Source§

fn from(rx: Receiver<Resp>) -> Self

Converts to this type from the input type.

Auto Trait Implementations§

§

impl<Resp> Freeze for PollChannel<Resp>

§

impl<Resp> RefUnwindSafe for PollChannel<Resp>

§

impl<Resp> Send for PollChannel<Resp>
where Resp: Send,

§

impl<Resp> Sync for PollChannel<Resp>
where Resp: Send,

§

impl<Resp> Unpin for PollChannel<Resp>

§

impl<Resp> UnwindSafe for PollChannel<Resp>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,

Source§

impl<T> MaybeSendSync for T