alloy_provider/
blocks.rs

1use alloy_network::{Ethereum, Network};
2use alloy_primitives::{BlockNumber, U64};
3use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient};
4use alloy_transport::RpcError;
5use async_stream::stream;
6use futures::{Stream, StreamExt};
7use lru::LruCache;
8use std::{marker::PhantomData, num::NonZeroUsize};
9
10#[cfg(feature = "pubsub")]
11use futures::{future::Either, FutureExt};
12
/// The size of the block cache.
///
/// Built through the checked [`NonZeroUsize::new`] constructor: a zero value fails const
/// evaluation (a compile-time error) instead of relying on an unchecked `unsafe` construction.
const BLOCK_CACHE_SIZE: NonZeroUsize = match NonZeroUsize::new(10) {
    Some(size) => size,
    None => panic!("BLOCK_CACHE_SIZE must be non-zero"),
};

/// Maximum number of retries for fetching a block.
///
/// The polling task resets its retry counter to this value before each pass that fills the
/// block cache.
const MAX_RETRIES: usize = 3;
18
/// Sentinel block number used while no block has been observed yet.
///
/// While `NewBlocks::next_yield` holds this value, the polling task replaces it with the first
/// chain tip it observes.
const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX;
21
22/// Streams new blocks from the client.
23pub(crate) struct NewBlocks<N: Network = Ethereum> {
24    client: WeakClient,
25    /// The next block to yield.
26    /// [`NO_BLOCK_NUMBER`] indicates that it will be updated on the first poll.
27    /// Only used by the polling task.
28    next_yield: BlockNumber,
29    /// LRU cache of known blocks. Only used by the polling task.
30    known_blocks: LruCache<BlockNumber, N::BlockResponse>,
31    _phantom: PhantomData<N>,
32}
33
34impl<N: Network> NewBlocks<N> {
35    pub(crate) fn new(client: WeakClient) -> Self {
36        Self {
37            client,
38            next_yield: NO_BLOCK_NUMBER,
39            known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
40            _phantom: PhantomData,
41        }
42    }
43
    /// Test-only builder that overrides the number of the first block to yield.
    ///
    /// Setting a value other than [`NO_BLOCK_NUMBER`] makes the polling task start from
    /// `next_yield` instead of from the first chain tip it observes.
    #[cfg(test)]
    // NOTE(review): presumably unused in test builds where the test module is compiled out by
    // its feature gate, hence the lint allowance — confirm.
    #[allow(unused)]
    const fn with_next_yield(mut self, next_yield: u64) -> Self {
        self.next_yield = next_yield;
        self
    }
50
51    pub(crate) fn into_stream(self) -> impl Stream<Item = N::BlockResponse> + 'static {
52        // Return a stream that lazily subscribes to `newHeads` on the first poll.
53        #[cfg(feature = "pubsub")]
54        if let Some(client) = self.client.upgrade() {
55            if client.pubsub_frontend().is_some() {
56                let subscriber = self.into_subscription_stream().map(futures::stream::iter);
57                let subscriber = futures::stream::once(subscriber);
58                return Either::Left(subscriber.flatten().flatten());
59            }
60        }
61
62        // Returns a stream that lazily initializes an `eth_blockNumber` polling task on the first
63        // poll, mapped with `eth_getBlockByNumber`.
64        #[cfg(feature = "pubsub")]
65        let right = Either::Right;
66        #[cfg(not(feature = "pubsub"))]
67        let right = std::convert::identity;
68        right(self.into_poll_stream())
69    }
70
71    #[cfg(feature = "pubsub")]
72    async fn into_subscription_stream(
73        self,
74    ) -> Option<impl Stream<Item = N::BlockResponse> + 'static> {
75        let Some(client) = self.client.upgrade() else {
76            debug!("client dropped");
77            return None;
78        };
79        let Some(pubsub) = client.pubsub_frontend() else {
80            error!("pubsub_frontend returned None after being Some");
81            return None;
82        };
83        let id = match client.request("eth_subscribe", ("newHeads",)).await {
84            Ok(id) => id,
85            Err(err) => {
86                error!(%err, "failed to subscribe to newHeads");
87                return None;
88            }
89        };
90        let sub = match pubsub.get_subscription(id).await {
91            Ok(sub) => sub,
92            Err(err) => {
93                error!(%err, "failed to get subscription");
94                return None;
95            }
96        };
97        Some(sub.into_typed::<N::BlockResponse>().into_stream())
98    }
99
    /// Builds the polling-based block stream.
    ///
    /// The stream polls `eth_blockNumber` for the chain tip, fetches every block from
    /// `next_yield` up to that tip with `eth_getBlockByNumber`, buffers the results in
    /// `known_blocks`, and yields them strictly in ascending block-number order.
    ///
    /// The stream ends when the poller stream ends, when the client has been dropped, or when a
    /// block fetch fails in a way that is not retried (see the fetch loop below).
    ///
    /// # Panics
    ///
    /// Panics if a tip equal to [`NO_BLOCK_NUMBER`] is observed while `next_yield` is still unset.
    fn into_poll_stream(mut self) -> impl Stream<Item = N::BlockResponse> + 'static {
        stream! {
        // Spawned lazily on the first `poll`: nothing in this block runs until the stream is
        // polled for the first time.
        let poll_task_builder: PollerBuilder<NoParams, U64> =
            PollerBuilder::new(self.client.clone(), "eth_blockNumber", []);
        let mut poll_task = poll_task_builder.spawn().into_stream_raw();
        'task: loop {
            // Clear any buffered blocks: yield consecutive blocks starting at `next_yield` and
            // stop at the first number that is not in the cache.
            while let Some(known_block) = self.known_blocks.pop(&self.next_yield) {
                debug!(number=self.next_yield, "yielding block");
                self.next_yield += 1;
                yield known_block;
            }

            // Get the tip.
            let block_number = match poll_task.next().await {
                Some(Ok(block_number)) => block_number,
                Some(Err(err)) => {
                    // Not fatal: log it and wait for the next item from the poller.
                    // NOTE(review): presumably a lag error from the poller's channel — confirm.
                    debug!(%err, "polling stream lagged");
                    continue 'task;
                }
                None => {
                    debug!("polling stream ended");
                    break 'task;
                }
            };
            let block_number = block_number.to::<u64>();
            trace!(%block_number, "got block number");
            if self.next_yield == NO_BLOCK_NUMBER {
                // First observation: start yielding from the current tip.
                assert!(block_number < NO_BLOCK_NUMBER, "too many blocks");
                self.next_yield = block_number;
            } else if block_number < self.next_yield {
                // The tip is still behind the next block we need; nothing to fetch yet.
                debug!(block_number, self.next_yield, "not advanced yet");
                continue 'task;
            }

            // Upgrade the provider.
            let Some(client) = self.client.upgrade() else {
                debug!("client dropped");
                break 'task;
            };

            // Then try to fill as many blocks as possible.
            // TODO: Maybe use `join_all`
            // The retry budget is reset on every pass of `'task` and is shared by all blocks
            // fetched in this pass.
            let mut retries = MAX_RETRIES;
            for number in self.next_yield..=block_number {
                debug!(number, "fetching block");
                // NOTE(review): `false` presumably requests transaction hashes only rather than
                // full transaction objects — confirm against the `eth_getBlockByNumber` spec.
                let block = match client.request("eth_getBlockByNumber", (U64::from(number), false)).await {
                    Ok(Some(block)) => block,
                    // NOTE(review): in both "retrying" arms `continue` moves on to the NEXT block
                    // number; `number` itself is only requested again on a later pass of `'task`,
                    // which restarts at `next_yield`. Confirm that this deferred retry is intended.
                    Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
                        debug!(number, %err, "failed to fetch block, retrying");
                        retries -= 1;
                        continue;
                    }
                    Ok(None) if retries > 0 => {
                        debug!(number, "failed to fetch block (doesn't exist), retrying");
                        retries -= 1;
                        continue;
                    }
                    // Remaining errors (non-transport, not recoverable, or budget exhausted)
                    // terminate the stream.
                    Err(err) => {
                        error!(number, %err, "failed to fetch block");
                        break 'task;
                    }
                    // Block still missing after the retry budget is exhausted: terminate.
                    Ok(None) => {
                        error!(number, "failed to fetch block (doesn't exist)");
                        break 'task;
                    }
                };
                self.known_blocks.put(number, block);
                if self.known_blocks.len() == BLOCK_CACHE_SIZE.get() {
                    // Cache is full, should be consumed before filling more blocks.
                    debug!(number, "cache full");
                    break;
                }
            }
        }
        }
    }
