fuel_core_sync/
import.rs

1//! # Importer Task
2//! This module contains the import task which is responsible for
3//! importing blocks from the network into the local blockchain.
4
5use cache::{
6    Cache,
7    CachedDataBatch,
8};
9use fuel_core_services::{
10    SharedMutex,
11    StateWatcher,
12    TraceErr,
13};
14use fuel_core_types::{
15    self,
16    blockchain::{
17        block::Block,
18        SealedBlock,
19        SealedBlockHeader,
20    },
21    fuel_types::BlockHeight,
22    services::p2p::{
23        PeerId,
24        SourcePeer,
25        Transactions,
26    },
27};
28use futures::{
29    stream::StreamExt,
30    FutureExt,
31    Stream,
32};
33use std::{
34    future::Future,
35    num::NonZeroU32,
36    ops::{
37        Range,
38        RangeInclusive,
39    },
40    sync::Arc,
41};
42use tokio::{
43    pin,
44    sync::{
45        mpsc,
46        Notify,
47    },
48    task::JoinHandle,
49};
50use tracing::Instrument;
51
52use crate::{
53    ports::{
54        BlockImporterPort,
55        ConsensusPort,
56        PeerReportReason,
57        PeerToPeerPort,
58    },
59    state::State,
60};
61
62mod cache;
63
64#[cfg(any(test, feature = "benchmarking"))]
65/// Accessories for testing the sync. Available only when compiling under test
66/// or benchmarking.
67pub mod test_helpers;
68
69#[cfg(test)]
70mod tests;
71
72#[cfg(test)]
73mod back_pressure_tests;
74
/// Parameters for the import task.
#[derive(Clone, Copy, Debug)]
pub struct Config {
    /// Upper bound on how many block-batch requests the import stream keeps
    /// buffered (in flight) at the same time.
    pub block_stream_buffer_size: usize,
    /// Upper bound on the number of headers requested in one batch.
    pub header_batch_size: usize,
}

