use cache::{
    Cache,
    CachedDataBatch,
};
use fuel_core_services::{
    SharedMutex,
    StateWatcher,
    TraceErr,
};
use fuel_core_types::{
    self,
    blockchain::{
        block::Block,
        SealedBlock,
        SealedBlockHeader,
    },
    fuel_types::BlockHeight,
    services::p2p::{
        PeerId,
        SourcePeer,
        Transactions,
    },
};
use futures::{
    stream::StreamExt,
    FutureExt,
    Stream,
};
use std::{
    future::Future,
    num::NonZeroU32,
    ops::{
        Range,
        RangeInclusive,
    },
    sync::Arc,
};
use tokio::{
    pin,
    sync::{
        mpsc,
        Notify,
    },
    task::JoinHandle,
};
use tracing::Instrument;

use crate::{
    ports::{
        BlockImporterPort,
        ConsensusPort,
        PeerReportReason,
        PeerToPeerPort,
    },
    state::State,
};

mod cache;

#[cfg(any(test, feature = "benchmarking"))]
pub mod test_helpers;

#[cfg(test)]
mod tests;

#[cfg(test)]
mod back_pressure_tests;
#[derive(Clone, Copy, Debug)]
pub struct Config {
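    /// The maximum number of block batches fetched concurrently (the size of
    /// the stream's buffer).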
    pub block_stream_buffer_size: usize,
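    /// The number of headers requested from the network in a single batch.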
    pub header_batch_size: usize,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            block_stream_buffer_size: 10,
            header_batch_size: 100,
        }
    }
}

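/// The import task: fetches, validates, executes, and commits ranges of
/// blocks using the p2p, consensus, and executor ports.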
pub struct Import<P, E, C> {
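    /// State shared between the import and sync tasks.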
    state: SharedMutex<State>,
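    /// Notifier used to wake the import task when new work arrives.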
    notify: Arc<Notify>,
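    /// Configuration parameters.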
    params: Config,
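    /// Network (p2p) port.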
    p2p: Arc<P>,
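    /// Block importer (executor) port.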
    executor: Arc<E>,
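    /// Consensus port.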
    consensus: Arc<C>,
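    /// Cache of validated headers and fetched blocks.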
    cache: Cache,
}

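/// Header data for a chunk of heights, either served from the cache or
/// freshly fetched from the network.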
#[derive(Debug, Clone)]
enum BlockHeaderData {
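    /// Data already present in the cache (validated headers or full blocks).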
    Cached(CachedDataBatch),
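    /// Headers freshly fetched from the network, not yet cached.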
    Fetched(Batch<SealedBlockHeader>),
}

impl<P, E, C> Import<P, E, C> {
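    /// Builds the import task from the shared state, notifier, configuration,
    /// and service ports.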
    pub fn new(
        state: SharedMutex<State>,
        notify: Arc<Notify>,
        params: Config,
        p2p: Arc<P>,
        executor: Arc<E>,
        consensus: Arc<C>,
    ) -> Self {
        Self {
            state,
            notify,
            params,
            p2p,
            executor,
            consensus,
            cache: Cache::new(),
        }
    }

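    /// Wakes the import task; called when new work may be available.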
    pub fn notify_one(&self) {
        self.notify.notify_one()
    }
}

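/// A batch of results for a contiguous range of block heights, attributed to
/// the peer that served it (if any).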
#[derive(Debug, Clone, PartialEq, Eq)]
struct Batch<T> {
    peer: Option<PeerId>,
    range: Range<u32>,
    results: Vec<T>,
}

impl<T> Batch<T> {
    pub fn new(peer: Option<PeerId>, range: Range<u32>, results: Vec<T>) -> Self {
        Self {
            peer,
            range,
            results,
        }
    }

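    /// A batch failed if it holds fewer results than the heights in its range.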
    pub fn is_err(&self) -> bool {
        self.results.len() < self.range.len()
    }
}

type SealedHeaderBatch = Batch<SealedBlockHeader>;
type SealedBlockBatch = Batch<SealedBlock>;

impl<P, E, C> Import<P, E, C>
where
    P: PeerToPeerPort + Send + Sync + 'static,
    E: BlockImporterPort + Send + Sync + 'static,
    C: ConsensusPort + Send + Sync + 'static,
{
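    /// Runs one import cycle, then waits for a new-work notification or a
    /// shutdown signal. Returns `false` once shutdown has been requested.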
    #[tracing::instrument(skip_all)]
    pub async fn import(&mut self, shutdown: &mut StateWatcher) -> anyhow::Result<bool> {
        self.import_inner(shutdown).await?;

        Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await)
    }

    async fn import_inner(&mut self, shutdown: &StateWatcher) -> anyhow::Result<()> {
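        // Take the next range of heights that needs processing, if any.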
        if let Some(range) = self.state.apply(|s| s.process_range()) {
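            // Launch the fetch/execute stream and count how many blocks committed.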
            let count = self.launch_stream(range.clone(), shutdown).await;

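            // The number of heights in the requested range.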
            let range_len = range.size_hint().0;

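            // If some blocks failed, mark the unprocessed tail of the range as failed.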
            if count < range_len {
                let count = u32::try_from(count)
                    .expect("Size of the range can't be more than maximum `BlockHeight`");
                let incomplete_range = range.start().saturating_add(count)..=*range.end();
                self.state
                    .apply(|s| s.failed_to_process(incomplete_range.clone()));
                return Err(anyhow::anyhow!(
                    "Failed to import range of blocks: {:?}",
                    incomplete_range
                ));
            }
        }
        Ok(())
    }

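    /// Spawns a task that streams batches of sealed blocks for `range`,
    /// sending each batch through a bounded channel to the caller.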
    fn fetch_batches_task(
        &self,
        range: RangeInclusive<u32>,
        shutdown: &StateWatcher,
    ) -> (JoinHandle<()>, mpsc::Receiver<SealedBlockBatch>) {
        let Self {
            params,
            p2p,
            consensus,
            cache,
            ..
        } = &self;
        let batch_size = u32::try_from(params.header_batch_size)
            .expect("Header batch size must be less than `u32::MAX`");
        let batch_size =
            NonZeroU32::new(batch_size).expect("Header batch size must be non-zero");

        let (batch_sender, batch_receiver) = mpsc::channel(1);

        let fetch_batches_task = tokio::spawn({
            let params = *params;
            let p2p = p2p.clone();
            let consensus = consensus.clone();
            let cache = cache.clone();
            let block_stream_buffer_size = params.block_stream_buffer_size;
            let mut shutdown_signal = shutdown.clone();
            async move {
                let block_stream = get_block_stream(
                    range.clone(),
                    batch_size,
                    p2p,
                    consensus,
                    cache.clone(),
                );

                let shutdown_future = {
                    let mut s = shutdown_signal.clone();
                    async move {
                        let _ = s.while_started().await;
                        tracing::info!("In progress import stream shutting down");
                    }
                };

                let stream = block_stream
                    .map({
                        let shutdown_signal = shutdown_signal.clone();
                        move |stream_block_batch| {
                            let mut shutdown_signal = shutdown_signal.clone();
                            tokio::spawn(async move {
                                tokio::select! {
                                    biased;
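                                    // A shutdown aborts the in-flight batch;
                                    // the resulting `None` ends the stream below.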
                                    _ = shutdown_signal.while_started() => None,
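                                    // Otherwise wait for the batch fetch to complete.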
                                    blocks = stream_block_batch => Some(blocks),
                                }
                            }).map(|task| {
                                task.trace_err("Failed to join the task").ok().flatten()
                            })
                        }
                    })
                    .buffered(block_stream_buffer_size)
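                    // Stop taking new batches once shutdown begins.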
                    .take_until(shutdown_future)
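                    // End the stream at the first aborted (`None`) batch, and
                    // after the first failed batch (which is still yielded).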
                    .into_scan_none()
                    .scan_none()
                    .into_scan_err()
                    .scan_err();

                pin!(stream);

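                // Forward each batch to the importer side; stop on shutdown
                // or when the receiver is dropped.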
                while let Some(block_batch) = stream.next().await {
                    tokio::select! {
                        biased;
                        _ = shutdown_signal.while_started() => {
                            break;
                        },
                        result = batch_sender.send(block_batch) => {
                            if result.is_err() {
                                break
                            }
                        },
                    }
                }
            }
        });

        (fetch_batches_task, batch_receiver)
    }

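    /// Launches a stream that executes and commits a range of fetched blocks.
    ///
    /// Blocks are processed in order until the whole range is imported, an
    /// error occurs, or shutdown is requested. Returns the number of
    /// successfully committed blocks.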
    #[tracing::instrument(skip(self, shutdown))]
    async fn launch_stream(
        &mut self,
        range: RangeInclusive<u32>,
        shutdown: &StateWatcher,
    ) -> usize {
        let Self {
            state,
            p2p,
            executor,
            ..
        } = &self;

        let (fetch_batches_task, batch_receiver) =
            self.fetch_batches_task(range, shutdown);
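        // Execute and commit each received batch in height order.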
        let result = tokio_stream::wrappers::ReceiverStream::new(batch_receiver)
            .then(|batch| {
                let mut cache = self.cache.clone();
                async move {
                    let Batch {
                        peer,
                        range,
                        results,
                    } = batch;

                    let mut done = vec![];
                    let mut shutdown = shutdown.clone();
                    for sealed_block in results {
                        let height = *sealed_block.entity.header().height();
                        let res = tokio::select! {
                            biased;
                            _ = shutdown.while_started() => {
                                break;
                            },
                            res = execute_and_commit(executor.as_ref(), state, sealed_block) => {
                                cache.remove_element(&height);
                                res
                            },
                        };

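                        // Track successes; the first execution error aborts
                        // the rest of the batch.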
                        match &res {
                            Ok(_) => {
                                done.push(());
                            },
                            Err(e) => {
                                tracing::error!(
                                    "Failed to execute and commit block from peer {:?}: {:?}",
                                    peer,
                                    e
                                );
                                break;
                            },
                        };
                    }

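                    // Rebuild the batch to reflect only the blocks that committed.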
                    let batch = Batch::new(peer.clone(), range.clone(), done);

                    if !batch.is_err() {
                        report_peer(p2p, peer, PeerReportReason::SuccessfulBlockImport);
                    }

                    batch
                }
                .instrument(tracing::debug_span!("execute_and_commit"))
                .in_current_span()
            })
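            // Stop the stream after the first batch that failed to fully commit.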
            .into_scan_err()
            .scan_err()
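            // Count the blocks that were executed and committed.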
            .fold(0usize, |count, batch| async move {
                count
                    .checked_add(batch.results.len())
                    .expect("It is impossible to fetch so much data to overflow `usize`")
            })
            .await;

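        // Wait for the fetch task to finish before reporting the count.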
        let _ = fetch_batches_task
            .await
            .trace_err("Failed to join the fetch batches task");
        result
    }
}

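/// Builds a stream of futures, one per chunk of the range, each resolving to
/// a batch of sealed blocks ready for execution.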
fn get_block_stream<
    P: PeerToPeerPort + Send + Sync + 'static,
    C: ConsensusPort + Send + Sync + 'static,
>(
    range: RangeInclusive<u32>,
    header_batch_size: NonZeroU32,
    p2p: Arc<P>,
    consensus: Arc<C>,
    cache: Cache,
) -> impl Stream<Item = impl Future<Output = SealedBlockBatch>> {
    cache
        .get_chunks(range.clone(), header_batch_size)
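        // Fetch from the network only the chunks missing from the cache.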
        .map({
            let p2p = p2p.clone();
            move |cached_data_batch| {
                let p2p = p2p.clone();
                async move {
                    if let CachedDataBatch::None(range) = cached_data_batch {
                        BlockHeaderData::Fetched(get_headers_batch(range, &p2p).await)
                    } else {
                        BlockHeaderData::Cached(cached_data_batch)
                    }
                }
            }
        })
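        // Validate freshly fetched headers against consensus; cache complete batches.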
        .map({
            let p2p = p2p.clone();
            let consensus = consensus.clone();
            let cache = cache.clone();
            move |header_batch| {
                let p2p = p2p.clone();
                let consensus = consensus.clone();
                let mut cache = cache.clone();
                async move {
                    match header_batch.await {
                        BlockHeaderData::Cached(cached_data) => {
                            BlockHeaderData::Cached(cached_data)
                        }
                        BlockHeaderData::Fetched(fetched_batch) => {
                            let Batch {
                                peer,
                                range,
                                results,
                            } = fetched_batch;
                            let checked_headers = results
                                .into_iter()
                                .take_while(|header| {
                                    check_sealed_header(
                                        header,
                                        peer.clone(),
                                        &p2p,
                                        &consensus,
                                    )
                                })
                                .collect::<Vec<_>>();
                            let batch =
                                Batch::new(peer, range.clone(), checked_headers);
                            if !batch.is_err() {
                                cache.insert_headers(batch.clone());
                            }
                            BlockHeaderData::Fetched(batch)
                        }
                    }
                }
            }
        })
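        // Wait for the DA layer, fetch the transactions, and assemble
        // full sealed blocks.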
        .map({
            let p2p = p2p.clone();
            let consensus = consensus.clone();
            move |headers| {
                let p2p = p2p.clone();
                let consensus = consensus.clone();
                let mut cache = cache.clone();
                async move {
                    match headers.await {
                        BlockHeaderData::Cached(CachedDataBatch::Blocks(batch)) => {
                            batch
                        }
                        BlockHeaderData::Cached(CachedDataBatch::Headers(batch))
                        | BlockHeaderData::Fetched(batch) => {
                            let Batch {
                                peer,
                                range,
                                results,
                            } = batch;
                            if results.is_empty() {
                                SealedBlockBatch::new(peer, range, vec![])
                            } else {
                                await_da_height(
                                    results
                                        .last()
                                        .expect("We checked headers are not empty above"),
                                    &consensus,
                                )
                                .await;
                                let headers =
                                    SealedHeaderBatch::new(peer, range.clone(), results);
                                let batch = get_blocks(&p2p, headers).await;
                                if !batch.is_err() {
                                    cache.insert_blocks(batch.clone());
                                }
                                batch
                            }
                        }
                        BlockHeaderData::Cached(CachedDataBatch::None(_)) => {
                            tracing::error!(
                                "Cached data batch should never be created outside of the caching algorithm."
                            );
                            Batch::new(None, 0..1, vec![])
                        }
                    }
                }
                .instrument(tracing::debug_span!("consensus_and_transactions"))
                .in_current_span()
            }
        })
}

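/// Validates a sealed header against consensus, reporting the peer on failure.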
fn check_sealed_header<
    P: PeerToPeerPort + Send + Sync + 'static,
    C: ConsensusPort + Send + Sync + 'static,
>(
    header: &SealedBlockHeader,
    peer_id: Option<PeerId>,
    p2p: &Arc<P>,
    consensus: &Arc<C>,
) -> bool {
    let validity = consensus
        .check_sealed_header(header)
        .trace_err("Failed to check consensus on header")
        .unwrap_or(false);
    if !validity {
        report_peer(p2p, peer_id.clone(), PeerReportReason::BadBlockHeader);
    }
    validity
}

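/// Waits for the DA layer to reach the height referenced by the header.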
async fn await_da_height<C: ConsensusPort + Send + Sync + 'static>(
    header: &SealedBlockHeader,
    consensus: &Arc<C>,
) {
    let _ = consensus
        .await_da_height(&header.entity.da_height)
        .await
        .trace_err("Failed to wait for DA layer to sync");
}

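/// Waits for either a notification or a shutdown signal.
/// Returns `true` if the notification arrived first.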
async fn wait_for_notify_or_shutdown(
    notify: &Notify,
    shutdown: &mut StateWatcher,
) -> bool {
    let n = notify.notified();
    let s = shutdown.while_started();
    futures::pin_mut!(n);
    futures::pin_mut!(s);

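    // Wait for whichever future completes first.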
    let r = futures::future::select(n, s).await;

    matches!(r, futures::future::Either::Left(_))
}

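/// Requests sealed block headers for the half-open range from the network.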
async fn get_sealed_block_headers<P>(
    range: Range<u32>,
    p2p: &Arc<P>,
) -> Option<SourcePeer<Vec<SealedBlockHeader>>>
where
    P: PeerToPeerPort + Send + Sync + 'static,
{
    tracing::debug!(
        "getting header range from {} to {} exclusive",
        range.start,
        range.end
    );
    p2p.get_sealed_block_headers(range)
        .await
        .trace_err("Failed to get headers")
        .ok()
        .map(|res| res.map(|data| data.unwrap_or_default()))
}

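/// Fetches the transactions for the range, preferring the peer that served
/// the headers; reports peers that fail to provide them.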
async fn get_transactions<P>(
    range: Range<u32>,
    peer_id: Option<PeerId>,
    p2p: &Arc<P>,
) -> Option<SourcePeer<Vec<Transactions>>>
where
    P: PeerToPeerPort + Send + Sync + 'static,
{
    match peer_id {
        Some(peer_id) => {
            let source_peer = peer_id.clone().bind(range.clone());
            let Ok(Some(txs)) = p2p
                .get_transactions_from_peer(source_peer)
                .await
                .trace_err("Failed to get transactions")
            else {
                report_peer(
                    p2p,
                    Some(peer_id.clone()),
                    PeerReportReason::MissingTransactions,
                );
                return None;
            };
            Some(SourcePeer { peer_id, data: txs })
        }
        None => {
            let Ok(SourcePeer { peer_id, data }) = p2p
                .get_transactions(range.clone())
                .await
                .trace_err("Failed to get transactions")
            else {
                return None;
            };
            let Some(txs) = data else {
                report_peer(
                    p2p,
                    Some(peer_id.clone()),
                    PeerReportReason::MissingTransactions,
                );
                return None;
            };
            Some(SourcePeer { peer_id, data: txs })
        }
    }
}

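/// Fetches a batch of sealed headers and keeps only the prefix whose heights
/// match the requested range, reporting the peer if any are missing.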
async fn get_headers_batch<P>(range: Range<u32>, p2p: &Arc<P>) -> SealedHeaderBatch
where
    P: PeerToPeerPort + Send + Sync + 'static,
{
    tracing::debug!(
        "getting header range from {} to {} exclusive",
        range.start,
        range.end
    );
    let Some(sourced_headers) = get_sealed_block_headers(range.clone(), p2p).await else {
        return Batch::new(None, range, vec![])
    };
    let SourcePeer {
        peer_id,
        data: headers,
    } = sourced_headers;
    let heights = range.clone().map(BlockHeight::from);
    let headers = headers
        .into_iter()
        .zip(heights)
        .take_while(move |(header, expected_height)| {
            let height = header.entity.height();
            height == expected_height
        })
        .map(|(header, _)| header)
        .collect::<Vec<_>>();
    if headers.len() != range.len() {
        report_peer(
            p2p,
            Some(peer_id.clone()),
            PeerReportReason::MissingBlockHeaders,
        );
    }
    Batch::new(Some(peer_id), range, headers)
}

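/// Reports a peer to the p2p service for the given reason, if a peer is known.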
fn report_peer<P>(p2p: &Arc<P>, peer_id: Option<PeerId>, reason: PeerReportReason)
where
    P: PeerToPeerPort + Send + Sync + 'static,
{
    if let Some(peer_id) = peer_id {
        tracing::info!("Reporting peer for {:?}", reason);

        let _ = p2p
            .report_peer(peer_id.clone(), reason)
            .trace_err(&format!("Failed to report peer {:?}", peer_id));
    }
}

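/// Pairs each header with its fetched transactions to build full sealed blocks.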
#[tracing::instrument(skip(p2p, headers))]
async fn get_blocks<P>(p2p: &Arc<P>, headers: SealedHeaderBatch) -> SealedBlockBatch
where
    P: PeerToPeerPort + Send + Sync + 'static,
{
    let Batch {
        results: headers,
        range,
        peer,
    } = headers;

    let Some(SourcePeer {
        peer_id,
        data: transactions,
    }) = get_transactions(range.clone(), peer.clone(), p2p).await
    else {
        return Batch::new(peer, range, vec![])
    };

    let iter = headers.into_iter().zip(transactions.into_iter());
    let mut blocks = vec![];
    for (block_header, transactions) in iter {
        let SealedBlockHeader {
            consensus,
            entity: header,
        } = block_header;
        let block =
            Block::try_from_executed(header, transactions.0).map(|block| SealedBlock {
                entity: block,
                consensus,
            });
        if let Some(block) = block {
            blocks.push(block);
        } else {
            report_peer(
                p2p,
                Some(peer_id.clone()),
                PeerReportReason::InvalidTransactions,
            );
            break
        }
    }
    Batch::new(Some(peer_id), range, blocks)
}

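/// Executes the block via the importer and, on success, commits the new
/// height to the shared state.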
#[tracing::instrument(
    skip_all,
    fields(
        height = **block.entity.header().height(),
        id = %block.entity.header().consensus().generated.application_hash
    ),
    err
)]
async fn execute_and_commit<E>(
    executor: &E,
    state: &SharedMutex<State>,
    block: SealedBlock,
) -> anyhow::Result<()>
where
    E: BlockImporterPort + Send + Sync + 'static,
{
    let height = *block.entity.header().height();
    let r = executor.execute_and_commit(block).await;

    if let Err(err) = &r {
        tracing::error!("Execution of height {} failed: {:?}", *height, err);
    } else {
        state.apply(|s| s.commit(*height));
    }
    r
}

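/// Extra stream adapters used by the import pipeline.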
trait StreamUtil: Sized {
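    /// Wraps the stream so `scan_none` can end it at the first `None` item.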
    fn into_scan_none(self) -> ScanNone<Self> {
        ScanNone(self)
    }

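    /// Wraps the stream so `scan_err` can stop it after the first failed batch.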
    fn into_scan_err(self) -> ScanErr<Self> {
        ScanErr(self)
    }
}

impl<S> StreamUtil for S {}

struct ScanErr<S>(S);
struct ScanNone<S>(S);

impl<S> ScanNone<S> {
    fn scan_none<'a, T: 'a>(self) -> impl Stream<Item = T> + 'a
    where
        S: Stream<Item = Option<T>> + Send + 'a,
    {
        let stream = self.0.boxed::<'a>();
        futures::stream::unfold(stream, |mut stream| async move {
            let element = stream.next().await??;
            Some((element, stream))
        })
    }
}

impl<S> ScanErr<S> {
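    /// Yields batches up to and including the first failed batch, then ends
    /// the stream.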
    fn scan_err<'a, T: 'a>(self) -> impl Stream<Item = Batch<T>> + 'a
    where
        S: Stream<Item = Batch<T>> + Send + 'a,
    {
        let stream = self.0.boxed::<'a>();
        futures::stream::unfold((false, stream), |(mut err, mut stream)| async move {
            if err {
                None
            } else {
                let batch = stream.next().await?;
                err = batch.is_err();
                Some((batch, (err, stream)))
            }
        })
    }
}