179}
180
#[cfg(all(test, feature = "anvil-api"))] // Tests rely heavily on ability to mine blocks on demand.
mod tests {
    use super::*;
    use crate::{ext::AnvilApi, Provider, ProviderBuilder};
    use alloy_node_bindings::{Anvil, AnvilInstance};
    use std::{future::Future, time::Duration};

    /// Longest time a single awaited step of a test is allowed to take.
    const STEP_TIMEOUT: Duration = Duration::from_secs(2);

    /// Awaits `future`, giving up with `None` once [`STEP_TIMEOUT`] has elapsed.
    async fn try_timeout<T: Future>(future: T) -> Option<T::Output> {
        tokio::time::timeout(STEP_TIMEOUT, future).await.ok()
    }

    /// Awaits `future`, panicking if it does not complete within [`STEP_TIMEOUT`].
    async fn timeout<T: Future>(future: T) -> T::Output {
        try_timeout(future).await.expect("Timeout")
    }

    /// Returns the WebSocket endpoint of `anvil` when `ws` is set, the HTTP endpoint otherwise.
    fn endpoint_for(anvil: &AnvilInstance, ws: bool) -> String {
        if ws {
            anvil.ws_endpoint()
        } else {
            anvil.endpoint()
        }
    }

    #[tokio::test]
    async fn yield_block_http() {
        yield_block(false).await;
    }

