alloy_provider/
heart.rs

1//! Block heartbeat and pending transaction watcher.
2
3use crate::{Provider, RootProvider};
4use alloy_consensus::BlockHeader;
5use alloy_json_rpc::RpcError;
6use alloy_network::{BlockResponse, Network};
7use alloy_primitives::{
8    map::{B256HashMap, B256HashSet},
9    TxHash, B256,
10};
11use alloy_transport::{utils::Spawnable, TransportError};
12use futures::{stream::StreamExt, FutureExt, Stream};
13use std::{
14    collections::{BTreeMap, VecDeque},
15    fmt,
16    future::Future,
17    time::Duration,
18};
19use tokio::{
20    select,
21    sync::{mpsc, oneshot, watch},
22};
23
24#[cfg(target_arch = "wasm32")]
25use wasmtimer::{
26    std::Instant,
27    tokio::{interval, sleep_until},
28};
29
30#[cfg(not(target_arch = "wasm32"))]
31use {
32    std::time::Instant,
33    tokio::time::{interval, sleep_until},
34};
35
/// Errors which may occur when watching a pending transaction.
#[derive(Debug, thiserror::Error)]
pub enum PendingTransactionError {
    /// Failed to register pending transaction in heartbeat.
    #[error("failed to register pending transaction to watch")]
    FailedToRegister,