impl Default for Config {
    /// Builds the default parameters: ten buffered block-batch requests and
    /// header batches of one hundred headers.
    fn default() -> Self {
        let block_stream_buffer_size = 10;
        let header_batch_size = 100;

        Self {
            block_stream_buffer_size,
            header_batch_size,
        }
    }
}
92
93/// The combination of shared state, configuration, and services that define
94/// import behavior.
95pub struct Import<P, E, C> {
96    /// Shared state between import and sync tasks.
97    state: SharedMutex<State>,
98    /// Notify import when sync has new work.
99    notify: Arc<Notify>,
100    /// Configuration parameters.
101    params: Config,
102    /// Network port.
103    p2p: Arc<P>,
104    /// Executor port.
105    executor: Arc<E>,
106    /// Consensus port.
107    consensus: Arc<C>,
108    /// A cache of already validated header or blocks.
109    cache: Cache,
110}
111
112/// The data that is fetched either in the network or in the cache for a range of headers or blocks.
113#[derive(Debug, Clone)]
114enum BlockHeaderData {
115    /// The headers (or full blocks) have been fetched and checked.
116    Cached(CachedDataBatch),
117    /// The headers has just been fetched from the network.
118    Fetched(Batch<SealedBlockHeader>),
119}
120
121impl<P, E, C> Import<P, E, C> {
122    /// Configure an import behavior from a shared state, configuration and
123    /// services that can be executed by an ImportTask.
124    pub fn new(
125        state: SharedMutex<State>,
126        notify: Arc<Notify>,
127        params: Config,
128        p2p: Arc<P>,
129        executor: Arc<E>,
130        consensus: Arc<C>,
131    ) -> Self {
132        Self {
133            state,
134            notify,
135            params,
136            p2p,
137            executor,
138            consensus,
139            cache: Cache::new(),
140        }
141    }
142
143    /// Signal other asynchronous tasks that an import event has occurred.
144    pub fn notify_one(&self) {
145        self.notify.notify_one()
146    }
147}
148
149#[derive(Debug, Clone, PartialEq, Eq)]
150struct Batch<T> {
151    peer: Option<PeerId>,
152    range: Range<u32>,
153    results: Vec<T>,
154}
155
156impl<T> Batch<T> {
157    pub fn new(peer: Option<PeerId>, range: Range<u32>, results: Vec<T>) -> Self {
158        Self {
159            peer,
160            range,
161            results,
162        }
163    }
164
165    pub fn is_err(&self) -> bool {
166        self.results.len() < self.range.len()
167    }
168}
169
170type SealedHeaderBatch = Batch<SealedBlockHeader>;
171type SealedBlockBatch = Batch<SealedBlock>;
172
173impl<P, E, C> Import<P, E, C>
174where
175    P: PeerToPeerPort + Send + Sync + 'static,
176    E: BlockImporterPort + Send + Sync + 'static,
177    C: ConsensusPort + Send + Sync + 'static,
178{
179    #[tracing::instrument(skip_all)]
180    /// Execute imports until a shutdown is requested.
181    pub async fn import(&mut self, shutdown: &mut StateWatcher) -> anyhow::Result<bool> {
182        self.import_inner(shutdown).await?;
183
184        Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await)
185    }
186
187    async fn import_inner(&mut self, shutdown: &StateWatcher) -> anyhow::Result<()> {
188        // If there is a range to process, launch the stream.
189        if let Some(range) = self.state.apply(|s| s.process_range()) {
190            // Launch the stream to import the range.
191            let count = self.launch_stream(range.clone(), shutdown).await;
192
193            // Get the size of the range.
194            let range_len = range.size_hint().0;
195
196            // If we did not process the entire range, mark the failed heights as failed.
197            if count < range_len {
198                let count = u32::try_from(count)
199                    .expect("Size of the range can't be more than maximum `BlockHeight`");
200                let incomplete_range = range.start().saturating_add(count)..=*range.end();
201                self.state
202                    .apply(|s| s.failed_to_process(incomplete_range.clone()));
203                return Err(anyhow::anyhow!(
204                    "Failed to import range of blocks: {:?}",
205                    incomplete_range
206                ));
207            }
208        }
209        Ok(())
210    }
211
212    fn fetch_batches_task(
213        &self,
214        range: RangeInclusive<u32>,
215        shutdown: &StateWatcher,
216    ) -> (JoinHandle<()>, mpsc::Receiver<SealedBlockBatch>) {
217        let Self {
218            params,
219            p2p,
220            consensus,
221            cache,
222            ..
223        } = &self;
224        let batch_size = u32::try_from(params.header_batch_size)
225            .expect("Header batch size must be less u32::MAX");
226        let batch_size =
227            NonZeroU32::new(batch_size).expect("Header batch size must be non-zero");
228
229        let (batch_sender, batch_receiver) = mpsc::channel(1);
230
231        let fetch_batches_task = tokio::spawn({
232            let params = *params;
233            let p2p = p2p.clone();
234            let consensus = consensus.clone();
235            let cache = cache.clone();
236            let block_stream_buffer_size = params.block_stream_buffer_size;
237            let mut shutdown_signal = shutdown.clone();
238            async move {
239                let block_stream = get_block_stream(
240                    range.clone(),
241                    batch_size,
242                    p2p,
243                    consensus,
244                    cache.clone(),
245                );
246
247                let shutdown_future = {
248                    let mut s = shutdown_signal.clone();
249                    async move {
250                        let _ = s.while_started().await;
251                        tracing::info!("In progress import stream shutting down");
252                    }
253                };
254
255                let stream = block_stream
256                    .map({
257                        let shutdown_signal = shutdown_signal.clone();
258                        move |stream_block_batch| {
259                            let mut shutdown_signal = shutdown_signal.clone();
260                            tokio::spawn(async move {
261                                tokio::select! {
262                                    biased;
263                                    // If a shutdown signal is received during the stream, terminate early and
264                                    // return an empty response
265                                    _ = shutdown_signal.while_started() => None,
266                                    // Stream a batch of blocks
267                                    blocks = stream_block_batch => Some(blocks),
268                                }
269                            }).map(|task| {
270                                task.trace_err("Failed to join the task").ok().flatten()
271                            })
272                        }
273                    })
274                    // Request up to `block_stream_buffer_size` headers/transactions from the network.
275                    .buffered(block_stream_buffer_size)
276                    // Continue the stream until the shutdown signal is received.
277                    .take_until(shutdown_future)
278                    .into_scan_none()
279                    .scan_none()
280                    .into_scan_err()
281                    .scan_err();
282
283                pin!(stream);
284
285                while let Some(block_batch) = stream.next().await {
286                    tokio::select! {
287                        biased;
288                        _ = shutdown_signal.while_started() => {
289                            break;
290                        },
291                        result = batch_sender.send(block_batch) => {
292                            if result.is_err() {
293                                break
294                            }
295                        },
296                    }
297                }
298            }
299        });
300
301        (fetch_batches_task, batch_receiver)
302    }
303
304    #[tracing::instrument(skip(self, shutdown))]
305    /// Launches a stream to import and execute a range of blocks.
306    ///
307    /// This stream will process all blocks up to the given range or
308    /// an error occurs.
309    /// If an error occurs, the preceding blocks still be processed
310    /// and the error will be returned.
311    async fn launch_stream(
312        &mut self,
313        range: RangeInclusive<u32>,
314        shutdown: &StateWatcher,
315    ) -> usize {
316        let Self {
317            state,
318            p2p,
319            executor,
320            ..
321        } = &self;
322
323        let (fetch_batches_task, batch_receiver) =
324            self.fetch_batches_task(range, shutdown);
325        let result = tokio_stream::wrappers::ReceiverStream::new(batch_receiver)
326            .then(|batch| {
327                let mut cache = self.cache.clone();
328                async move {
329                    let Batch {
330                        peer,
331                        range,
332                        results,
333                    } = batch;
334
335                    let mut done = vec![];
336                    let mut shutdown = shutdown.clone();
337                    for sealed_block in results {
338                        let height = *sealed_block.entity.header().height();
339                        let res = tokio::select! {
340                            biased;
341                            _ = shutdown.while_started() => {
342                                break;
343                            },
344                            res = execute_and_commit(executor.as_ref(), state, sealed_block) => {
345                                cache.remove_element(&height);
346                                res
347                            },
348                        };
349
350                        match &res {
351                            Ok(_) => {
352                                done.push(());
353                            },
354                            Err(e) => {
355                                // If this fails, then it means that consensus has approved a block that is invalid.
356                                // This would suggest a more serious issue than a bad peer, e.g. a fork or an out-of-date client.
357                                tracing::error!("Failed to execute and commit block from peer {:?}: {:?}", peer, e);
358                                break;
359                            },
360                        };
361                    }
362
363                    let batch = Batch::new(peer.clone(), range.clone(), done);
364
365                    if !batch.is_err() {
366                        report_peer(p2p, peer, PeerReportReason::SuccessfulBlockImport);
367                    }
368
369                    batch
370                }
371                .instrument(tracing::debug_span!("execute_and_commit"))
372                .in_current_span()
373            })
374            // Continue the stream unless an error occurs.
375            .into_scan_err()
376            .scan_err()
377            // Count the number of successfully executed blocks.
378            // Fold the stream into a count.
379            .fold(0usize, |count, batch| async move {
380                count.checked_add(batch.results.len()).expect("It is impossible to fetch so much data to overflow `usize`")
381            })
382            .await;
383
384        // Wait for spawned task to finish
385        let _ = fetch_batches_task
386            .await
387            .trace_err("Failed to join the fetch batches task");
388        result
389    }
390}
391
392fn get_block_stream<
393    P: PeerToPeerPort + Send + Sync + 'static,
394    C: ConsensusPort + Send + Sync + 'static,
395>(
396    range: RangeInclusive<u32>,
397    header_batch_size: NonZeroU32,
398    p2p: Arc<P>,
399    consensus: Arc<C>,
400    cache: Cache,
401) -> impl Stream<Item = impl Future<Output = SealedBlockBatch>> {
402    cache
403        .get_chunks(range.clone(), header_batch_size)
404        .map({
405            let p2p = p2p.clone();
406            move |cached_data_batch| {
407                let p2p = p2p.clone();
408                async move {
409                    if let CachedDataBatch::None(range) = cached_data_batch {
410                        BlockHeaderData::Fetched(get_headers_batch(range, &p2p).await)
411                    } else {
412                        BlockHeaderData::Cached(cached_data_batch)
413                    }
414                }
415            }
416        })
417        .map({
418            let p2p = p2p.clone();
419            let consensus = consensus.clone();
420            let cache = cache.clone();
421            move |header_batch| {
422                let p2p = p2p.clone();
423                let consensus = consensus.clone();
424                let mut cache = cache.clone();
425                async move {
426                    match header_batch.await {
427                        BlockHeaderData::Cached(cached_data) => {
428                            BlockHeaderData::Cached(cached_data)
429                        }
430                        BlockHeaderData::Fetched(fetched_batch) => {
431                            let Batch {
432                                peer,
433                                range,
434                                results,
435                            } = fetched_batch;
436                            let checked_headers = results
437                                .into_iter()
438                                .take_while(|header| {
439                                    check_sealed_header(
440                                        header,
441                                        peer.clone(),
442                                        &p2p,
443                                        &consensus,
444                                    )
445                                })
446                                .collect::<Vec<_>>();
447                            let batch = Batch::new(peer, range.clone(), checked_headers);
448                            if !batch.is_err() {
449                                cache.insert_headers(batch.clone());
450                            }
451                            BlockHeaderData::Fetched(batch)
452                        }
453                    }
454                }
455            }
456        })
457        .map({
458            let p2p = p2p.clone();
459            let consensus = consensus.clone();
460            move |headers| {
461                let p2p = p2p.clone();
462                let consensus = consensus.clone();
463                let mut cache = cache.clone();
464                async move {
465                    match headers.await {
466                        BlockHeaderData::Cached(CachedDataBatch::Blocks(batch)) => batch,
467                        BlockHeaderData::Cached(CachedDataBatch::Headers(batch))
468                        | BlockHeaderData::Fetched(batch) => {
469                            let Batch {
470                                peer,
471                                range,
472                                results,
473                            } = batch;
474                            if results.is_empty() {
475                                SealedBlockBatch::new(peer, range, vec![])
476                            } else {
477                                await_da_height(
478                                    results
479                                        .last()
480                                        .expect("We checked headers are not empty above"),
481                                    &consensus,
482                                )
483                                .await;
484                                let headers =
485                                    SealedHeaderBatch::new(peer, range.clone(), results);
486                                let batch = get_blocks(&p2p, headers).await;
487                                if !batch.is_err() {
488                                    cache.insert_blocks(batch.clone());
489                                }
490                                batch
491                            }
492                        }
493                        BlockHeaderData::Cached(CachedDataBatch::None(_)) => {
494                            tracing::error!("Cached data batch should never be created outside of the caching algorithm.");
495                            Batch::new(None, 0..1, vec![])
496                        }
497                    }
498                }
499                .instrument(tracing::debug_span!("consensus_and_transactions"))
500                .in_current_span()
501            }
502        })
503}
504
505fn check_sealed_header<
506    P: PeerToPeerPort + Send + Sync + 'static,
507    C: ConsensusPort + Send + Sync + 'static,
508>(
509    header: &SealedBlockHeader,
510    peer_id: Option<PeerId>,
511    p2p: &Arc<P>,
512    consensus: &Arc<C>,
513) -> bool {
514    let validity = consensus
515        .check_sealed_header(header)
516        .trace_err("Failed to check consensus on header")
517        .unwrap_or(false);
518    if !validity {
519        report_peer(p2p, peer_id.clone(), PeerReportReason::BadBlockHeader);
520    }
521    validity
522}
523
524async fn await_da_height<C: ConsensusPort + Send + Sync + 'static>(
525    header: &SealedBlockHeader,
526    consensus: &Arc<C>,
527) {
528    let _ = consensus
529        .await_da_height(&header.entity.da_height)
530        .await
531        .trace_err("Failed to wait for DA layer to sync");
532}
533
534/// Waits for a notify or shutdown signal.
535/// Returns true if the notify signal was received.
536async fn wait_for_notify_or_shutdown(
537    notify: &Notify,
538    shutdown: &mut StateWatcher,
539) -> bool {
540    let n = notify.notified();
541    let s = shutdown.while_started();
542    futures::pin_mut!(n);
543    futures::pin_mut!(s);
544
545    // Select the first signal to be received.
546    let r = futures::future::select(n, s).await;
547
548    // Check if the notify signal was received.
549    matches!(r, futures::future::Either::Left(_))
550}
551
552async fn get_sealed_block_headers<P>(
553    range: Range<u32>,
554    p2p: &Arc<P>,
555) -> Option<SourcePeer<Vec<SealedBlockHeader>>>
556where
557    P: PeerToPeerPort + Send + Sync + 'static,
558{
559    tracing::debug!(
560        "getting header range from {} to {} inclusive",
561        range.start,
562        range.end
563    );
564    p2p.get_sealed_block_headers(range)
565        .await
566        .trace_err("Failed to get headers")
567        .ok()
568        .map(|res| res.map(|data| data.unwrap_or_default()))
569}
570
571async fn get_transactions<P>(
572    range: Range<u32>,
573    peer_id: Option<PeerId>,
574    p2p: &Arc<P>,
575) -> Option<SourcePeer<Vec<Transactions>>>
576where
577    P: PeerToPeerPort + Send + Sync + 'static,
578{
579    match peer_id {
580        Some(peer_id) => {
581            let source_peer = peer_id.clone().bind(range.clone());
582            let Ok(Some(txs)) = p2p
583                .get_transactions_from_peer(source_peer)
584                .await
585                .trace_err("Failed to get transactions")
586            else {
587                report_peer(
588                    p2p,
589                    Some(peer_id.clone()),
590                    PeerReportReason::MissingTransactions,
591                );
592                return None;
593            };
594            Some(SourcePeer { peer_id, data: txs })
595        }
596        None => {
597            let Ok(SourcePeer { peer_id, data }) = p2p
598                .get_transactions(range.clone())
599                .await
600                .trace_err("Failed to get transactions")
601            else {
602                return None;
603            };
604            let Some(txs) = data else {
605                report_peer(
606                    p2p,
607                    Some(peer_id.clone()),
608                    PeerReportReason::MissingTransactions,
609                );
610                return None;
611            };
612            Some(SourcePeer { peer_id, data: txs })
613        }
614    }
615}
616
617async fn get_headers_batch<P>(range: Range<u32>, p2p: &Arc<P>) -> SealedHeaderBatch
618where
619    P: PeerToPeerPort + Send + Sync + 'static,
620{
621    tracing::debug!(
622        "getting header range from {} to {} inclusive",
623        range.start,
624        range.end
625    );
626    let Some(sourced_headers) = get_sealed_block_headers(range.clone(), p2p).await else {
627        return Batch::new(None, range, vec![])
628    };
629    let SourcePeer {
630        peer_id,
631        data: headers,
632    } = sourced_headers;
633    let heights = range.clone().map(BlockHeight::from);
634    let headers = headers
635        .into_iter()
636        .zip(heights)
637        .take_while(move |(header, expected_height)| {
638            let height = header.entity.height();
639            height == expected_height
640        })
641        .map(|(header, _)| header)
642        .collect::<Vec<_>>();
643    if headers.len() != range.len() {
644        report_peer(
645            p2p,
646            Some(peer_id.clone()),
647            PeerReportReason::MissingBlockHeaders,
648        );
649    }
650    Batch::new(Some(peer_id), range, headers)
651}
652
653fn report_peer<P>(p2p: &Arc<P>, peer_id: Option<PeerId>, reason: PeerReportReason)
654where
655    P: PeerToPeerPort + Send + Sync + 'static,
656{
657    if let Some(peer_id) = peer_id {
658        tracing::info!("Reporting peer for {:?}", reason);
659
660        // Failure to report a peer is a non-fatal error; ignore the error
661        let _ = p2p
662            .report_peer(peer_id.clone(), reason)
663            .trace_err(&format!("Failed to report peer {:?}", peer_id));
664    }
665}
666
667/// Get blocks correlating to the headers from a specific peer
668#[tracing::instrument(skip(p2p, headers))]
669async fn get_blocks<P>(p2p: &Arc<P>, headers: SealedHeaderBatch) -> SealedBlockBatch
670where
671    P: PeerToPeerPort + Send + Sync + 'static,
672{
673    let Batch {
674        results: headers,
675        range,
676        peer,
677    } = headers;
678
679    let Some(SourcePeer {
680        peer_id,
681        data: transactions,
682    }) = get_transactions(range.clone(), peer.clone(), p2p).await
683    else {
684        return Batch::new(peer, range, vec![])
685    };
686
687    let iter = headers.into_iter().zip(transactions.into_iter());
688    let mut blocks = vec![];
689    for (block_header, transactions) in iter {
690        let SealedBlockHeader {
691            consensus,
692            entity: header,
693        } = block_header;
694        let block =
695            Block::try_from_executed(header, transactions.0).map(|block| SealedBlock {
696                entity: block,
697                consensus,
698            });
699        if let Some(block) = block {
700            blocks.push(block);
701        } else {
702            report_peer(
703                p2p,
704                Some(peer_id.clone()),
705                PeerReportReason::InvalidTransactions,
706            );
707            break
708        }
709    }
710    Batch::new(Some(peer_id), range, blocks)
711}
712
713#[tracing::instrument(
714    skip_all,
715    fields(
716        height = **block.entity.header().height(),
717        id = %block.entity.header().consensus().generated.application_hash
718    ),
719    err
720)]
721async fn execute_and_commit<E>(
722    executor: &E,
723    state: &SharedMutex<State>,
724    block: SealedBlock,
725) -> anyhow::Result<()>
726where
727    E: BlockImporterPort + Send + Sync + 'static,
728{
729    // Execute and commit the block.
730    let height = *block.entity.header().height();
731    let r = executor.execute_and_commit(block).await;
732
733    // If the block executed successfully, mark it as committed.
734    if let Err(err) = &r {
735        tracing::error!("Execution of height {} failed: {:?}", *height, err);
736    } else {
737        state.apply(|s| s.commit(*height));
738    }
739    r
740}
741
/// Wrapper that gives access to `scan_none`.
struct ScanNone<S>(S);
/// Wrapper that gives access to `scan_err`.
struct ScanErr<S>(S);