    #[tokio::test]
    #[cfg(feature = "ws")]
    async fn yield_block_ws() {
        yield_block(true).await;
    }

    /// Mines a single block and checks that the stream yields block number 1.
    async fn yield_block(ws: bool) {
        let anvil = Anvil::new().spawn();
        let endpoint = endpoint_for(&anvil, ws);
        let provider = ProviderBuilder::new().connect(&endpoint).await.unwrap();

        let watcher = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
        let mut blocks = Box::pin(watcher.into_stream());
        if ws {
            // The first poll subscribes to newHeads; its outcome is irrelevant here.
            let _ = try_timeout(blocks.next()).await;
        }

        // The same provider doubles as the control channel to the anvil instance.
        provider.anvil_mine(Some(1), None).await.unwrap();

        let mined = timeout(blocks.next()).await.expect("Block wasn't fetched");
        assert_eq!(mined.header.number, 1);
    }

    #[tokio::test]
    async fn yield_many_blocks_http() {
        yield_many_blocks(false).await;
    }

    #[tokio::test]
    #[cfg(feature = "ws")]
    async fn yield_many_blocks_ws() {
        yield_many_blocks(true).await;
    }

    /// Mines more blocks than the cache can hold and checks that all of them arrive in order.
    async fn yield_many_blocks(ws: bool) {
        // One more than the cache capacity, so the cache has to be drained and refilled.
        const BLOCKS_TO_MINE: usize = BLOCK_CACHE_SIZE.get() + 1;

        let anvil = Anvil::new().spawn();
        let endpoint = endpoint_for(&anvil, ws);
        let provider = ProviderBuilder::new().connect(&endpoint).await.unwrap();

        let watcher = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
        let mut stream = Box::pin(watcher.into_stream());
        if ws {
            // The first poll subscribes to newHeads; its outcome is irrelevant here.
            let _ = try_timeout(stream.next()).await;
        }

        // The same provider doubles as the control channel to the anvil instance.
        provider.anvil_mine(Some(BLOCKS_TO_MINE as u64), None).await.unwrap();

        let blocks = timeout(stream.take(BLOCKS_TO_MINE).collect::<Vec<_>>()).await;
        assert_eq!(blocks.len(), BLOCKS_TO_MINE);
        let first = blocks[0].header.number;
        assert_eq!(first, 1);
        // Block numbers must be consecutive, starting at the first one.
        for (expected, block) in (first..).zip(&blocks) {
            assert_eq!(block.header.number, expected);
        }
    }
}