    /// Underlying transport error.
    #[error(transparent)]
    TransportError(#[from] TransportError),

    /// Error occurred while getting response from the heartbeat.
    #[error(transparent)]
    Recv(#[from] oneshot::error::RecvError),

    /// Errors that may occur when watching a transaction.
    #[error(transparent)]
    TxWatcher(#[from] WatchTxError),
}
55
/// A builder for configuring a pending transaction watcher.
///
/// # Examples
///
/// Send and wait for a transaction to be confirmed 2 times, with a timeout of 60 seconds:
///
/// ```no_run
/// # async fn example<N: alloy_network::Network>(provider: impl alloy_provider::Provider, tx: alloy_rpc_types_eth::transaction::TransactionRequest) -> Result<(), Box<dyn std::error::Error>> {
/// // Send a transaction, and configure the pending transaction.
/// let builder = provider.send_transaction(tx)
///     .await?
///     .with_required_confirmations(2)
///     .with_timeout(Some(std::time::Duration::from_secs(60)));
/// // Register the pending transaction with the provider.
/// let pending_tx = builder.register().await?;
/// // Wait for the transaction to be confirmed 2 times.
/// let tx_hash = pending_tx.await?;
/// # Ok(())
/// # }
/// ```
///
/// This can also be more concisely written using `watch`:
/// ```no_run
/// # async fn example<N: alloy_network::Network>(provider: impl alloy_provider::Provider, tx: alloy_rpc_types_eth::transaction::TransactionRequest) -> Result<(), Box<dyn std::error::Error>> {
/// let tx_hash = provider.send_transaction(tx)
///     .await?
///     .with_required_confirmations(2)
///     .with_timeout(Some(std::time::Duration::from_secs(60)))
///     .watch()
///     .await?;
/// # Ok(())
/// # }
/// ```
#[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"]
#[derive(Debug)]
#[doc(alias = "PendingTxBuilder")]
pub struct PendingTransactionBuilder<N: Network> {
    /// What to watch for: the transaction hash, the required confirmations and the timeout.
    config: PendingTransactionConfig,
    /// The root provider used to register the watcher and to fetch the receipt.
    provider: RootProvider<N>,
}
96
97impl<N: Network> PendingTransactionBuilder<N> {
98    /// Creates a new pending transaction builder.
99    pub const fn new(provider: RootProvider<N>, tx_hash: TxHash) -> Self {
100        Self::from_config(provider, PendingTransactionConfig::new(tx_hash))
101    }
102
103    /// Creates a new pending transaction builder from the given configuration.
104    pub const fn from_config(provider: RootProvider<N>, config: PendingTransactionConfig) -> Self {
105        Self { config, provider }
106    }
107
108    /// Returns the inner configuration.
109    pub const fn inner(&self) -> &PendingTransactionConfig {
110        &self.config
111    }
112
113    /// Consumes this builder, returning the inner configuration.
114    pub fn into_inner(self) -> PendingTransactionConfig {
115        self.config
116    }
117
118    /// Returns the provider.
119    pub const fn provider(&self) -> &RootProvider<N> {
120        &self.provider
121    }
122
123    /// Consumes this builder, returning the provider and the configuration.
124    pub fn split(self) -> (RootProvider<N>, PendingTransactionConfig) {
125        (self.provider, self.config)
126    }
127
128    /// Returns the transaction hash.
129    #[doc(alias = "transaction_hash")]
130    pub const fn tx_hash(&self) -> &TxHash {
131        self.config.tx_hash()
132    }
133
134    /// Sets the transaction hash.
135    #[doc(alias = "set_transaction_hash")]
136    pub fn set_tx_hash(&mut self, tx_hash: TxHash) {
137        self.config.set_tx_hash(tx_hash);
138    }
139
140    /// Sets the transaction hash.
141    #[doc(alias = "with_transaction_hash")]
142    pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
143        self.config.tx_hash = tx_hash;
144        self
145    }
146
147    /// Returns the number of confirmations to wait for.
148    #[doc(alias = "confirmations")]
149    pub const fn required_confirmations(&self) -> u64 {
150        self.config.required_confirmations()
151    }
152
153    /// Sets the number of confirmations to wait for.
154    #[doc(alias = "set_confirmations")]
155    pub fn set_required_confirmations(&mut self, confirmations: u64) {
156        self.config.set_required_confirmations(confirmations);
157    }
158
159    /// Sets the number of confirmations to wait for.
160    #[doc(alias = "with_confirmations")]
161    pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
162        self.config.required_confirmations = confirmations;
163        self
164    }
165
166    /// Returns the timeout.
167    pub const fn timeout(&self) -> Option<Duration> {
168        self.config.timeout()
169    }
170
171    /// Sets the timeout.
172    pub fn set_timeout(&mut self, timeout: Option<Duration>) {
173        self.config.set_timeout(timeout);
174    }
175
176    /// Sets the timeout.
177    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
178        self.config.timeout = timeout;
179        self
180    }
181
182    /// Registers the watching configuration with the provider.
183    ///
184    /// This does not wait for the transaction to be confirmed, but returns a [`PendingTransaction`]
185    /// that can be awaited at a later moment.
186    ///
187    /// See:
188    /// - [`watch`](Self::watch) for watching the transaction without fetching the receipt.
189    /// - [`get_receipt`](Self::get_receipt) for fetching the receipt after the transaction has been
190    ///   confirmed.
191    #[doc(alias = "build")]
192    pub async fn register(self) -> Result<PendingTransaction, PendingTransactionError> {
193        self.provider.watch_pending_transaction(self.config).await
194    }
195
196    /// Waits for the transaction to confirm with the given number of confirmations.
197    ///
198    /// See:
199    /// - [`register`](Self::register): for registering the transaction without waiting for it to be
200    ///   confirmed.
201    /// - [`get_receipt`](Self::get_receipt) for fetching the receipt after the transaction has been
202    ///   confirmed.
203    pub async fn watch(self) -> Result<TxHash, PendingTransactionError> {
204        self.register().await?.await
205    }
206
207    /// Waits for the transaction to confirm with the given number of confirmations, and
208    /// then fetches its receipt.
209    ///
210    /// Note that this method will call `eth_getTransactionReceipt` on the [**root
211    /// provider**](RootProvider), and not on a specific network provider. This means that any
212    /// overrides or customizations made to the network provider will not be used.
213    ///
214    /// See:
215    /// - [`register`](Self::register): for registering the transaction without waiting for it to be
216    ///   confirmed.
217    /// - [`watch`](Self::watch) for watching the transaction without fetching the receipt.
218    pub async fn get_receipt(self) -> Result<N::ReceiptResponse, PendingTransactionError> {
219        let hash = self.config.tx_hash;
220        let mut pending_tx = self.provider.watch_pending_transaction(self.config).await?;
221
222        // FIXME: this is a hotfix to prevent a race condition where the heartbeat would miss the
223        // block the tx was mined in
224        let mut interval = interval(self.provider.client().poll_interval());
225
226        loop {
227            let mut confirmed = false;
228
229            select! {
230                _ = interval.tick() => {},
231                res = &mut pending_tx => {
232                    let _ = res?;
233                    confirmed = true;
234                }
235            }
236
237            // try to fetch the receipt
238            let receipt = self.provider.get_transaction_receipt(hash).await?;
239            if let Some(receipt) = receipt {
240                return Ok(receipt);
241            }
242
243            if confirmed {
244                return Err(RpcError::NullResp.into());
245            }
246        }
247    }
248}
249
/// Configuration for watching a pending transaction.
///
/// This type can be used to create a [`PendingTransactionBuilder`], but in general it is only used
/// internally.
#[must_use = "this type does nothing unless you call `with_provider`"]
#[derive(Clone, Debug)]
#[doc(alias = "PendingTxConfig", alias = "TxPendingConfig")]
pub struct PendingTransactionConfig {
    /// The transaction hash to watch for.
    #[doc(alias = "transaction_hash")]
    tx_hash: TxHash,

