// Polling-based stream over an installed filter (`FilterWatcher`).
use crate::{
utils::{interval, PinBoxFut},
JsonRpcClient, Middleware, Provider,
};
use ethers_core::types::U256;
use futures_core::stream::Stream;
use futures_util::StreamExt;
use pin_project::pin_project;
use serde::{de::DeserializeOwned, Serialize};
use std::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
time::Duration,
vec::IntoIter,
};
/// Polling interval applied by default to filters and pending transactions (7 seconds).
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(7);
/// Polling interval to use for local endpoints (100 milliseconds).
/// See [`crate::is_local_endpoint()`].
pub const DEFAULT_LOCAL_POLL_INTERVAL: Duration = Duration::from_millis(100);
/// Internal state of the [`FilterWatcher`] polling loop.
///
/// The `Stream` implementation cycles through the variants in declaration order:
/// wait for a tick, fetch the filter changes, then hand out the fetched items.
enum FilterWatcherState<'a, R> {
/// Idle until the watcher's interval stream produces its next item.
WaitForInterval,
/// A `get_filter_changes` request is in flight; holds the boxed future being polled.
GetFilterChanges(PinBoxFut<'a, Vec<R>>),
/// Items from the last response that still have to be yielded, one per poll.
NextItem(IntoIter<R>),
}
#[must_use = "filters do nothing unless you stream them"]
/// Streams data from an installed filter via `eth_getFilterChanges`.
///
/// The watcher is lazy: nothing is requested from the provider until the value is
/// polled as a `Stream`. Each item of type `R` is one entry of a filter-changes
/// response.
#[pin_project]
pub struct FilterWatcher<'a, P, R> {
/// The filter's installed id on the ethereum node
pub id: U256,
/// Provider through which the filter changes are requested
pub(crate) provider: &'a Provider<P>,
/// The polling interval: a new request is only issued after this stream yields.
/// Built from `DEFAULT_POLL_INTERVAL` in `new` and replaceable through `interval`.
interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
/// State machine driven by the `Stream` impl
state: FilterWatcherState<'a, R>,
}
impl<'a, P, R> FilterWatcher<'a, P, R>
where
    P: JsonRpcClient,
    R: Send + Sync + DeserializeOwned,
{
    /// Builds a watcher for the filter identified by `id`, polling through `provider`.
    ///
    /// The watcher starts out waiting for its first tick and uses
    /// [`DEFAULT_POLL_INTERVAL`] until [`Self::interval`] replaces it.
    pub fn new<T>(id: T, provider: &'a Provider<P>) -> Self
    where
        T: Into<U256>,
    {
        let filter_id = id.into();
        let ticker = Box::new(interval(DEFAULT_POLL_INTERVAL));
        Self {
            id: filter_id,
            provider,
            interval: ticker,
            state: FilterWatcherState::WaitForInterval,
        }
    }

    /// Replaces the stream's polling interval with one that ticks every `duration`.
    ///
    /// Consumes and returns the watcher so it can be chained after [`Self::new`].
    pub fn interval(mut self, duration: Duration) -> Self {
        let ticker = interval(duration);
        self.interval = Box::new(ticker);
        self
    }

    /// Pins the watcher on the heap (shorthand for `Box::pin`).
    ///
    /// The stream has to be pinned before `next` can be called on it.
    pub fn stream(self) -> Pin<Box<Self>> {
        Box::pin(self)
    }
}
// Drives the watcher's state machine every time the stream is polled
impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
where
    P: JsonRpcClient,
    R: Serialize + Send + Sync + DeserializeOwned + Debug + 'a,
{
    type Item = R;

    /// Runs the state machine until it either has to wait (`Poll::Pending`) or has
    /// an item to hand out (`Poll::Ready(Some(_))`). This implementation never
    /// returns `Poll::Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let filter_id = *this.id;

        loop {
            let next_state = match &mut this.state {
                FilterWatcherState::WaitForInterval => {
                    // Nothing is requested until the polling period has elapsed.
                    if this.interval.poll_next_unpin(cx).is_pending() {
                        return Poll::Pending;
                    }
                    let request = Box::pin(this.provider.get_filter_changes(filter_id));
                    FilterWatcherState::GetFilterChanges(request)
                }
                FilterWatcherState::GetFilterChanges(request) => {
                    // NOTE: A provider error is turned into an empty batch here rather
                    // than surfaced. Yielding a `Result` would be the alternative, but
                    // a consumer looping over the stream most likely should not stop
                    // on an error that may only be temporary.
                    let batch: Vec<R> = match request.as_mut().poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(response) => response.unwrap_or_default(),
                    };
                    FilterWatcherState::NextItem(batch.into_iter())
                }
                // Hand out a single element of the fetched batch. While elements
                // remain, the following polls land in this arm again without asking
                // the provider; only an exhausted batch sends the machine back to
                // waiting for the next polling period.
                FilterWatcherState::NextItem(remaining) => match remaining.next() {
                    Some(item) => {
                        cx.waker().wake_by_ref();
                        return Poll::Ready(Some(item));
                    }
                    None => FilterWatcherState::WaitForInterval,
                },
            };
            *this.state = next_state;
        }
    }
}