1use crate::{Provider, RootProvider};
4use alloy_consensus::BlockHeader;
5use alloy_json_rpc::RpcError;
6use alloy_network::{BlockResponse, Network};
7use alloy_primitives::{
8 map::{B256HashMap, B256HashSet},
9 TxHash, B256,
10};
11use alloy_transport::{utils::Spawnable, TransportError};
12use futures::{stream::StreamExt, FutureExt, Stream};
13use std::{
14 collections::{BTreeMap, VecDeque},
15 fmt,
16 future::Future,
17 time::Duration,
18};
19use tokio::{
20 select,
21 sync::{mpsc, oneshot, watch},
22};
23
24#[cfg(target_arch = "wasm32")]
25use wasmtimer::{
26 std::Instant,
27 tokio::{interval, sleep_until},
28};
29
30#[cfg(not(target_arch = "wasm32"))]
31use {
32 std::time::Instant,
33 tokio::time::{interval, sleep_until},
34};
35
36#[derive(Debug, thiserror::Error)]
38pub enum PendingTransactionError {
39 #[error("failed to register pending transaction to watch")]
41 FailedToRegister,
42
43 #[error(transparent)]
45 TransportError(#[from] TransportError),
46
47 #[error(transparent)]
49 Recv(#[from] oneshot::error::RecvError),
50
51 #[error(transparent)]
53 TxWatcher(#[from] WatchTxError),
54}
55
56#[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"]
90#[derive(Debug)]
91#[doc(alias = "PendingTxBuilder")]
92pub struct PendingTransactionBuilder<N: Network> {
93 config: PendingTransactionConfig,
94 provider: RootProvider<N>,
95}
96
97impl<N: Network> PendingTransactionBuilder<N> {
98 pub const fn new(provider: RootProvider<N>, tx_hash: TxHash) -> Self {
100 Self::from_config(provider, PendingTransactionConfig::new(tx_hash))
101 }
102
103 pub const fn from_config(provider: RootProvider<N>, config: PendingTransactionConfig) -> Self {
105 Self { config, provider }
106 }
107
108 pub const fn inner(&self) -> &PendingTransactionConfig {
110 &self.config
111 }
112
113 pub fn into_inner(self) -> PendingTransactionConfig {
115 self.config
116 }
117
118 pub const fn provider(&self) -> &RootProvider<N> {
120 &self.provider
121 }
122
123 pub fn split(self) -> (RootProvider<N>, PendingTransactionConfig) {
125 (self.provider, self.config)
126 }
127
128 #[doc(alias = "transaction_hash")]
130 pub const fn tx_hash(&self) -> &TxHash {
131 self.config.tx_hash()
132 }
133
134 #[doc(alias = "set_transaction_hash")]
136 pub fn set_tx_hash(&mut self, tx_hash: TxHash) {
137 self.config.set_tx_hash(tx_hash);
138 }
139
140 #[doc(alias = "with_transaction_hash")]
142 pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
143 self.config.tx_hash = tx_hash;
144 self
145 }
146
147 #[doc(alias = "confirmations")]
149 pub const fn required_confirmations(&self) -> u64 {
150 self.config.required_confirmations()
151 }
152
153 #[doc(alias = "set_confirmations")]
155 pub fn set_required_confirmations(&mut self, confirmations: u64) {
156 self.config.set_required_confirmations(confirmations);
157 }
158
159 #[doc(alias = "with_confirmations")]
161 pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
162 self.config.required_confirmations = confirmations;
163 self
164 }
165
166 pub const fn timeout(&self) -> Option<Duration> {
168 self.config.timeout()
169 }
170
171 pub fn set_timeout(&mut self, timeout: Option<Duration>) {
173 self.config.set_timeout(timeout);
174 }
175
176 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
178 self.config.timeout = timeout;
179 self
180 }
181
182 #[doc(alias = "build")]
192 pub async fn register(self) -> Result<PendingTransaction, PendingTransactionError> {
193 self.provider.watch_pending_transaction(self.config).await
194 }
195
196 pub async fn watch(self) -> Result<TxHash, PendingTransactionError> {
204 self.register().await?.await
205 }
206
207 pub async fn get_receipt(self) -> Result<N::ReceiptResponse, PendingTransactionError> {
219 let hash = self.config.tx_hash;
220 let mut pending_tx = self.provider.watch_pending_transaction(self.config).await?;
221
222 let mut interval = interval(self.provider.client().poll_interval());
225
226 loop {
227 let mut confirmed = false;
228
229 select! {
230 _ = interval.tick() => {},
231 res = &mut pending_tx => {
232 let _ = res?;
233 confirmed = true;
234 }
235 }
236
237 let receipt = self.provider.get_transaction_receipt(hash).await?;
239 if let Some(receipt) = receipt {
240 return Ok(receipt);
241 }
242
243 if confirmed {
244 return Err(RpcError::NullResp.into());
245 }
246 }
247 }
248}
249
250#[must_use = "this type does nothing unless you call `with_provider`"]
255#[derive(Clone, Debug)]
256#[doc(alias = "PendingTxConfig", alias = "TxPendingConfig")]
257pub struct PendingTransactionConfig {
258 #[doc(alias = "transaction_hash")]
260 tx_hash: TxHash,
261
262 required_confirmations: u64,
264
265 timeout: Option<Duration>,
267}
268
269impl PendingTransactionConfig {
270 pub const fn new(tx_hash: TxHash) -> Self {
272 Self { tx_hash, required_confirmations: 1, timeout: None }
273 }
274
275 #[doc(alias = "transaction_hash")]
277 pub const fn tx_hash(&self) -> &TxHash {
278 &self.tx_hash
279 }
280
281 #[doc(alias = "set_transaction_hash")]
283 pub fn set_tx_hash(&mut self, tx_hash: TxHash) {
284 self.tx_hash = tx_hash;
285 }
286
287 #[doc(alias = "with_transaction_hash")]
289 pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
290 self.tx_hash = tx_hash;
291 self
292 }
293
294 #[doc(alias = "confirmations")]
296 pub const fn required_confirmations(&self) -> u64 {
297 self.required_confirmations
298 }
299
300 #[doc(alias = "set_confirmations")]
302 pub fn set_required_confirmations(&mut self, confirmations: u64) {
303 self.required_confirmations = confirmations;
304 }
305
306 #[doc(alias = "with_confirmations")]
308 pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
309 self.required_confirmations = confirmations;
310 self
311 }
312
313 pub const fn timeout(&self) -> Option<Duration> {
315 self.timeout
316 }
317
318 pub fn set_timeout(&mut self, timeout: Option<Duration>) {
320 self.timeout = timeout;
321 }
322
323 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
325 self.timeout = timeout;
326 self
327 }
328
329 pub const fn with_provider<N: Network>(
331 self,
332 provider: RootProvider<N>,
333 ) -> PendingTransactionBuilder<N> {
334 PendingTransactionBuilder::from_config(provider, self)
335 }
336}
337
338#[derive(Debug, thiserror::Error)]
340pub enum WatchTxError {
341 #[error("transaction was not confirmed within the timeout")]
343 Timeout,
344}
345
346#[doc(alias = "TransactionWatcher")]
347struct TxWatcher {
348 config: PendingTransactionConfig,
349 received_at_block: Option<u64>,
352 tx: oneshot::Sender<Result<(), WatchTxError>>,
353}
354
355impl TxWatcher {
356 fn notify(self, result: Result<(), WatchTxError>) {
358 debug!(tx=%self.config.tx_hash, "notifying");
359 let _ = self.tx.send(result);
360 }
361}
362
363#[doc(alias = "PendingTx", alias = "TxPending")]
369pub struct PendingTransaction {
370 #[doc(alias = "transaction_hash")]
372 pub(crate) tx_hash: TxHash,
373 pub(crate) rx: oneshot::Receiver<Result<(), WatchTxError>>,
376}
377
378impl fmt::Debug for PendingTransaction {
379 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380 f.debug_struct("PendingTransaction").field("tx_hash", &self.tx_hash).finish()
381 }
382}
383
384impl PendingTransaction {
385 pub fn ready(tx_hash: TxHash) -> Self {
387 let (tx, rx) = oneshot::channel();
388 tx.send(Ok(())).ok(); Self { tx_hash, rx }
390 }
391
392 #[doc(alias = "transaction_hash")]
394 pub const fn tx_hash(&self) -> &TxHash {
395 &self.tx_hash
396 }
397}
398
399impl Future for PendingTransaction {
400 type Output = Result<TxHash, PendingTransactionError>;
401
402 fn poll(
403 mut self: std::pin::Pin<&mut Self>,
404 cx: &mut std::task::Context<'_>,
405 ) -> std::task::Poll<Self::Output> {
406 self.rx.poll_unpin(cx).map(|res| {
407 res??;
408 Ok(self.tx_hash)
409 })
410 }
411}
412
413#[derive(Clone, Debug)]
415pub(crate) struct HeartbeatHandle<N: Network> {
416 tx: mpsc::Sender<TxWatcher>,
417 latest: watch::Receiver<Option<N::BlockResponse>>,
418}
419
420impl<N: Network> HeartbeatHandle<N> {
421 #[doc(alias = "watch_transaction")]
423 pub(crate) async fn watch_tx(
424 &self,
425 config: PendingTransactionConfig,
426 received_at_block: Option<u64>,
427 ) -> Result<PendingTransaction, PendingTransactionConfig> {
428 let (tx, rx) = oneshot::channel();
429 let tx_hash = config.tx_hash;
430 match self.tx.send(TxWatcher { config, received_at_block, tx }).await {
431 Ok(()) => Ok(PendingTransaction { tx_hash, rx }),
432 Err(e) => Err(e.0.config),
433 }
434 }
435
436 #[allow(dead_code)]
438 pub(crate) const fn latest(&self) -> &watch::Receiver<Option<N::BlockResponse>> {
439 &self.latest
440 }
441}
442
443pub(crate) struct Heartbeat<N, S> {
446 stream: futures::stream::Fuse<S>,
448
449 past_blocks: VecDeque<(u64, B256HashSet)>,
451
452 unconfirmed: B256HashMap<TxWatcher>,
454
455 waiting_confs: BTreeMap<u64, Vec<TxWatcher>>,
457
458 reap_at: BTreeMap<Instant, B256>,
460
461 _network: std::marker::PhantomData<N>,
462}
463
464impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
465 pub(crate) fn new(stream: S) -> Self {
467 Self {
468 stream: stream.fuse(),
469 past_blocks: Default::default(),
470 unconfirmed: Default::default(),
471 waiting_confs: Default::default(),
472 reap_at: Default::default(),
473 _network: Default::default(),
474 }
475 }
476
477 fn check_confirmations(&mut self, current_height: u64) {
479 let to_keep = self.waiting_confs.split_off(&(current_height + 1));
480 let to_notify = std::mem::replace(&mut self.waiting_confs, to_keep);
481 for watcher in to_notify.into_values().flatten() {
482 watcher.notify(Ok(()));
483 }
484 }
485
486 fn next_reap(&self) -> Instant {
489 self.reap_at
490 .first_key_value()
491 .map(|(k, _)| *k)
492 .unwrap_or_else(|| Instant::now() + Duration::from_secs(60_000))
493 }
494
495 fn reap_timeouts(&mut self) {
497 let now = Instant::now();
498 let to_keep = self.reap_at.split_off(&now);
499 let to_reap = std::mem::replace(&mut self.reap_at, to_keep);
500
501 for tx_hash in to_reap.values() {
502 if let Some(watcher) = self.unconfirmed.remove(tx_hash) {
503 debug!(tx=%tx_hash, "reaped");
504 watcher.notify(Err(WatchTxError::Timeout));
505 }
506 }
507 }
508
509 fn move_reorg_to_unconfirmed(&mut self, new_height: u64) {
513 for waiters in self.waiting_confs.values_mut() {
514 *waiters = std::mem::take(waiters).into_iter().filter_map(|watcher| {
515 if let Some(received_at_block) = watcher.received_at_block {
516 if received_at_block >= new_height {
518 let hash = watcher.config.tx_hash;
519 debug!(tx=%hash, %received_at_block, %new_height, "return to unconfirmed due to reorg");
520 self.unconfirmed.insert(hash, watcher);
521 return None;
522 }
523 }
524 Some(watcher)
525 }).collect();
526 }
527 }
528
529 fn handle_watch_ix(&mut self, to_watch: TxWatcher) {
532 debug!(tx=%to_watch.config.tx_hash, "watching");
534 trace!(?to_watch.config, ?to_watch.received_at_block);
535 if let Some(received_at_block) = to_watch.received_at_block {
536 let current_block =
539 self.past_blocks.back().map(|(h, _)| *h).unwrap_or(received_at_block);
540 self.add_to_waiting_list(to_watch, current_block);
541 return;
542 }
543
544 if let Some(timeout) = to_watch.config.timeout {
545 self.reap_at.insert(Instant::now() + timeout, to_watch.config.tx_hash);
546 }
547 for (block_height, txs) in self.past_blocks.iter().rev() {
550 if txs.contains(&to_watch.config.tx_hash) {
551 let confirmations = to_watch.config.required_confirmations;
552 let confirmed_at = *block_height + confirmations - 1;
553 let current_height = self.past_blocks.back().map(|(h, _)| *h).unwrap();
554
555 if confirmed_at <= current_height {
556 to_watch.notify(Ok(()));
557 } else {
558 debug!(tx=%to_watch.config.tx_hash, %block_height, confirmations, "adding to waiting list");
559 self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
560 }
561 return;
562 }
563 }
564
565 self.unconfirmed.insert(to_watch.config.tx_hash, to_watch);
566 }
567
568 fn add_to_waiting_list(&mut self, watcher: TxWatcher, block_height: u64) {
569 let confirmations = watcher.config.required_confirmations;
570 debug!(tx=%watcher.config.tx_hash, %block_height, confirmations, "adding to waiting list");
571 self.waiting_confs.entry(block_height + confirmations - 1).or_default().push(watcher);
572 }
573
574 fn handle_new_block(
578 &mut self,
579 block: N::BlockResponse,
580 latest: &watch::Sender<Option<N::BlockResponse>>,
581 ) {
582 let block_height = block.header().as_ref().number();
584
585 const MAX_BLOCKS_TO_RETAIN: usize = 10;
592 if self.past_blocks.len() >= MAX_BLOCKS_TO_RETAIN {
593 self.past_blocks.pop_front();
594 }
595 if let Some((last_height, _)) = self.past_blocks.back().as_ref() {
596 if *last_height + 1 != block_height {
598 warn!(%block_height, last_height, "reorg detected");
600 self.move_reorg_to_unconfirmed(block_height);
601 self.past_blocks.retain(|(h, _)| *h < block_height);
603 }
604 }
605 self.past_blocks.push_back((block_height, block.transactions().hashes().collect()));
606
607 let to_check: Vec<_> = block
609 .transactions()
610 .hashes()
611 .filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash))
612 .collect();
613 for mut watcher in to_check {
614 let confirmations = watcher.config.required_confirmations;
616 if confirmations <= 1 {
617 watcher.notify(Ok(()));
618 continue;
619 }
620 if let Some(set_block) = watcher.received_at_block {
624 warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set");
625 } else {
627 watcher.received_at_block = Some(block_height);
628 }
629 self.add_to_waiting_list(watcher, block_height);
630 }
631
632 self.check_confirmations(block_height);
633
634 debug!(%block_height, "updating latest block");
638 let _ = latest.send_replace(Some(block));
639 }
640}
641
#[cfg(target_arch = "wasm32")]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
    /// Spawns the heartbeat task (wasm32 targets) and returns the handle used
    /// to communicate with it.
    pub(crate) fn spawn(self) -> HeartbeatHandle<N> {
        let (heartbeat_task, handle) = self.consume();
        heartbeat_task.spawn_task();
        handle
    }
}
651
652#[cfg(not(target_arch = "wasm32"))]
653impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + Send + 'static> Heartbeat<N, S> {
654 pub(crate) fn spawn(self) -> HeartbeatHandle<N> {
656 let (task, handle) = self.consume();
657 task.spawn_task();
658 handle
659 }
660}
661
662impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
663 fn consume(self) -> (impl Future<Output = ()>, HeartbeatHandle<N>) {
664 let (latest, latest_rx) = watch::channel(None::<N::BlockResponse>);
665 let (ix_tx, ixns) = mpsc::channel(16);
666 (self.into_future(latest, ixns), HeartbeatHandle { tx: ix_tx, latest: latest_rx })
667 }
668
669 async fn into_future(
670 mut self,
671 latest: watch::Sender<Option<N::BlockResponse>>,
672 mut ixns: mpsc::Receiver<TxWatcher>,
673 ) {
674 'shutdown: loop {
675 {
676 let next_reap = self.next_reap();
677 let sleep = std::pin::pin!(sleep_until(next_reap.into()));
678
679 select! {
682 biased;
683
684 ix_opt = ixns.recv() => match ix_opt {
686 Some(to_watch) => self.handle_watch_ix(to_watch),
687 None => break 'shutdown, },
689
690 Some(block) = self.stream.next() => {
692 self.handle_new_block(block, &latest);
693 },
694
695 _ = sleep => {},
698 }
699 }
700
701 self.reap_timeouts();
703 }
704 }
705}