    /// Number of confirmations to wait for before the watcher is notified.
    /// [`PendingTransactionConfig::new`] sets this to 1.
    required_confirmations: u64,

    /// Optional timeout for the transaction. [`PendingTransactionConfig::new`] sets this to
    /// `None`.
    timeout: Option<Duration>,
}
268
269impl PendingTransactionConfig {
270    /// Create a new watch for a transaction.
271    pub const fn new(tx_hash: TxHash) -> Self {
272        Self { tx_hash, required_confirmations: 1, timeout: None }
273    }
274
275    /// Returns the transaction hash.
276    #[doc(alias = "transaction_hash")]
277    pub const fn tx_hash(&self) -> &TxHash {
278        &self.tx_hash
279    }
280
281    /// Sets the transaction hash.
282    #[doc(alias = "set_transaction_hash")]
283    pub fn set_tx_hash(&mut self, tx_hash: TxHash) {
284        self.tx_hash = tx_hash;
285    }
286
287    /// Sets the transaction hash.
288    #[doc(alias = "with_transaction_hash")]
289    pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
290        self.tx_hash = tx_hash;
291        self
292    }
293
294    /// Returns the number of confirmations to wait for.
295    #[doc(alias = "confirmations")]
296    pub const fn required_confirmations(&self) -> u64 {
297        self.required_confirmations
298    }
299
300    /// Sets the number of confirmations to wait for.
301    #[doc(alias = "set_confirmations")]
302    pub fn set_required_confirmations(&mut self, confirmations: u64) {
303        self.required_confirmations = confirmations;
304    }
305
306    /// Sets the number of confirmations to wait for.
307    #[doc(alias = "with_confirmations")]
308    pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
309        self.required_confirmations = confirmations;
310        self
311    }
312
313    /// Returns the timeout.
314    pub const fn timeout(&self) -> Option<Duration> {
315        self.timeout
316    }
317
318    /// Sets the timeout.
319    pub fn set_timeout(&mut self, timeout: Option<Duration>) {
320        self.timeout = timeout;
321    }
322
323    /// Sets the timeout.
324    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
325        self.timeout = timeout;
326        self
327    }
328
329    /// Wraps this configuration with a provider to expose watching methods.
330    pub const fn with_provider<N: Network>(
331        self,
332        provider: RootProvider<N>,
333    ) -> PendingTransactionBuilder<N> {
334        PendingTransactionBuilder::from_config(provider, self)
335    }
336}
337
/// Errors which may occur in heartbeat when watching a transaction.
#[derive(Debug, thiserror::Error)]
pub enum WatchTxError {
    /// Transaction was not confirmed after configured timeout.
    ///
    /// Sent to the waiter when the heartbeat reaps a still-unconfirmed watcher.
    #[error("transaction was not confirmed within the timeout")]
    Timeout,
}
345
/// A single watch request tracked by the heartbeat, together with the channel used to notify
/// whoever is waiting on it.
#[doc(alias = "TransactionWatcher")]
struct TxWatcher {
    /// What to watch for: the transaction hash, the required confirmations and the timeout.
    config: PendingTransactionConfig,
    /// The block at which the transaction was received. To be filled once known.
    /// Invariant: any confirmed transaction in `Heart` has this value set.
    received_at_block: Option<u64>,
    /// One-shot channel over which the final outcome is delivered to the waiter.
    tx: oneshot::Sender<Result<(), WatchTxError>>,
}
354
355impl TxWatcher {
356    /// Notify the waiter.
357    fn notify(self, result: Result<(), WatchTxError>) {
358        debug!(tx=%self.config.tx_hash, "notifying");
359        let _ = self.tx.send(result);
360    }
361}
362
/// Represents a transaction that is yet to be confirmed a specified number of times.
///
/// This struct is a future created by [`PendingTransactionBuilder`] that resolves to the
/// transaction hash once the underlying transaction has been confirmed the specified number of
/// times in the network.
#[doc(alias = "PendingTx", alias = "TxPending")]
pub struct PendingTransaction {
    /// The transaction hash.
    #[doc(alias = "transaction_hash")]
    pub(crate) tx_hash: TxHash,
    /// The receiver for the notification: `Ok(())` on confirmation, or the reason the watch
    /// failed.
    // TODO: send a receipt?
    pub(crate) rx: oneshot::Receiver<Result<(), WatchTxError>>,
}
377
378impl fmt::Debug for PendingTransaction {
379    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380        f.debug_struct("PendingTransaction").field("tx_hash", &self.tx_hash).finish()
381    }
382}
383
384impl PendingTransaction {
385    /// Creates a ready pending transaction.
386    pub fn ready(tx_hash: TxHash) -> Self {
387        let (tx, rx) = oneshot::channel();
388        tx.send(Ok(())).ok(); // Make sure that the receiver is notified already.
389        Self { tx_hash, rx }
390    }
391
392    /// Returns this transaction's hash.
393    #[doc(alias = "transaction_hash")]
394    pub const fn tx_hash(&self) -> &TxHash {
395        &self.tx_hash
396    }
397}
398
399impl Future for PendingTransaction {
400    type Output = Result<TxHash, PendingTransactionError>;
401
402    fn poll(
403        mut self: std::pin::Pin<&mut Self>,
404        cx: &mut std::task::Context<'_>,
405    ) -> std::task::Poll<Self::Output> {
406        self.rx.poll_unpin(cx).map(|res| {
407            res??;
408            Ok(self.tx_hash)
409        })
410    }
411}
412
/// A handle to the heartbeat task.
#[derive(Clone, Debug)]
pub(crate) struct HeartbeatHandle<N: Network> {
    /// Channel over which new watch requests are sent to the heartbeat task.
    tx: mpsc::Sender<TxWatcher>,
    /// Receiver for the latest block published by the heartbeat task; `None` until a block has
    /// been published.
    latest: watch::Receiver<Option<N::BlockResponse>>,
}
419
420impl<N: Network> HeartbeatHandle<N> {
421    /// Watch for a transaction to be confirmed with the given config.
422    #[doc(alias = "watch_transaction")]
423    pub(crate) async fn watch_tx(
424        &self,
425        config: PendingTransactionConfig,
426        received_at_block: Option<u64>,
427    ) -> Result<PendingTransaction, PendingTransactionConfig> {
428        let (tx, rx) = oneshot::channel();
429        let tx_hash = config.tx_hash;
430        match self.tx.send(TxWatcher { config, received_at_block, tx }).await {
431            Ok(()) => Ok(PendingTransaction { tx_hash, rx }),
432            Err(e) => Err(e.0.config),
433        }
434    }
435
436    /// Returns a watcher that always sees the latest block.
437    #[allow(dead_code)]
438    pub(crate) const fn latest(&self) -> &watch::Receiver<Option<N::BlockResponse>> {
439        &self.latest
440    }
441}
442
// NOTE(review): a former TODO here asked to parameterize this type with `Network`; the type is
// already generic over `N`, so that TODO looks stale. Confirm before dropping this note.
/// A heartbeat task that receives blocks and watches for transactions.
pub(crate) struct Heartbeat<N, S> {
    /// The stream of incoming blocks to watch.
    stream: futures::stream::Fuse<S>,