/// Extra stream utilities, available on every sized type.
trait StreamUtil: Sized {
    /// Wraps `self` so that it can be scanned for `None`.
    fn into_scan_none(self) -> ScanNone<Self> {
        ScanNone::<Self>(self)
    }

    /// Wraps `self` so that it can be scanned for errors.
    fn into_scan_err(self) -> ScanErr<Self> {
        ScanErr::<Self>(self)
    }
}

impl<S> StreamUtil for S {}
759
760impl<S> ScanNone<S> {
761    fn scan_none<'a, T: 'a>(self) -> impl Stream<Item = T> + 'a
762    where
763        S: Stream<Item = Option<T>> + Send + 'a,
764    {
765        let stream = self.0.boxed::<'a>();
766        futures::stream::unfold(stream, |mut stream| async move {
767            let element = stream.next().await??;
768            Some((element, stream))
769        })
770    }
771}
772
773impl<S> ScanErr<S> {
774    fn scan_err<'a, T: 'a>(self) -> impl Stream<Item = Batch<T>> + 'a
775    where
776        S: Stream<Item = Batch<T>> + Send + 'a,
777    {
778        let stream = self.0.boxed::<'a>();
779        futures::stream::unfold((false, stream), |(mut err, mut stream)| async move {
780            if err {
781                None
782            } else {
783                let batch = stream.next().await?;
784                err = batch.is_err();
785                Some((batch, (err, stream)))
786            }
787        })
788    }
789}