fuel_core_producer/
block_producer.rs

1use crate::{
2    block_producer::gas_price::{
3        ConsensusParametersProvider,
4        GasPriceProvider as GasPriceProviderConstraint,
5    },
6    ports::{
7        self,
8        BlockProducerDatabase,
9        RelayerBlockInfo,
10    },
11    Config,
12};
13use anyhow::{
14    anyhow,
15    Context,
16};
17use fuel_core_storage::transactional::{
18    AtomicView,
19    Changes,
20};
21use fuel_core_types::{
22    blockchain::{
23        block::Block,
24        header::{
25            ApplicationHeader,
26            ConsensusHeader,
27            PartialBlockHeader,
28        },
29        primitives::DaBlockHeight,
30    },
31    fuel_tx::{
32        field::{
33            InputContract,
34            MintGasPrice,
35        },
36        Transaction,
37    },
38    fuel_types::{
39        BlockHeight,
40        Bytes32,
41    },
42    services::{
43        block_producer::Components,
44        executor::{
45            TransactionExecutionStatus,
46            UncommittedResult,
47        },
48    },
49    tai64::Tai64,
50};
51use std::{
52    future::Future,
53    sync::Arc,
54};
55use tokio::sync::Mutex;
56use tracing::debug;
57
58#[cfg(test)]
59mod tests;
60
61pub mod gas_price;
62
63#[derive(Debug, derive_more::Display)]
64pub enum Error {
65    #[display(fmt = "Genesis block is absent")]
66    NoGenesisBlock,
67    #[display(
68        fmt = "The block height {height} should be higher than the previous block height {previous_block}"
69    )]
70    BlockHeightShouldBeHigherThanPrevious {
71        height: BlockHeight,
72        previous_block: BlockHeight,
73    },
74    #[display(fmt = "Previous block height {_0} doesn't exist")]
75    MissingBlock(BlockHeight),
76    #[display(
77        fmt = "Best finalized da_height {best} is behind previous block da_height {previous_block}"
78    )]
79    InvalidDaFinalizationState {
80        best: DaBlockHeight,
81        previous_block: DaBlockHeight,
82    },
83}
84
85impl From<Error> for anyhow::Error {
86    fn from(error: Error) -> Self {
87        anyhow::Error::msg(error)
88    }
89}
90
91pub struct Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ConsensusProvider> {
92    pub config: Config,
93    pub view_provider: ViewProvider,
94    pub txpool: TxPool,
95    pub executor: Arc<Executor>,
96    pub relayer: Box<dyn ports::Relayer>,
97    // use a tokio lock since we want callers to yield until the previous block
98    // execution has completed (which may take a while).
99    pub lock: Mutex<()>,
100    pub gas_price_provider: GasPriceProvider,
101    pub consensus_parameters_provider: ConsensusProvider,
102}
103
104impl<ViewProvider, TxPool, Executor, GasPriceProvider, ConsensusProvider>
105    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ConsensusProvider>
106where
107    ViewProvider: AtomicView + 'static,
108    ViewProvider::LatestView: BlockProducerDatabase,
109    ConsensusProvider: ConsensusParametersProvider,
110{
111    pub async fn produce_and_execute_predefined(
112        &self,
113        predefined_block: &Block,
114    ) -> anyhow::Result<UncommittedResult<Changes>>
115    where
116        Executor: ports::BlockProducer<Vec<Transaction>> + 'static,
117    {
118        let _production_guard = self.lock.try_lock().map_err(|_| {
119            anyhow!("Failed to acquire the production lock, block production is already in progress")
120        })?;
121
122        let mut transactions_source = predefined_block.transactions().to_vec();
123
124        let height = predefined_block.header().consensus().height;
125
126        let block_time = predefined_block.header().consensus().time;
127
128        let da_height = predefined_block.header().application().da_height;
129
130        let view = self.view_provider.latest_view()?;
131
132        let header_to_produce =
133            self.new_header_with_da_height(height, block_time, da_height, &view)?;
134
135        let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
136
137        if header_to_produce.height() <= &latest_height {
138            return Err(Error::BlockHeightShouldBeHigherThanPrevious {
139                height,
140                previous_block: latest_height,
141            }
142            .into())
143        }
144
145        let maybe_mint_tx = transactions_source.pop();
146        let mint_tx =
147            maybe_mint_tx
148                .and_then(|tx| tx.as_mint().cloned())
149                .ok_or(anyhow!(
150                    "The last transaction in the block should be a mint transaction"
151                ))?;
152
153        let gas_price = *mint_tx.gas_price();
154        let coinbase_recipient = mint_tx.input_contract().contract_id;
155
156        let component = Components {
157            header_to_produce,
158            transactions_source,
159            coinbase_recipient,
160            gas_price,
161        };
162
163        let result = self
164            .executor
165            .produce_without_commit(component)
166            .map_err(Into::<anyhow::Error>::into)
167            .with_context(|| {
168                format!("Failed to produce block {height:?} due to execution failure")
169            })?;
170
171        debug!("Produced block with result: {:?}", result.result());
172        Ok(result)
173    }
174}
175impl<ViewProvider, TxPool, Executor, GasPriceProvider, ConsensusProvider>
176    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ConsensusProvider>
177where
178    ViewProvider: AtomicView + 'static,
179    ViewProvider::LatestView: BlockProducerDatabase,
180    GasPriceProvider: GasPriceProviderConstraint,
181    ConsensusProvider: ConsensusParametersProvider,
182{
183    /// Produces and execute block for the specified height.
184    async fn produce_and_execute<TxSource, F>(
185        &self,
186        height: BlockHeight,
187        block_time: Tai64,
188        tx_source: impl FnOnce(u64, BlockHeight) -> F,
189    ) -> anyhow::Result<UncommittedResult<Changes>>
190    where
191        Executor: ports::BlockProducer<TxSource> + 'static,
192        F: Future<Output = anyhow::Result<TxSource>>,
193    {
194        //  - get previous block info (hash, root, etc)
195        //  - select best da_height from relayer
196        //  - get available txs from txpool
197        //  - select best txs based on factors like:
198        //      1. fees
199        //      2. parallel throughput
200        //  - Execute block with production mode to correctly malleate txs outputs and block headers
201
202        // prevent simultaneous block production calls
203        let _production_guard = self.lock.try_lock().map_err(|_| {
204            anyhow!("Failed to acquire the production lock, block production is already in progress")
205        })?;
206
207        let gas_price = self.production_gas_price().await?;
208
209        let source = tx_source(gas_price, height).await?;
210
211        let view = self.view_provider.latest_view()?;
212
213        let header = self
214            .new_header_with_new_da_height(height, block_time, &view)
215            .await?;
216
217        let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
218
219        if header.height() <= &latest_height {
220            return Err(Error::BlockHeightShouldBeHigherThanPrevious {
221                height,
222                previous_block: latest_height,
223            }
224            .into())
225        }
226
227        let component = Components {
228            header_to_produce: header,
229            transactions_source: source,
230            coinbase_recipient: self.config.coinbase_recipient.unwrap_or_default(),
231            gas_price,
232        };
233
234        // Store the context string in case we error.
235        let context_string =
236            format!("Failed to produce block {height:?} due to execution failure");
237        let result = self
238            .executor
239            .produce_without_commit(component)
240            .map_err(Into::<anyhow::Error>::into)
241            .context(context_string)?;
242
243        debug!("Produced block with result: {:?}", result.result());
244        Ok(result)
245    }
246
247    async fn production_gas_price(&self) -> anyhow::Result<u64> {
248        self.gas_price_provider
249            .production_gas_price()
250            .map_err(|e| anyhow!("No gas price found: {e:?}"))
251    }
252
253    async fn dry_run_gas_price(&self) -> anyhow::Result<u64> {
254        self.gas_price_provider
255            .dry_run_gas_price()
256            .map_err(|e| anyhow!("No gas price found: {e:?}"))
257    }
258}
259
260impl<ViewProvider, TxPool, Executor, TxSource, GasPriceProvider, ConsensusProvider>
261    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ConsensusProvider>
262where
263    ViewProvider: AtomicView + 'static,
264    ViewProvider::LatestView: BlockProducerDatabase,
265    TxPool: ports::TxPool<TxSource = TxSource> + 'static,
266    Executor: ports::BlockProducer<TxSource> + 'static,
267    GasPriceProvider: GasPriceProviderConstraint,
268    ConsensusProvider: ConsensusParametersProvider,
269{
270    /// Produces and execute block for the specified height with transactions from the `TxPool`.
271    pub async fn produce_and_execute_block_txpool(
272        &self,
273        height: BlockHeight,
274        block_time: Tai64,
275    ) -> anyhow::Result<UncommittedResult<Changes>> {
276        self.produce_and_execute::<TxSource, _>(
277            height,
278            block_time,
279            |gas_price, height| self.txpool.get_source(gas_price, height),
280        )
281        .await
282    }
283}
284
285impl<ViewProvider, TxPool, Executor, GasPriceProvider, ConsensusProvider>
286    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ConsensusProvider>
287where
288    ViewProvider: AtomicView + 'static,
289    ViewProvider::LatestView: BlockProducerDatabase,
290    Executor: ports::BlockProducer<Vec<Transaction>> + 'static,
291    GasPriceProvider: GasPriceProviderConstraint,
292    ConsensusProvider: ConsensusParametersProvider,
293{
294    /// Produces and execute block for the specified height with `transactions`.
295    pub async fn produce_and_execute_block_transactions(
296        &self,
297        height: BlockHeight,
298        block_time: Tai64,
299        transactions: Vec<Transaction>,
300    ) -> anyhow::Result<UncommittedResult<Changes>> {
301        self.produce_and_execute(height, block_time, |_, _| async { Ok(transactions) })
302            .await
303    }
304}
305
306impl<ViewProvider, TxPool, Executor, GasPriceProvider, ConsensusProvider>
307    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ConsensusProvider>
308where
309    ViewProvider: AtomicView + 'static,
310    ViewProvider::LatestView: BlockProducerDatabase,
311    Executor: ports::DryRunner + 'static,
312    GasPriceProvider: GasPriceProviderConstraint,
313    ConsensusProvider: ConsensusParametersProvider,
314{
315    /// Simulates multiple transactions without altering any state. Does not acquire the production lock.
316    /// since it is basically a "read only" operation and shouldn't get in the way of normal
317    /// production.
318    pub async fn dry_run(
319        &self,
320        transactions: Vec<Transaction>,
321        height: Option<BlockHeight>,
322        time: Option<Tai64>,
323        utxo_validation: Option<bool>,
324        gas_price: Option<u64>,
325    ) -> anyhow::Result<Vec<TransactionExecutionStatus>> {
326        let view = self.view_provider.latest_view()?;
327        let latest_height = view.latest_height().unwrap_or_default();
328
329        let simulated_height = height.unwrap_or_else(|| {
330            latest_height
331                .succ()
332                .expect("It is impossible to overflow the current block height")
333        });
334
335        let simulated_time = time.unwrap_or_else(|| {
336            view.get_block(&latest_height)
337                .map(|block| block.header().time())
338                .unwrap_or(Tai64::UNIX_EPOCH)
339        });
340
341        let header = self.new_header(simulated_height, simulated_time, &view)?;
342
343        let gas_price = if let Some(inner) = gas_price {
344            inner
345        } else {
346            self.dry_run_gas_price().await?
347        };
348
349        // The dry run execution should use the state of the blockchain based on the
350        // last available block, not on the upcoming one. It means that we need to
351        // use the same configuration as the last block -> the same DA height.
352        // It is deterministic from the result perspective, plus it is more performant
353        // because we don't need to wait for the relayer to sync.
354        let component = Components {
355            header_to_produce: header,
356            transactions_source: transactions.clone(),
357            coinbase_recipient: self.config.coinbase_recipient.unwrap_or_default(),
358            gas_price,
359        };
360
361        let executor = self.executor.clone();
362
363        // use the blocking threadpool for dry_run to avoid clogging up the main async runtime
364        let tx_statuses = tokio_rayon::spawn_fifo(
365            move || -> anyhow::Result<Vec<TransactionExecutionStatus>> {
366                Ok(executor.dry_run(component, utxo_validation)?)
367            },
368        )
369        .await?;
370
371        if transactions
372            .iter()
373            .zip(tx_statuses.iter())
374            .any(|(transaction, tx_status)| {
375                transaction.is_script() && tx_status.result.receipts().is_empty()
376            })
377        {
378            Err(anyhow!("Expected at least one set of receipts"))
379        } else {
380            Ok(tx_statuses)
381        }
382    }
383}
384
385pub const NO_NEW_DA_HEIGHT_FOUND: &str = "No new da_height found";
386
387impl<ViewProvider, TxPool, Executor, GP, ConsensusProvider>
388    Producer<ViewProvider, TxPool, Executor, GP, ConsensusProvider>
389where
390    ViewProvider: AtomicView + 'static,
391    ViewProvider::LatestView: BlockProducerDatabase,
392    ConsensusProvider: ConsensusParametersProvider,
393{
394    /// Create the header for a new block at the provided height
395    async fn new_header_with_new_da_height(
396        &self,
397        height: BlockHeight,
398        block_time: Tai64,
399        view: &ViewProvider::LatestView,
400    ) -> anyhow::Result<PartialBlockHeader> {
401        let mut block_header = self.new_header(height, block_time, view)?;
402        let previous_da_height = block_header.da_height;
403        let gas_limit = self
404            .consensus_parameters_provider
405            .consensus_params_at_version(&block_header.consensus_parameters_version)?
406            .block_gas_limit();
407        // We have a hard limit of u16::MAX transactions per block, including the final mint transactions.
408        // Therefore we choose the `new_da_height` to never include more than u16::MAX - 1 transactions in a block.
409        let new_da_height = self
410            .select_new_da_height(gas_limit, previous_da_height, u16::MAX - 1)
411            .await?;
412
413        block_header.application.da_height = new_da_height;
414
415        Ok(block_header)
416    }
417    /// Create the header for a new block at the provided height
418    fn new_header_with_da_height(
419        &self,
420        height: BlockHeight,
421        block_time: Tai64,
422        da_height: DaBlockHeight,
423        view: &ViewProvider::LatestView,
424    ) -> anyhow::Result<PartialBlockHeader> {
425        let mut block_header = self.new_header(height, block_time, view)?;
426        block_header.application.da_height = da_height;
427        Ok(block_header)
428    }
429
430    async fn select_new_da_height(
431        &self,
432        gas_limit: u64,
433        previous_da_height: DaBlockHeight,
434        transactions_limit: u16,
435    ) -> anyhow::Result<DaBlockHeight> {
436        let mut new_best = previous_da_height;
437        let mut total_cost: u64 = 0;
438        let transactions_limit: u64 = transactions_limit as u64;
439        let mut total_transactions: u64 = 0;
440        let highest = self
441            .relayer
442            .wait_for_at_least_height(&previous_da_height)
443            .await?;
444        if highest < previous_da_height {
445            return Err(Error::InvalidDaFinalizationState {
446                best: highest,
447                previous_block: previous_da_height,
448            }
449            .into());
450        }
451
452        if highest == previous_da_height {
453            return Ok(highest);
454        }
455
456        let next_da_height = previous_da_height.saturating_add(1);
457        for height in next_da_height..=highest.0 {
458            let RelayerBlockInfo { gas_cost, tx_count } = self
459                .relayer
460                .get_cost_and_transactions_number_for_block(&DaBlockHeight(height))
461                .await?;
462            total_cost = total_cost.saturating_add(gas_cost);
463            total_transactions = total_transactions.saturating_add(tx_count);
464            if total_cost > gas_limit || total_transactions > transactions_limit {
465                break;
466            }
467
468            new_best = DaBlockHeight(height);
469        }
470
471        if new_best == previous_da_height {
472            Err(anyhow!(NO_NEW_DA_HEIGHT_FOUND))
473        } else {
474            Ok(new_best)
475        }
476    }
477
478    fn new_header(
479        &self,
480        height: BlockHeight,
481        block_time: Tai64,
482        view: &ViewProvider::LatestView,
483    ) -> anyhow::Result<PartialBlockHeader> {
484        let previous_block_info = self.previous_block_info(height, view)?;
485        let consensus_parameters_version = view.latest_consensus_parameters_version()?;
486        let state_transition_bytecode_version =
487            view.latest_state_transition_bytecode_version()?;
488
489        Ok(PartialBlockHeader {
490            application: ApplicationHeader {
491                da_height: previous_block_info.da_height,
492                consensus_parameters_version,
493                state_transition_bytecode_version,
494                generated: Default::default(),
495            },
496            consensus: ConsensusHeader {
497                prev_root: previous_block_info.prev_root,
498                height,
499                time: block_time,
500                generated: Default::default(),
501            },
502        })
503    }
504
505    fn previous_block_info(
506        &self,
507        height: BlockHeight,
508        view: &ViewProvider::LatestView,
509    ) -> anyhow::Result<PreviousBlockInfo> {
510        let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
511
512        // get info from previous block height
513        let prev_height =
514            height
515                .pred()
516                .ok_or(Error::BlockHeightShouldBeHigherThanPrevious {
517                    height: 0u32.into(),
518                    previous_block: latest_height,
519                })?;
520        let previous_block = view.get_block(&prev_height)?;
521        let prev_root = view.block_header_merkle_root(&prev_height)?;
522
523        Ok(PreviousBlockInfo {
524            prev_root,
525            da_height: previous_block.header().da_height,
526        })
527    }
528}
529
530struct PreviousBlockInfo {
531    prev_root: Bytes32,
532    da_height: DaBlockHeight,
533}