    /// Lookbehind blocks, oldest first, as pairs of block number and the set of transaction
    /// hashes contained in that block.
    past_blocks: VecDeque<(u64, B256HashSet)>,

    /// Transactions to watch for.
    unconfirmed: B256HashMap<TxWatcher>,

    /// Ordered map of transactions waiting for confirmations, keyed by the block height at which
    /// their watchers are notified.
    waiting_confs: BTreeMap<u64, Vec<TxWatcher>>,

    /// Ordered map of transactions to reap at a certain time.
    reap_at: BTreeMap<Instant, B256>,

    /// Marker for the network type `N`; no value of that type is stored.
    _network: std::marker::PhantomData<N>,
}
463
464impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
465    /// Create a new heartbeat task.
466    pub(crate) fn new(stream: S) -> Self {
467        Self {
468            stream: stream.fuse(),
469            past_blocks: Default::default(),
470            unconfirmed: Default::default(),
471            waiting_confs: Default::default(),
472            reap_at: Default::default(),
473            _network: Default::default(),
474        }
475    }
476
477    /// Check if any transactions have enough confirmations to notify.
478    fn check_confirmations(&mut self, current_height: u64) {
479        let to_keep = self.waiting_confs.split_off(&(current_height + 1));
480        let to_notify = std::mem::replace(&mut self.waiting_confs, to_keep);
481        for watcher in to_notify.into_values().flatten() {
482            watcher.notify(Ok(()));
483        }
484    }
485
486    /// Get the next time to reap a transaction. If no reaps, this is a very
487    /// long time from now (i.e. will not be woken).
488    fn next_reap(&self) -> Instant {
489        self.reap_at
490            .first_key_value()
491            .map(|(k, _)| *k)
492            .unwrap_or_else(|| Instant::now() + Duration::from_secs(60_000))
493    }
494
495    /// Reap any timeout
496    fn reap_timeouts(&mut self) {
497        let now = Instant::now();
498        let to_keep = self.reap_at.split_off(&now);
499        let to_reap = std::mem::replace(&mut self.reap_at, to_keep);
500
501        for tx_hash in to_reap.values() {
502            if let Some(watcher) = self.unconfirmed.remove(tx_hash) {
503                debug!(tx=%tx_hash, "reaped");
504                watcher.notify(Err(WatchTxError::Timeout));
505            }
506        }
507    }
508
509    /// Reap transactions overridden by the reorg.
510    /// Accepts new chain height as an argument, and drops any subscriptions
511    /// that were received in blocks affected by the reorg (e.g. >= new_height).
512    fn move_reorg_to_unconfirmed(&mut self, new_height: u64) {
513        for waiters in self.waiting_confs.values_mut() {
514            *waiters = std::mem::take(waiters).into_iter().filter_map(|watcher| {
515                if let Some(received_at_block) = watcher.received_at_block {
516                    // All blocks after and _including_ the new height are reaped.
517                    if received_at_block >= new_height {
518                        let hash = watcher.config.tx_hash;
519                        debug!(tx=%hash, %received_at_block, %new_height, "return to unconfirmed due to reorg");
520                        self.unconfirmed.insert(hash, watcher);
521                        return None;
522                    }
523                }
524                Some(watcher)
525            }).collect();
526        }
527    }
528
529    /// Handle a watch instruction by adding it to the watch list, and
530    /// potentially adding it to our `reap_at` list.
531    fn handle_watch_ix(&mut self, to_watch: TxWatcher) {
532        // Start watching for the transaction.
533        debug!(tx=%to_watch.config.tx_hash, "watching");
534        trace!(?to_watch.config, ?to_watch.received_at_block);
535        if let Some(received_at_block) = to_watch.received_at_block {
536            // Transaction is already confirmed, we just need to wait for the required
537            // confirmations.
538            let current_block =
539                self.past_blocks.back().map(|(h, _)| *h).unwrap_or(received_at_block);
540            self.add_to_waiting_list(to_watch, current_block);
541            return;
542        }
543
544        if let Some(timeout) = to_watch.config.timeout {
545            self.reap_at.insert(Instant::now() + timeout, to_watch.config.tx_hash);
546        }
547        // Transaction may be confirmed already, check the lookbehind history first.
548        // If so, insert it into the waiting list.
549        for (block_height, txs) in self.past_blocks.iter().rev() {
550            if txs.contains(&to_watch.config.tx_hash) {
551                let confirmations = to_watch.config.required_confirmations;
552                let confirmed_at = *block_height + confirmations - 1;
553                let current_height = self.past_blocks.back().map(|(h, _)| *h).unwrap();
554
555                if confirmed_at <= current_height {
556                    to_watch.notify(Ok(()));
557                } else {
558                    debug!(tx=%to_watch.config.tx_hash, %block_height, confirmations, "adding to waiting list");
559                    self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
560                }
561                return;
562            }
563        }
564
565        self.unconfirmed.insert(to_watch.config.tx_hash, to_watch);
566    }
567
568    fn add_to_waiting_list(&mut self, watcher: TxWatcher, block_height: u64) {
569        let confirmations = watcher.config.required_confirmations;
570        debug!(tx=%watcher.config.tx_hash, %block_height, confirmations, "adding to waiting list");
571        self.waiting_confs.entry(block_height + confirmations - 1).or_default().push(watcher);
572    }
573
574    /// Handle a new block by checking if any of the transactions we're
575    /// watching are in it, and if so, notifying the watcher. Also updates
576    /// the latest block.
577    fn handle_new_block(
578        &mut self,
579        block: N::BlockResponse,
580        latest: &watch::Sender<Option<N::BlockResponse>>,
581    ) {
582        // Blocks without numbers are ignored, as they're not part of the chain.
583        let block_height = block.header().as_ref().number();
584
585        // Add the block the lookbehind.
586        // The value is chosen arbitrarily to not have a huge memory footprint but still
587        // catch most cases where user subscribes for an already mined transaction.
588        // Note that we expect provider to check whether transaction is already mined
589        // before subscribing, so here we only need to consider time before sending a notification
590        // and processing it.
591        const MAX_BLOCKS_TO_RETAIN: usize = 10;
592        if self.past_blocks.len() >= MAX_BLOCKS_TO_RETAIN {
593            self.past_blocks.pop_front();
594        }
595        if let Some((last_height, _)) = self.past_blocks.back().as_ref() {
596            // Check that the chain is continuous.
597            if *last_height + 1 != block_height {
598                // Move all the transactions that were reset by the reorg to the unconfirmed list.
599                warn!(%block_height, last_height, "reorg detected");
600                self.move_reorg_to_unconfirmed(block_height);
601                // Remove past blocks that are now invalid.
602                self.past_blocks.retain(|(h, _)| *h < block_height);
603            }
604        }
605        self.past_blocks.push_back((block_height, block.transactions().hashes().collect()));
606
607        // Check if we are watching for any of the transactions in this block.
608        let to_check: Vec<_> = block
609            .transactions()
610            .hashes()
611            .filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash))
612            .collect();
613        for mut watcher in to_check {
614            // If `confirmations` is not more than 1 we can notify the watcher immediately.
615            let confirmations = watcher.config.required_confirmations;
616            if confirmations <= 1 {
617                watcher.notify(Ok(()));
618                continue;
619            }
620            // Otherwise add it to the waiting list.
621
622            // Set the block at which the transaction was received.
623            if let Some(set_block) = watcher.received_at_block {
624                warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set");
625                // We don't override the set value.
626            } else {
627                watcher.received_at_block = Some(block_height);
628            }
629            self.add_to_waiting_list(watcher, block_height);
630        }
631
632        self.check_confirmations(block_height);
633
634        // Update the latest block. We use `send_replace` here to ensure the
635        // latest block is always up to date, even if no receivers exist.
636        // C.f. https://docs.rs/tokio/latest/tokio/sync/watch/struct.Sender.html#method.send
637        debug!(%block_height, "updating latest block");
638        let _ = latest.send_replace(Some(block));
639    }
640}
641
#[cfg(target_arch = "wasm32")]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
    /// Starts the heartbeat as a background task and returns the [`HeartbeatHandle`] used to
    /// talk to it.
    pub(crate) fn spawn(self) -> HeartbeatHandle<N> {
        let (heartbeat, handle) = self.consume();
        heartbeat.spawn_task();
        handle
    }
}
651
652#[cfg(not(target_arch = "wasm32"))]
653impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + Send + 'static> Heartbeat<N, S> {
654    /// Spawn the heartbeat task, returning a [`HeartbeatHandle`].
655    pub(crate) fn spawn(self) -> HeartbeatHandle<N> {
656        let (task, handle) = self.consume();
657        task.spawn_task();
658        handle
659    }
660}
661
662impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
663    fn consume(self) -> (impl Future<Output = ()>, HeartbeatHandle<N>) {
664        let (latest, latest_rx) = watch::channel(None::<N::BlockResponse>);
665        let (ix_tx, ixns) = mpsc::channel(16);
666        (self.into_future(latest, ixns), HeartbeatHandle { tx: ix_tx, latest: latest_rx })
667    }
668
669    async fn into_future(
670        mut self,
671        latest: watch::Sender<Option<N::BlockResponse>>,
672        mut ixns: mpsc::Receiver<TxWatcher>,
673    ) {
674        'shutdown: loop {
675            {
676                let next_reap = self.next_reap();
677                let sleep = std::pin::pin!(sleep_until(next_reap.into()));
678
679                // We bias the select so that we always handle new messages
680                // before checking blocks, and reap timeouts are last.
681                select! {
682                    biased;
683
684                    // Watch for new transactions.
685                    ix_opt = ixns.recv() => match ix_opt {
686                        Some(to_watch) => self.handle_watch_ix(to_watch),
687                        None => break 'shutdown, // ix channel is closed
688                    },
689
690                    // Wake up to handle new blocks.
691                    Some(block) = self.stream.next() => {
692                        self.handle_new_block(block, &latest);
693                    },
694
695                    // This arm ensures we always wake up to reap timeouts,
696                    // even if there are no other events.
697                    _ = sleep => {},
698                }
699            }
700
701            // Always reap timeouts
702            self.reap_timeouts();
703        }
704    }
705}