1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use super::{JsonRpcClient, Middleware, PinBoxFut, Provider, ProviderError};
use ethers_core::types::{Filter, Log, U64};
use futures_core::stream::Stream;
use std::{
    collections::VecDeque,
    pin::Pin,
    task::{Context, Poll},
};
use thiserror::Error;

pub struct LogQuery<'a, P> {
    provider: &'a Provider<P>,
    filter: Filter,
    from_block: Option<U64>,
    page_size: u64,
    current_logs: VecDeque<Log>,
    last_block: Option<U64>,
    state: LogQueryState<'a>,
}

enum LogQueryState<'a> {
    Initial,
    LoadLastBlock(PinBoxFut<'a, U64>),
    LoadLogs(PinBoxFut<'a, Vec<Log>>),
    Consume,
}

impl<'a, P> LogQuery<'a, P>
where
    P: JsonRpcClient,
{
    pub fn new(provider: &'a Provider<P>, filter: &Filter) -> Self {
        Self {
            provider,
            filter: filter.clone(),
            from_block: filter.get_from_block(),
            page_size: 10000,
            current_logs: VecDeque::new(),
            last_block: None,
            state: LogQueryState::Initial,
        }
    }

    /// set page size for pagination
    pub fn with_page_size(mut self, page_size: u64) -> Self {
        self.page_size = page_size;
        self
    }
}

macro_rules! rewake_with_new_state {
    ($ctx:ident, $this:ident, $new_state:expr) => {
        $this.state = $new_state;
        $ctx.waker().wake_by_ref();
        return Poll::Pending
    };
}

#[derive(Error, Debug)]
pub enum LogQueryError<E> {
    #[error(transparent)]
    LoadLastBlockError(E),
    #[error(transparent)]
    LoadLogsError(E),
}

impl<'a, P> Stream for LogQuery<'a, P>
where
    P: JsonRpcClient,
{
    type Item = Result<Log, LogQueryError<ProviderError>>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match &mut self.state {
            LogQueryState::Initial => {
                if !self.filter.is_paginatable() {
                    // if not paginatable, load logs and consume
                    let filter = self.filter.clone();
                    let provider = self.provider;
                    let fut = Box::pin(async move { provider.get_logs(&filter).await });
                    rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
                } else {
                    // if paginatable, load last block
                    let fut = self.provider.get_block_number();
                    rewake_with_new_state!(ctx, self, LogQueryState::LoadLastBlock(fut));
                }
            }
            LogQueryState::LoadLastBlock(fut) => {
                match futures_util::ready!(fut.as_mut().poll(ctx)) {
                    Ok(last_block) => {
                        self.last_block = Some(last_block);

                        // this is okay because we will only enter this state when the filter is
                        // paginatable i.e. from block is set
                        let from_block = self.filter.get_from_block().unwrap();
                        let to_block = from_block + self.page_size;
                        self.from_block = Some(to_block + 1);

                        let filter = self.filter.clone().from_block(from_block).to_block(to_block);
                        let provider = self.provider;
                        // load first page of logs
                        let fut = Box::pin(async move { provider.get_logs(&filter).await });
                        rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
                    }
                    Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))),
                }
            }
            LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) {
                Ok(logs) => {
                    self.current_logs = VecDeque::from(logs);
                    rewake_with_new_state!(ctx, self, LogQueryState::Consume);
                }
                Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))),
            },
            LogQueryState::Consume => {
                let log = self.current_logs.pop_front();
                if log.is_none() {
                    // consumed all the logs
                    if !self.filter.is_paginatable() {
                        Poll::Ready(None)
                    } else {
                        // load new logs if there are still more pages to go through
                        // can safely assume this will always be set in this state
                        let from_block = self.from_block.unwrap();
                        let to_block = from_block + self.page_size;

                        // no more pages to load, and everything is consumed
                        // can safely assume this will always be set in this state
                        if from_block > self.last_block.unwrap() {
                            return Poll::Ready(None)
                        }
                        // load next page
                        self.from_block = Some(to_block + 1);

                        let filter = self.filter.clone().from_block(from_block).to_block(to_block);
                        let provider = self.provider;
                        let fut = Box::pin(async move { provider.get_logs(&filter).await });
                        rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
                    }
                } else {
                    Poll::Ready(log.map(Ok))
                }
            }
        }
    }
}