1use crate::{
2 block_producer::gas_price::{
3 ChainStateInfoProvider,
4 GasPriceProvider as GasPriceProviderConstraint,
5 },
6 ports::{
7 self,
8 BlockProducerDatabase,
9 RelayerBlockInfo,
10 },
11 Config,
12};
13use anyhow::{
14 anyhow,
15 Context,
16};
17use fuel_core_storage::transactional::{
18 AtomicView,
19 Changes,
20 HistoricalView,
21};
22use fuel_core_types::{
23 blockchain::{
24 block::Block,
25 header::{
26 ApplicationHeader,
27 ConsensusHeader,
28 PartialBlockHeader,
29 },
30 primitives::DaBlockHeight,
31 },
32 fuel_tx::{
33 field::{
34 InputContract,
35 MintGasPrice,
36 },
37 Transaction,
38 },
39 fuel_types::{
40 BlockHeight,
41 Bytes32,
42 },
43 services::{
44 block_producer::Components,
45 executor::{
46 StorageReadReplayEvent,
47 TransactionExecutionStatus,
48 UncommittedResult,
49 },
50 },
51 tai64::Tai64,
52};
53use std::{
54 future::Future,
55 sync::Arc,
56};
57use tokio::sync::Mutex;
58use tracing::debug;
59
60#[cfg(test)]
61mod tests;
62
63pub mod gas_price;
64
65#[derive(Debug, derive_more::Display)]
66pub enum Error {
67 #[display(fmt = "Genesis block is absent")]
68 NoGenesisBlock,
69 #[display(
70 fmt = "The block height {height} should be higher than the previous block height {previous_block}"
71 )]
72 BlockHeightShouldBeHigherThanPrevious {
73 height: BlockHeight,
74 previous_block: BlockHeight,
75 },
76 #[display(fmt = "Previous block height {_0} doesn't exist")]
77 MissingBlock(BlockHeight),
78 #[display(
79 fmt = "Best finalized da_height {best} is behind previous block da_height {previous_block}"
80 )]
81 InvalidDaFinalizationState {
82 best: DaBlockHeight,
83 previous_block: DaBlockHeight,
84 },
85}
86
87impl From<Error> for anyhow::Error {
88 fn from(error: Error) -> Self {
89 anyhow::Error::msg(error)
90 }
91}
92
93pub struct Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
94{
95 pub config: Config,
96 pub view_provider: ViewProvider,
97 pub txpool: TxPool,
98 pub executor: Arc<Executor>,
99 pub relayer: Box<dyn ports::Relayer>,
100 pub lock: Mutex<()>,
103 pub gas_price_provider: GasPriceProvider,
104 pub chain_state_info_provider: ChainStateProvider,
105}
106
107impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
108 Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
109where
110 ViewProvider: AtomicView + 'static,
111 ViewProvider::LatestView: BlockProducerDatabase,
112 ChainStateProvider: ChainStateInfoProvider,
113{
114 pub async fn produce_and_execute_predefined<D>(
115 &self,
116 predefined_block: &Block,
117 deadline: D,
118 ) -> anyhow::Result<UncommittedResult<Changes>>
119 where
120 Executor: ports::BlockProducer<Vec<Transaction>, Deadline = D> + 'static,
121 {
122 let _production_guard = self.lock.try_lock().map_err(|_| {
123 anyhow!("Failed to acquire the production lock, block production is already in progress")
124 })?;
125
126 let mut transactions_source = predefined_block.transactions().to_vec();
127
128 let height = predefined_block.header().consensus().height;
129
130 let block_time = predefined_block.header().consensus().time;
131
132 let da_height = predefined_block.header().da_height();
133
134 let view = self.view_provider.latest_view()?;
135
136 let header_to_produce =
137 self.new_header_with_da_height(height, block_time, da_height, &view)?;
138
139 let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
140
141 if header_to_produce.height() <= &latest_height {
142 return Err(Error::BlockHeightShouldBeHigherThanPrevious {
143 height,
144 previous_block: latest_height,
145 }
146 .into())
147 }
148
149 let maybe_mint_tx = transactions_source.pop();
150 let mint_tx =
151 maybe_mint_tx
152 .and_then(|tx| tx.as_mint().cloned())
153 .ok_or(anyhow!(
154 "The last transaction in the block should be a mint transaction"
155 ))?;
156
157 let gas_price = *mint_tx.gas_price();
158 let coinbase_recipient = mint_tx.input_contract().contract_id;
159
160 let component = Components {
161 header_to_produce,
162 transactions_source,
163 coinbase_recipient,
164 gas_price,
165 };
166
167 let result = self
168 .executor
169 .produce_without_commit(component, deadline)
170 .await
171 .map_err(Into::<anyhow::Error>::into)
172 .with_context(|| {
173 format!("Failed to produce block {height:?} due to execution failure")
174 })?;
175
176 debug!("Produced block with result: {:?}", result.result());
177 Ok(result)
178 }
179}
180impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
181 Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
182where
183 ViewProvider: AtomicView + 'static,
184 ViewProvider::LatestView: BlockProducerDatabase,
185 GasPriceProvider: GasPriceProviderConstraint,
186 ChainStateProvider: ChainStateInfoProvider,
187{
188 async fn produce_and_execute<TxSource, F, Deadline>(
190 &self,
191 height: BlockHeight,
192 block_time: Tai64,
193 tx_source: impl FnOnce(u64, BlockHeight) -> F,
194 deadline: Deadline,
195 ) -> anyhow::Result<UncommittedResult<Changes>>
196 where
197 Executor: ports::BlockProducer<TxSource, Deadline = Deadline> + 'static,
198 F: Future<Output = anyhow::Result<TxSource>>,
199 {
200 let _production_guard = self.lock.try_lock().map_err(|_| {
210 anyhow!("Failed to acquire the production lock, block production is already in progress")
211 })?;
212
213 let gas_price = self.production_gas_price().await?;
214
215 let source = tx_source(gas_price, height).await?;
216
217 let view = self.view_provider.latest_view()?;
218
219 let header = self
220 .new_header_with_new_da_height(height, block_time, &view)
221 .await?;
222
223 let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
224
225 if header.height() <= &latest_height {
226 return Err(Error::BlockHeightShouldBeHigherThanPrevious {
227 height,
228 previous_block: latest_height,
229 }
230 .into())
231 }
232
233 let component = Components {
234 header_to_produce: header,
235 transactions_source: source,
236 coinbase_recipient: self.config.coinbase_recipient.unwrap_or_default(),
237 gas_price,
238 };
239
240 let context_string =
242 format!("Failed to produce block {height:?} due to execution failure");
243 let result = self
244 .executor
245 .produce_without_commit(component, deadline)
246 .await
247 .map_err(Into::<anyhow::Error>::into)
248 .context(context_string)?;
249
250 debug!("Produced block with result: {:?}", result.result());
251 Ok(result)
252 }
253
254 async fn production_gas_price(&self) -> anyhow::Result<u64> {
255 self.gas_price_provider
256 .production_gas_price()
257 .map_err(|e| anyhow!("No gas price found: {e:?}"))
258 }
259
260 async fn dry_run_gas_price(&self) -> anyhow::Result<u64> {
261 self.gas_price_provider
262 .dry_run_gas_price()
263 .map_err(|e| anyhow!("No gas price found: {e:?}"))
264 }
265}
266
267impl<
268 ViewProvider,
269 TxPool,
270 Executor,
271 TxSource,
272 GasPriceProvider,
273 ChainStateProvider,
274 Deadline,
275 > Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
276where
277 ViewProvider: AtomicView + 'static,
278 ViewProvider::LatestView: BlockProducerDatabase,
279 TxPool: ports::TxPool<TxSource = TxSource> + 'static,
280 Executor: ports::BlockProducer<TxSource, Deadline = Deadline> + 'static,
281 GasPriceProvider: GasPriceProviderConstraint,
282 ChainStateProvider: ChainStateInfoProvider,
283{
284 pub async fn produce_and_execute_block_txpool(
286 &self,
287 height: BlockHeight,
288 block_time: Tai64,
289 deadline: Deadline,
290 ) -> anyhow::Result<UncommittedResult<Changes>> {
291 self.produce_and_execute::<TxSource, _, Deadline>(
292 height,
293 block_time,
294 |gas_price, height| self.txpool.get_source(gas_price, height),
295 deadline,
296 )
297 .await
298 }
299}
300
301impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
302 Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
303where
304 ViewProvider: AtomicView + 'static,
305 ViewProvider::LatestView: BlockProducerDatabase,
306 Executor: ports::BlockProducer<Vec<Transaction>, Deadline = ()> + 'static,
307 GasPriceProvider: GasPriceProviderConstraint,
308 ChainStateProvider: ChainStateInfoProvider,
309{
310 pub async fn produce_and_execute_block_transactions(
312 &self,
313 height: BlockHeight,
314 block_time: Tai64,
315 transactions: Vec<Transaction>,
316 ) -> anyhow::Result<UncommittedResult<Changes>> {
317 self.produce_and_execute(
318 height,
319 block_time,
320 |_, _| async { Ok(transactions) },
321 (),
322 )
323 .await
324 }
325}
326
327impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
328 Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
329where
330 ViewProvider: AtomicView + 'static,
331 ViewProvider::LatestView: BlockProducerDatabase,
332 Executor: ports::DryRunner + 'static,
333 GasPriceProvider: GasPriceProviderConstraint,
334 ChainStateProvider: ChainStateInfoProvider,
335{
336 pub async fn dry_run(
340 &self,
341 transactions: Vec<Transaction>,
342 height: Option<BlockHeight>,
343 time: Option<Tai64>,
344 utxo_validation: Option<bool>,
345 gas_price: Option<u64>,
346 ) -> anyhow::Result<Vec<(Transaction, TransactionExecutionStatus)>> {
347 let view = self.view_provider.latest_view()?;
348 let latest_height = view.latest_height().unwrap_or_default();
349
350 let simulated_height = height.unwrap_or_else(|| {
351 latest_height
352 .succ()
353 .expect("It is impossible to overflow the current block height")
354 });
355
356 let simulated_time = time.unwrap_or_else(|| {
357 view.get_block(&latest_height)
358 .map(|block| block.header().time())
359 .unwrap_or(Tai64::UNIX_EPOCH)
360 });
361
362 let header = self.new_header(simulated_height, simulated_time, &view)?;
363
364 let gas_price = if let Some(inner) = gas_price {
365 inner
366 } else {
367 self.dry_run_gas_price().await?
368 };
369
370 let component = Components {
376 header_to_produce: header,
377 transactions_source: transactions.clone(),
378 coinbase_recipient: self.config.coinbase_recipient.unwrap_or_default(),
379 gas_price,
380 };
381
382 let executor = self.executor.clone();
383
384 let txs = tokio_rayon::spawn_fifo(
386 move || -> anyhow::Result<Vec<(Transaction, TransactionExecutionStatus)>> {
387 Ok(executor.dry_run(component, utxo_validation, height)?)
388 },
389 )
390 .await?;
391
392 if txs.iter().any(|(transaction, tx_status)| {
393 transaction.is_script() && tx_status.result.receipts().is_empty()
394 }) {
395 Err(anyhow!("Expected at least one set of receipts"))
396 } else {
397 Ok(txs)
398 }
399 }
400}
401
402impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
403 Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
404where
405 ViewProvider: HistoricalView + 'static,
406 ViewProvider::LatestView: BlockProducerDatabase,
407 Executor: ports::StorageReadReplayRecorder + 'static,
408 GasPriceProvider: GasPriceProviderConstraint,
409 ChainStateProvider: ChainStateInfoProvider,
410{
411 pub async fn storage_read_replay(
413 &self,
414 height: BlockHeight,
415 ) -> anyhow::Result<Vec<StorageReadReplayEvent>> {
416 let view = self.view_provider.latest_view()?;
417
418 let executor = self.executor.clone();
419
420 tokio_rayon::spawn_fifo(move || {
422 let block = view.get_full_block(&height)?;
423 Ok(executor.storage_read_replay(&block)?)
424 })
425 .await
426 }
427}
428
429pub const NO_NEW_DA_HEIGHT_FOUND: &str = "No new da_height found";
430
431impl<ViewProvider, TxPool, Executor, GP, ChainStateProvider>
432 Producer<ViewProvider, TxPool, Executor, GP, ChainStateProvider>
433where
434 ViewProvider: AtomicView + 'static,
435 ViewProvider::LatestView: BlockProducerDatabase,
436 ChainStateProvider: ChainStateInfoProvider,
437{
438 async fn new_header_with_new_da_height(
440 &self,
441 height: BlockHeight,
442 block_time: Tai64,
443 view: &ViewProvider::LatestView,
444 ) -> anyhow::Result<PartialBlockHeader> {
445 let mut block_header = self.new_header(height, block_time, view)?;
446 let previous_da_height = block_header.da_height;
447 let gas_limit = self
448 .chain_state_info_provider
449 .consensus_params_at_version(&block_header.consensus_parameters_version)?
450 .block_gas_limit();
451 let new_da_height = self
454 .select_new_da_height(gas_limit, previous_da_height, u16::MAX - 1)
455 .await?;
456
457 block_header.application.da_height = new_da_height;
458
459 Ok(block_header)
460 }
461 fn new_header_with_da_height(
463 &self,
464 height: BlockHeight,
465 block_time: Tai64,
466 da_height: DaBlockHeight,
467 view: &ViewProvider::LatestView,
468 ) -> anyhow::Result<PartialBlockHeader> {
469 let mut block_header = self.new_header(height, block_time, view)?;
470 block_header.application.da_height = da_height;
471 Ok(block_header)
472 }
473
474 async fn select_new_da_height(
475 &self,
476 gas_limit: u64,
477 previous_da_height: DaBlockHeight,
478 transactions_limit: u16,
479 ) -> anyhow::Result<DaBlockHeight> {
480 let mut new_best = previous_da_height;
481 let mut total_cost: u64 = 0;
482 let transactions_limit: u64 = transactions_limit as u64;
483 let mut total_transactions: u64 = 0;
484 let highest = self
485 .relayer
486 .wait_for_at_least_height(&previous_da_height)
487 .await?;
488 if highest < previous_da_height {
489 return Err(Error::InvalidDaFinalizationState {
490 best: highest,
491 previous_block: previous_da_height,
492 }
493 .into());
494 }
495
496 if highest == previous_da_height {
497 return Ok(highest);
498 }
499
500 let next_da_height = previous_da_height.saturating_add(1);
501 for height in next_da_height..=highest.0 {
502 let RelayerBlockInfo { gas_cost, tx_count } = self
503 .relayer
504 .get_cost_and_transactions_number_for_block(&DaBlockHeight(height))
505 .await?;
506 total_cost = total_cost.saturating_add(gas_cost);
507 total_transactions = total_transactions.saturating_add(tx_count);
508 if total_cost > gas_limit || total_transactions > transactions_limit {
509 break;
510 }
511
512 new_best = DaBlockHeight(height);
513 }
514
515 if new_best == previous_da_height {
516 Err(anyhow!(NO_NEW_DA_HEIGHT_FOUND))
517 } else {
518 Ok(new_best)
519 }
520 }
521
522 fn new_header(
523 &self,
524 height: BlockHeight,
525 block_time: Tai64,
526 view: &ViewProvider::LatestView,
527 ) -> anyhow::Result<PartialBlockHeader> {
528 let previous_block_info = self.previous_block_info(height, view)?;
529 let consensus_parameters_version = view.latest_consensus_parameters_version()?;
530 let state_transition_bytecode_version =
531 view.latest_state_transition_bytecode_version()?;
532
533 Ok(PartialBlockHeader {
534 application: ApplicationHeader {
535 da_height: previous_block_info.da_height,
536 consensus_parameters_version,
537 state_transition_bytecode_version,
538 generated: Default::default(),
539 },
540 consensus: ConsensusHeader {
541 prev_root: previous_block_info.prev_root,
542 height,
543 time: block_time,
544 generated: Default::default(),
545 },
546 })
547 }
548
549 fn previous_block_info(
550 &self,
551 height: BlockHeight,
552 view: &ViewProvider::LatestView,
553 ) -> anyhow::Result<PreviousBlockInfo> {
554 let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
555
556 let prev_height =
558 height
559 .pred()
560 .ok_or(Error::BlockHeightShouldBeHigherThanPrevious {
561 height: 0u32.into(),
562 previous_block: latest_height,
563 })?;
564 let previous_block = view.get_block(&prev_height)?;
565 let prev_root = view.block_header_merkle_root(&prev_height)?;
566
567 Ok(PreviousBlockInfo {
568 prev_root,
569 da_height: previous_block.header().da_height(),
570 })
571 }
572}
573
574struct PreviousBlockInfo {
575 prev_root: Bytes32,
576 da_height: DaBlockHeight,
577}