1use alloy_network::{Ethereum, Network};
2use alloy_primitives::{BlockNumber, U64};
3use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient};
4use alloy_transport::RpcError;
5use async_stream::stream;
6use futures::{Stream, StreamExt};
7use lru::LruCache;
8use std::{marker::PhantomData, num::NonZeroUsize};
9
10#[cfg(feature = "pubsub")]
11use futures::{future::Either, FutureExt};
12
13const BLOCK_CACHE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(10) };
15
16const MAX_RETRIES: usize = 3;
18
19const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX;
21
22pub(crate) struct NewBlocks<N: Network = Ethereum> {
24 client: WeakClient,
25 next_yield: BlockNumber,
29 known_blocks: LruCache<BlockNumber, N::BlockResponse>,
31 _phantom: PhantomData<N>,
32}
33
34impl<N: Network> NewBlocks<N> {
35 pub(crate) fn new(client: WeakClient) -> Self {
36 Self {
37 client,
38 next_yield: NO_BLOCK_NUMBER,
39 known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
40 _phantom: PhantomData,
41 }
42 }
43
44 #[cfg(test)]
45 #[allow(unused)]
46 const fn with_next_yield(mut self, next_yield: u64) -> Self {
47 self.next_yield = next_yield;
48 self
49 }
50
51 pub(crate) fn into_stream(self) -> impl Stream<Item = N::BlockResponse> + 'static {
52 #[cfg(feature = "pubsub")]
54 if let Some(client) = self.client.upgrade() {
55 if client.pubsub_frontend().is_some() {
56 let subscriber = self.into_subscription_stream().map(futures::stream::iter);
57 let subscriber = futures::stream::once(subscriber);
58 return Either::Left(subscriber.flatten().flatten());
59 }
60 }
61
62 #[cfg(feature = "pubsub")]
65 let right = Either::Right;
66 #[cfg(not(feature = "pubsub"))]
67 let right = std::convert::identity;
68 right(self.into_poll_stream())
69 }
70
71 #[cfg(feature = "pubsub")]
72 async fn into_subscription_stream(
73 self,
74 ) -> Option<impl Stream<Item = N::BlockResponse> + 'static> {
75 let Some(client) = self.client.upgrade() else {
76 debug!("client dropped");
77 return None;
78 };
79 let Some(pubsub) = client.pubsub_frontend() else {
80 error!("pubsub_frontend returned None after being Some");
81 return None;
82 };
83 let id = match client.request("eth_subscribe", ("newHeads",)).await {
84 Ok(id) => id,
85 Err(err) => {
86 error!(%err, "failed to subscribe to newHeads");
87 return None;
88 }
89 };
90 let sub = match pubsub.get_subscription(id).await {
91 Ok(sub) => sub,
92 Err(err) => {
93 error!(%err, "failed to get subscription");
94 return None;
95 }
96 };
97 Some(sub.into_typed::<N::BlockResponse>().into_stream())
98 }
99
100 fn into_poll_stream(mut self) -> impl Stream<Item = N::BlockResponse> + 'static {
101 stream! {
102 let poll_task_builder: PollerBuilder<NoParams, U64> =
104 PollerBuilder::new(self.client.clone(), "eth_blockNumber", []);
105 let mut poll_task = poll_task_builder.spawn().into_stream_raw();
106 'task: loop {
107 while let Some(known_block) = self.known_blocks.pop(&self.next_yield) {
109 debug!(number=self.next_yield, "yielding block");
110 self.next_yield += 1;
111 yield known_block;
112 }
113
114 let block_number = match poll_task.next().await {
116 Some(Ok(block_number)) => block_number,
117 Some(Err(err)) => {
118 debug!(%err, "polling stream lagged");
120 continue 'task;
121 }
122 None => {
123 debug!("polling stream ended");
124 break 'task;
125 }
126 };
127 let block_number = block_number.to::<u64>();
128 trace!(%block_number, "got block number");
129 if self.next_yield == NO_BLOCK_NUMBER {
130 assert!(block_number < NO_BLOCK_NUMBER, "too many blocks");
131 self.next_yield = block_number;
132 } else if block_number < self.next_yield {
133 debug!(block_number, self.next_yield, "not advanced yet");
134 continue 'task;
135 }
136
137 let Some(client) = self.client.upgrade() else {
139 debug!("client dropped");
140 break 'task;
141 };
142
143 let mut retries = MAX_RETRIES;
146 for number in self.next_yield..=block_number {
147 debug!(number, "fetching block");
148 let block = match client.request("eth_getBlockByNumber", (U64::from(number), false)).await {
149 Ok(Some(block)) => block,
150 Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
151 debug!(number, %err, "failed to fetch block, retrying");
152 retries -= 1;
153 continue;
154 }
155 Ok(None) if retries > 0 => {
156 debug!(number, "failed to fetch block (doesn't exist), retrying");
157 retries -= 1;
158 continue;
159 }
160 Err(err) => {
161 error!(number, %err, "failed to fetch block");
162 break 'task;
163 }
164 Ok(None) => {
165 error!(number, "failed to fetch block (doesn't exist)");
166 break 'task;
167 }
168 };
169 self.known_blocks.put(number, block);
170 if self.known_blocks.len() == BLOCK_CACHE_SIZE.get() {
171 debug!(number, "cache full");
173 break;
174 }
175 }
176 }
177 }
178 }
179}
180
181#[cfg(all(test, feature = "anvil-api"))] mod tests {
183 use super::*;
184 use crate::{ext::AnvilApi, Provider, ProviderBuilder};
185 use alloy_node_bindings::Anvil;
186 use std::{future::Future, time::Duration};
187
188 async fn timeout<T: Future>(future: T) -> T::Output {
189 try_timeout(future).await.expect("Timeout")
190 }
191
192 async fn try_timeout<T: Future>(future: T) -> Option<T::Output> {
193 tokio::time::timeout(Duration::from_secs(2), future).await.ok()
194 }
195
196 #[tokio::test]
197 async fn yield_block_http() {
198 yield_block(false).await;
199 }
200 #[tokio::test]
201 #[cfg(feature = "ws")]
202 async fn yield_block_ws() {
203 yield_block(true).await;
204 }
205 async fn yield_block(ws: bool) {
206 let anvil = Anvil::new().spawn();
207
208 let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() };
209 let provider = ProviderBuilder::new().connect(&url).await.unwrap();
210
211 let new_blocks = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
212 let mut stream = Box::pin(new_blocks.into_stream());
213 if ws {
214 let _ = try_timeout(stream.next()).await; }
216
217 provider.anvil_mine(Some(1), None).await.unwrap();
219
220 let block = timeout(stream.next()).await.expect("Block wasn't fetched");
221 assert_eq!(block.header.number, 1);
222 }
223
224 #[tokio::test]
225 async fn yield_many_blocks_http() {
226 yield_many_blocks(false).await;
227 }
228 #[tokio::test]
229 #[cfg(feature = "ws")]
230 async fn yield_many_blocks_ws() {
231 yield_many_blocks(true).await;
232 }
233 async fn yield_many_blocks(ws: bool) {
234 const BLOCKS_TO_MINE: usize = BLOCK_CACHE_SIZE.get() + 1;
236
237 let anvil = Anvil::new().spawn();
238
239 let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() };
240 let provider = ProviderBuilder::new().connect(&url).await.unwrap();
241
242 let new_blocks = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
243 let mut stream = Box::pin(new_blocks.into_stream());
244 if ws {
245 let _ = try_timeout(stream.next()).await; }
247
248 provider.anvil_mine(Some(BLOCKS_TO_MINE as u64), None).await.unwrap();
250
251 let blocks = timeout(stream.take(BLOCKS_TO_MINE).collect::<Vec<_>>()).await;
252 assert_eq!(blocks.len(), BLOCKS_TO_MINE);
253 let first = blocks[0].header.number;
254 assert_eq!(first, 1);
255 for (i, block) in blocks.iter().enumerate() {
256 assert_eq!(block.header.number, first + i as u64);
257 }
258 }
259}