fuel_core_producer/
block_producer.rs

1use crate::{
2    block_producer::gas_price::{
3        ChainStateInfoProvider,
4        GasPriceProvider as GasPriceProviderConstraint,
5    },
6    ports::{
7        self,
8        BlockProducerDatabase,
9        RelayerBlockInfo,
10    },
11    Config,
12};
13use anyhow::{
14    anyhow,
15    Context,
16};
17use fuel_core_storage::transactional::{
18    AtomicView,
19    Changes,
20    HistoricalView,
21};
22use fuel_core_types::{
23    blockchain::{
24        block::Block,
25        header::{
26            ApplicationHeader,
27            ConsensusHeader,
28            PartialBlockHeader,
29        },
30        primitives::DaBlockHeight,
31    },
32    fuel_tx::{
33        field::{
34            InputContract,
35            MintGasPrice,
36        },
37        Transaction,
38    },
39    fuel_types::{
40        BlockHeight,
41        Bytes32,
42    },
43    services::{
44        block_producer::Components,
45        executor::{
46            StorageReadReplayEvent,
47            TransactionExecutionStatus,
48            UncommittedResult,
49        },
50    },
51    tai64::Tai64,
52};
53use std::{
54    future::Future,
55    sync::Arc,
56};
57use tokio::sync::Mutex;
58use tracing::debug;
59
60#[cfg(test)]
61mod tests;
62
63pub mod gas_price;
64
/// Errors raised by the block producer while validating the block to produce.
#[derive(Debug, derive_more::Display)]
pub enum Error {
    /// The storage view reported no latest height, i.e. no block exists yet.
    #[display(fmt = "Genesis block is absent")]
    NoGenesisBlock,
    /// The requested height is not above the latest stored height, or the
    /// requested height has no predecessor.
    #[display(
        fmt = "The block height {height} should be higher than the previous block height {previous_block}"
    )]
    BlockHeightShouldBeHigherThanPrevious {
        /// Height that was requested for the new block.
        height: BlockHeight,
        /// Latest height known to the storage view.
        previous_block: BlockHeight,
    },
    /// NOTE(review): not constructed anywhere in this file; presumably reported
    /// when the block preceding the requested height cannot be found — confirm.
    #[display(fmt = "Previous block height {_0} doesn't exist")]
    MissingBlock(BlockHeight),
    /// The relayer reported a DA height lower than the DA height recorded in
    /// the previous block.
    #[display(
        fmt = "Best finalized da_height {best} is behind previous block da_height {previous_block}"
    )]
    InvalidDaFinalizationState {
        /// Highest DA height reported by the relayer.
        best: DaBlockHeight,
        /// DA height of the previous block.
        previous_block: DaBlockHeight,
    },
}
86
87impl From<Error> for anyhow::Error {
88    fn from(error: Error) -> Self {
89        anyhow::Error::msg(error)
90    }
91}
92
/// Block producer: bundles the storage view, transaction pool, executor,
/// relayer and gas price sources needed to assemble and execute new blocks.
pub struct Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
{
    /// Producer configuration; its `coinbase_recipient` is used when assembling
    /// the components of a block.
    pub config: Config,
    /// Source of storage views used to read the latest height and previous blocks.
    pub view_provider: ViewProvider,
    /// Transaction pool queried for the transaction source of pool-based blocks.
    pub txpool: TxPool,
    /// Executor, kept behind an `Arc` so it can be cloned into worker closures.
    pub executor: Arc<Executor>,
    /// Relayer queried for the highest available DA height and per-block costs.
    pub relayer: Box<dyn ports::Relayer>,
    // use a tokio lock since we want callers to yield until the previous block
    // execution has completed (which may take a while).
    // NOTE(review): the production methods in this file acquire it with
    // `try_lock`, so a concurrent caller receives an error immediately rather
    // than yielding — confirm which behavior is intended.
    pub lock: Mutex<()>,
    /// Supplies the gas price for block production and for dry runs.
    pub gas_price_provider: GasPriceProvider,
    /// Supplies consensus parameters for a given parameters version.
    pub chain_state_info_provider: ChainStateProvider,
}
106
107impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
108    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
109where
110    ViewProvider: AtomicView + 'static,
111    ViewProvider::LatestView: BlockProducerDatabase,
112    ChainStateProvider: ChainStateInfoProvider,
113{
114    pub async fn produce_and_execute_predefined<D>(
115        &self,
116        predefined_block: &Block,
117        deadline: D,
118    ) -> anyhow::Result<UncommittedResult<Changes>>
119    where
120        Executor: ports::BlockProducer<Vec<Transaction>, Deadline = D> + 'static,
121    {
122        let _production_guard = self.lock.try_lock().map_err(|_| {
123            anyhow!("Failed to acquire the production lock, block production is already in progress")
124        })?;
125
126        let mut transactions_source = predefined_block.transactions().to_vec();
127
128        let height = predefined_block.header().consensus().height;
129
130        let block_time = predefined_block.header().consensus().time;
131
132        let da_height = predefined_block.header().da_height();
133
134        let view = self.view_provider.latest_view()?;
135
136        let header_to_produce =
137            self.new_header_with_da_height(height, block_time, da_height, &view)?;
138
139        let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
140
141        if header_to_produce.height() <= &latest_height {
142            return Err(Error::BlockHeightShouldBeHigherThanPrevious {
143                height,
144                previous_block: latest_height,
145            }
146            .into())
147        }
148
149        let maybe_mint_tx = transactions_source.pop();
150        let mint_tx =
151            maybe_mint_tx
152                .and_then(|tx| tx.as_mint().cloned())
153                .ok_or(anyhow!(
154                    "The last transaction in the block should be a mint transaction"
155                ))?;
156
157        let gas_price = *mint_tx.gas_price();
158        let coinbase_recipient = mint_tx.input_contract().contract_id;
159
160        let component = Components {
161            header_to_produce,
162            transactions_source,
163            coinbase_recipient,
164            gas_price,
165        };
166
167        let result = self
168            .executor
169            .produce_without_commit(component, deadline)
170            .await
171            .map_err(Into::<anyhow::Error>::into)
172            .with_context(|| {
173                format!("Failed to produce block {height:?} due to execution failure")
174            })?;
175
176        debug!("Produced block with result: {:?}", result.result());
177        Ok(result)
178    }
179}
180impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
181    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
182where
183    ViewProvider: AtomicView + 'static,
184    ViewProvider::LatestView: BlockProducerDatabase,
185    GasPriceProvider: GasPriceProviderConstraint,
186    ChainStateProvider: ChainStateInfoProvider,
187{
188    /// Produces and execute block for the specified height.
189    async fn produce_and_execute<TxSource, F, Deadline>(
190        &self,
191        height: BlockHeight,
192        block_time: Tai64,
193        tx_source: impl FnOnce(u64, BlockHeight) -> F,
194        deadline: Deadline,
195    ) -> anyhow::Result<UncommittedResult<Changes>>
196    where
197        Executor: ports::BlockProducer<TxSource, Deadline = Deadline> + 'static,
198        F: Future<Output = anyhow::Result<TxSource>>,
199    {
200        //  - get previous block info (hash, root, etc)
201        //  - select best da_height from relayer
202        //  - get available txs from txpool
203        //  - select best txs based on factors like:
204        //      1. fees
205        //      2. parallel throughput
206        //  - Execute block with production mode to correctly malleate txs outputs and block headers
207
208        // prevent simultaneous block production calls
209        let _production_guard = self.lock.try_lock().map_err(|_| {
210            anyhow!("Failed to acquire the production lock, block production is already in progress")
211        })?;
212
213        let gas_price = self.production_gas_price().await?;
214
215        let source = tx_source(gas_price, height).await?;
216
217        let view = self.view_provider.latest_view()?;
218
219        let header = self
220            .new_header_with_new_da_height(height, block_time, &view)
221            .await?;
222
223        let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
224
225        if header.height() <= &latest_height {
226            return Err(Error::BlockHeightShouldBeHigherThanPrevious {
227                height,
228                previous_block: latest_height,
229            }
230            .into())
231        }
232
233        let component = Components {
234            header_to_produce: header,
235            transactions_source: source,
236            coinbase_recipient: self.config.coinbase_recipient.unwrap_or_default(),
237            gas_price,
238        };
239
240        // Store the context string in case we error.
241        let context_string =
242            format!("Failed to produce block {height:?} due to execution failure");
243        let result = self
244            .executor
245            .produce_without_commit(component, deadline)
246            .await
247            .map_err(Into::<anyhow::Error>::into)
248            .context(context_string)?;
249
250        debug!("Produced block with result: {:?}", result.result());
251        Ok(result)
252    }
253
254    async fn production_gas_price(&self) -> anyhow::Result<u64> {
255        self.gas_price_provider
256            .production_gas_price()
257            .map_err(|e| anyhow!("No gas price found: {e:?}"))
258    }
259
260    async fn dry_run_gas_price(&self) -> anyhow::Result<u64> {
261        self.gas_price_provider
262            .dry_run_gas_price()
263            .map_err(|e| anyhow!("No gas price found: {e:?}"))
264    }
265}
266
267impl<
268        ViewProvider,
269        TxPool,
270        Executor,
271        TxSource,
272        GasPriceProvider,
273        ChainStateProvider,
274        Deadline,
275    > Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
276where
277    ViewProvider: AtomicView + 'static,
278    ViewProvider::LatestView: BlockProducerDatabase,
279    TxPool: ports::TxPool<TxSource = TxSource> + 'static,
280    Executor: ports::BlockProducer<TxSource, Deadline = Deadline> + 'static,
281    GasPriceProvider: GasPriceProviderConstraint,
282    ChainStateProvider: ChainStateInfoProvider,
283{
284    /// Produces and execute block for the specified height with transactions from the `TxPool`.
285    pub async fn produce_and_execute_block_txpool(
286        &self,
287        height: BlockHeight,
288        block_time: Tai64,
289        deadline: Deadline,
290    ) -> anyhow::Result<UncommittedResult<Changes>> {
291        self.produce_and_execute::<TxSource, _, Deadline>(
292            height,
293            block_time,
294            |gas_price, height| self.txpool.get_source(gas_price, height),
295            deadline,
296        )
297        .await
298    }
299}
300
301impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
302    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
303where
304    ViewProvider: AtomicView + 'static,
305    ViewProvider::LatestView: BlockProducerDatabase,
306    Executor: ports::BlockProducer<Vec<Transaction>, Deadline = ()> + 'static,
307    GasPriceProvider: GasPriceProviderConstraint,
308    ChainStateProvider: ChainStateInfoProvider,
309{
310    /// Produces and execute block for the specified height with `transactions`.
311    pub async fn produce_and_execute_block_transactions(
312        &self,
313        height: BlockHeight,
314        block_time: Tai64,
315        transactions: Vec<Transaction>,
316    ) -> anyhow::Result<UncommittedResult<Changes>> {
317        self.produce_and_execute(
318            height,
319            block_time,
320            |_, _| async { Ok(transactions) },
321            (),
322        )
323        .await
324    }
325}
326
327impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
328    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
329where
330    ViewProvider: AtomicView + 'static,
331    ViewProvider::LatestView: BlockProducerDatabase,
332    Executor: ports::DryRunner + 'static,
333    GasPriceProvider: GasPriceProviderConstraint,
334    ChainStateProvider: ChainStateInfoProvider,
335{
336    /// Simulates multiple transactions without altering any state. Does not acquire the production lock.
337    /// since it is basically a "read only" operation and shouldn't get in the way of normal
338    /// production.
339    pub async fn dry_run(
340        &self,
341        transactions: Vec<Transaction>,
342        height: Option<BlockHeight>,
343        time: Option<Tai64>,
344        utxo_validation: Option<bool>,
345        gas_price: Option<u64>,
346    ) -> anyhow::Result<Vec<(Transaction, TransactionExecutionStatus)>> {
347        let view = self.view_provider.latest_view()?;
348        let latest_height = view.latest_height().unwrap_or_default();
349
350        let simulated_height = height.unwrap_or_else(|| {
351            latest_height
352                .succ()
353                .expect("It is impossible to overflow the current block height")
354        });
355
356        let simulated_time = time.unwrap_or_else(|| {
357            view.get_block(&latest_height)
358                .map(|block| block.header().time())
359                .unwrap_or(Tai64::UNIX_EPOCH)
360        });
361
362        let header = self.new_header(simulated_height, simulated_time, &view)?;
363
364        let gas_price = if let Some(inner) = gas_price {
365            inner
366        } else {
367            self.dry_run_gas_price().await?
368        };
369
370        // The dry run execution should use the state of the blockchain based on the
371        // last available block, not on the upcoming one. It means that we need to
372        // use the same configuration as the last block -> the same DA height.
373        // It is deterministic from the result perspective, plus it is more performant
374        // because we don't need to wait for the relayer to sync.
375        let component = Components {
376            header_to_produce: header,
377            transactions_source: transactions.clone(),
378            coinbase_recipient: self.config.coinbase_recipient.unwrap_or_default(),
379            gas_price,
380        };
381
382        let executor = self.executor.clone();
383
384        // use the blocking threadpool for dry_run to avoid clogging up the main async runtime
385        let txs = tokio_rayon::spawn_fifo(
386            move || -> anyhow::Result<Vec<(Transaction, TransactionExecutionStatus)>> {
387                Ok(executor.dry_run(component, utxo_validation, height)?)
388            },
389        )
390        .await?;
391
392        if txs.iter().any(|(transaction, tx_status)| {
393            transaction.is_script() && tx_status.result.receipts().is_empty()
394        }) {
395            Err(anyhow!("Expected at least one set of receipts"))
396        } else {
397            Ok(txs)
398        }
399    }
400}
401
402impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
403    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
404where
405    ViewProvider: HistoricalView + 'static,
406    ViewProvider::LatestView: BlockProducerDatabase,
407    Executor: ports::StorageReadReplayRecorder + 'static,
408    GasPriceProvider: GasPriceProviderConstraint,
409    ChainStateProvider: ChainStateInfoProvider,
410{
411    /// Re-executes an old block, getting the storage read events.
412    pub async fn storage_read_replay(
413        &self,
414        height: BlockHeight,
415    ) -> anyhow::Result<Vec<StorageReadReplayEvent>> {
416        let view = self.view_provider.latest_view()?;
417
418        let executor = self.executor.clone();
419
420        // use the blocking threadpool to avoid clogging up the main async runtime
421        tokio_rayon::spawn_fifo(move || {
422            let block = view.get_full_block(&height)?;
423            Ok(executor.storage_read_replay(&block)?)
424        })
425        .await
426    }
427}
428
/// Message of the error returned by `select_new_da_height` when the relayer is
/// ahead of the previous block's DA height, but not even the next DA block
/// fits within the gas and transaction limits.
pub const NO_NEW_DA_HEIGHT_FOUND: &str = "No new da_height found";
430
431impl<ViewProvider, TxPool, Executor, GP, ChainStateProvider>
432    Producer<ViewProvider, TxPool, Executor, GP, ChainStateProvider>
433where
434    ViewProvider: AtomicView + 'static,
435    ViewProvider::LatestView: BlockProducerDatabase,
436    ChainStateProvider: ChainStateInfoProvider,
437{
438    /// Create the header for a new block at the provided height
439    async fn new_header_with_new_da_height(
440        &self,
441        height: BlockHeight,
442        block_time: Tai64,
443        view: &ViewProvider::LatestView,
444    ) -> anyhow::Result<PartialBlockHeader> {
445        let mut block_header = self.new_header(height, block_time, view)?;
446        let previous_da_height = block_header.da_height;
447        let gas_limit = self
448            .chain_state_info_provider
449            .consensus_params_at_version(&block_header.consensus_parameters_version)?
450            .block_gas_limit();
451        // We have a hard limit of u16::MAX transactions per block, including the final mint transactions.
452        // Therefore we choose the `new_da_height` to never include more than u16::MAX - 1 transactions in a block.
453        let new_da_height = self
454            .select_new_da_height(gas_limit, previous_da_height, u16::MAX - 1)
455            .await?;
456
457        block_header.application.da_height = new_da_height;
458
459        Ok(block_header)
460    }
461    /// Create the header for a new block at the provided height
462    fn new_header_with_da_height(
463        &self,
464        height: BlockHeight,
465        block_time: Tai64,
466        da_height: DaBlockHeight,
467        view: &ViewProvider::LatestView,
468    ) -> anyhow::Result<PartialBlockHeader> {
469        let mut block_header = self.new_header(height, block_time, view)?;
470        block_header.application.da_height = da_height;
471        Ok(block_header)
472    }
473
474    async fn select_new_da_height(
475        &self,
476        gas_limit: u64,
477        previous_da_height: DaBlockHeight,
478        transactions_limit: u16,
479    ) -> anyhow::Result<DaBlockHeight> {
480        let mut new_best = previous_da_height;
481        let mut total_cost: u64 = 0;
482        let transactions_limit: u64 = transactions_limit as u64;
483        let mut total_transactions: u64 = 0;
484        let highest = self
485            .relayer
486            .wait_for_at_least_height(&previous_da_height)
487            .await?;
488        if highest < previous_da_height {
489            return Err(Error::InvalidDaFinalizationState {
490                best: highest,
491                previous_block: previous_da_height,
492            }
493            .into());
494        }
495
496        if highest == previous_da_height {
497            return Ok(highest);
498        }
499
500        let next_da_height = previous_da_height.saturating_add(1);
501        for height in next_da_height..=highest.0 {
502            let RelayerBlockInfo { gas_cost, tx_count } = self
503                .relayer
504                .get_cost_and_transactions_number_for_block(&DaBlockHeight(height))
505                .await?;
506            total_cost = total_cost.saturating_add(gas_cost);
507            total_transactions = total_transactions.saturating_add(tx_count);
508            if total_cost > gas_limit || total_transactions > transactions_limit {
509                break;
510            }
511
512            new_best = DaBlockHeight(height);
513        }
514
515        if new_best == previous_da_height {
516            Err(anyhow!(NO_NEW_DA_HEIGHT_FOUND))
517        } else {
518            Ok(new_best)
519        }
520    }
521
522    fn new_header(
523        &self,
524        height: BlockHeight,
525        block_time: Tai64,
526        view: &ViewProvider::LatestView,
527    ) -> anyhow::Result<PartialBlockHeader> {
528        let previous_block_info = self.previous_block_info(height, view)?;
529        let consensus_parameters_version = view.latest_consensus_parameters_version()?;
530        let state_transition_bytecode_version =
531            view.latest_state_transition_bytecode_version()?;
532
533        Ok(PartialBlockHeader {
534            application: ApplicationHeader {
535                da_height: previous_block_info.da_height,
536                consensus_parameters_version,
537                state_transition_bytecode_version,
538                generated: Default::default(),
539            },
540            consensus: ConsensusHeader {
541                prev_root: previous_block_info.prev_root,
542                height,
543                time: block_time,
544                generated: Default::default(),
545            },
546        })
547    }
548
549    fn previous_block_info(
550        &self,
551        height: BlockHeight,
552        view: &ViewProvider::LatestView,
553    ) -> anyhow::Result<PreviousBlockInfo> {
554        let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
555
556        // get info from previous block height
557        let prev_height =
558            height
559                .pred()
560                .ok_or(Error::BlockHeightShouldBeHigherThanPrevious {
561                    height: 0u32.into(),
562                    previous_block: latest_height,
563                })?;
564        let previous_block = view.get_block(&prev_height)?;
565        let prev_root = view.block_header_merkle_root(&prev_height)?;
566
567        Ok(PreviousBlockInfo {
568            prev_root,
569            da_height: previous_block.header().da_height(),
570        })
571    }
572}
573
/// Data taken from the block preceding the one being produced, used to seed
/// the new block's header.
struct PreviousBlockInfo {
    /// Block header merkle root read from the view for the preceding height.
    prev_root: Bytes32,
    /// DA height recorded in the preceding block's header.
    da_height: DaBlockHeight,
}