1use crate::{
2 deadline_clock::{
3 DeadlineClock,
4 OnConflict,
5 },
6 ports::{
7 BlockDb,
8 BlockProducer,
9 DBTransaction,
10 },
11 Config,
12 Trigger,
13};
14use anyhow::{
15 anyhow,
16 Context,
17};
18use fuel_core_interfaces::{
19 block_importer::ImportBlockBroadcast,
20 common::{
21 fuel_tx::UniqueIdentifier,
22 prelude::{
23 Signature,
24 Word,
25 },
26 secrecy::{
27 ExposeSecret,
28 Secret,
29 },
30 },
31 executor::{
32 ExecutionResult,
33 UncommittedResult,
34 },
35 model::{
36 BlockHeight,
37 FuelBlock,
38 FuelBlockConsensus,
39 FuelBlockPoAConsensus,
40 SecretKeyWrapper,
41 },
42 poa_coordinator::TransactionPool,
43 txpool::TxStatus,
44};
45use parking_lot::Mutex;
46use std::{
47 ops::Deref,
48 sync::Arc,
49};
50use tokio::{
51 sync::{
52 broadcast,
53 mpsc,
54 },
55 task::JoinHandle,
56 time::Instant,
57};
58use tracing::{
59 error,
60 warn,
61};
62
63pub struct RunningService {
64 join: JoinHandle<()>,
65 stop: mpsc::Sender<()>,
66}
67
68pub struct Service {
69 running: Mutex<Option<RunningService>>,
70 config: Config,
71}
72
73impl Service {
74 pub fn new(config: &Config) -> Self {
75 Self {
76 running: Mutex::new(None),
77 config: config.clone(),
78 }
79 }
80
81 pub async fn start<D, T, B>(
82 &self,
83 txpool_broadcast: broadcast::Receiver<TxStatus>,
84 txpool: T,
85 import_block_events_tx: broadcast::Sender<ImportBlockBroadcast>,
86 block_producer: B,
87 db: D,
88 ) where
89 D: BlockDb + Send + Clone + 'static,
90 T: TransactionPool + Send + Sync + 'static,
91 B: BlockProducer<D> + 'static,
92 {
93 let mut running = self.running.lock();
94
95 if running.is_some() {
96 warn!("Trying to start a service that is already running");
97 return
98 }
99
100 let (stop_tx, stop_rx) = mpsc::channel(1);
101
102 let task = Task {
103 stop: stop_rx,
104 block_gas_limit: self.config.block_gas_limit,
105 signing_key: self.config.signing_key.clone(),
106 db,
107 block_producer,
108 txpool_broadcast,
109 txpool,
110 last_block_created: Instant::now(),
111 import_block_events_tx,
112 trigger: self.config.trigger,
113 timer: DeadlineClock::new(),
114 };
115
116 *running = Some(RunningService {
117 join: tokio::spawn(task.run()),
118 stop: stop_tx,
119 });
120 }
121
122 pub async fn stop(&self) -> Option<JoinHandle<()>> {
123 let maybe_running = self.running.lock().take();
124 if let Some(running) = maybe_running {
125 let _ = running.stop.send(()).await;
127 Some(running.join)
128 } else {
129 warn!("Trying to stop a service that is not running");
130 None
131 }
132 }
133}
134
135pub struct Task<D, T, B>
136where
137 D: BlockDb + Send + Sync,
138 T: TransactionPool,
139 B: BlockProducer<D>,
140{
141 stop: mpsc::Receiver<()>,
142 block_gas_limit: Word,
143 signing_key: Option<Secret<SecretKeyWrapper>>,
144 db: D,
145 block_producer: B,
146 txpool: T,
147 txpool_broadcast: broadcast::Receiver<TxStatus>,
148 import_block_events_tx: broadcast::Sender<ImportBlockBroadcast>,
149 last_block_created: Instant,
153 trigger: Trigger,
154 timer: DeadlineClock,
156}
157
158impl<D, T, B> Task<D, T, B>
159where
160 D: BlockDb + Send,
161 T: TransactionPool,
162 B: BlockProducer<D>,
163{
164 async fn signal_produce_block(
166 &mut self,
167 ) -> anyhow::Result<UncommittedResult<DBTransaction<D>>> {
168 let current_height = self
169 .db
170 .block_height()
171 .map_err(|err| anyhow::format_err!("db error {err:?}"))?;
172 let height = BlockHeight::from(current_height.as_usize() + 1);
173
174 self.block_producer
175 .produce_and_execute_block(height, self.block_gas_limit)
176 .await
177 }
178
179 async fn produce_block(&mut self) -> anyhow::Result<()> {
180 if self.signing_key.is_none() {
182 return Err(anyhow!("unable to produce blocks without a consensus key"))
183 }
184
185 let (
187 ExecutionResult {
188 block,
189 skipped_transactions,
190 ..
191 },
192 mut db_transaction,
193 ) = self.signal_produce_block().await?.into();
194
195 seal_block(&self.signing_key, &block, db_transaction.database_mut())?;
197 db_transaction.commit_box()?;
198
199 let mut tx_ids_to_remove = Vec::with_capacity(skipped_transactions.len());
200 for (tx, err) in skipped_transactions {
201 error!(
202 "During block production got invalid transaction {:?} with error {:?}",
203 tx, err
204 );
205 tx_ids_to_remove.push(tx.id());
206 }
207
208 if let Err(err) = self.txpool.remove_txs(tx_ids_to_remove).await {
209 error!(
210 "Unable to clean up skipped transaction from `TxPool` with error {:?}",
211 err
212 );
213 };
214
215 self.import_block_events_tx
218 .send(ImportBlockBroadcast::PendingFuelBlockImported {
219 block: Arc::new(block),
220 })
221 .expect("Failed to import the generated block");
222
223 self.last_block_created = Instant::now();
225
226 match self.trigger {
228 Trigger::Never => {
229 unreachable!("This mode will never produce blocks");
230 }
231 Trigger::Instant => {}
232 Trigger::Interval { block_time } => {
233 self.timer.set_timeout(block_time, OnConflict::Min).await;
235 }
236 Trigger::Hybrid {
237 max_block_time,
238 min_block_time,
239 max_tx_idle_time,
240 } => {
241 self.timer
242 .set_timeout(max_block_time, OnConflict::Min)
243 .await;
244
245 let consumable_gas = self.txpool.total_consumable_gas().await?;
246
247 if consumable_gas > self.block_gas_limit {
250 self.timer
251 .set_timeout(min_block_time, OnConflict::Min)
252 .await;
253 } else if self.txpool.pending_number().await? > 0 {
254 self.timer
256 .set_timeout(max_tx_idle_time, OnConflict::Min)
257 .await;
258 }
259 }
260 }
261
262 Ok(())
263 }
264
265 async fn on_txpool_event(&mut self, txpool_event: &TxStatus) -> anyhow::Result<()> {
266 match txpool_event {
267 TxStatus::Submitted => match self.trigger {
268 Trigger::Instant => {
269 let pending_number = self.txpool.pending_number().await?;
270 if pending_number > 0 {
272 self.produce_block().await?;
273 }
274 Ok(())
275 }
276 Trigger::Never | Trigger::Interval { .. } => Ok(()),
277 Trigger::Hybrid {
278 max_tx_idle_time,
279 min_block_time,
280 ..
281 } => {
282 let consumable_gas = self.txpool.total_consumable_gas().await?;
283
284 if consumable_gas > self.block_gas_limit
287 && self.last_block_created + min_block_time < Instant::now()
288 {
289 self.produce_block().await?;
290 } else if self.txpool.pending_number().await? > 0 {
291 self.timer
293 .set_timeout(max_tx_idle_time, OnConflict::Min)
294 .await;
295 }
296
297 Ok(())
298 }
299 },
300 TxStatus::Completed => Ok(()), TxStatus::SqueezedOut { .. } => {
302 Ok(())
304 }
305 }
306 }
307
308 async fn on_timer(&mut self, _at: Instant) -> anyhow::Result<()> {
309 match self.trigger {
310 Trigger::Instant | Trigger::Never => {
311 unreachable!("Timer is never set in this mode");
312 }
313 Trigger::Interval { .. } | Trigger::Hybrid { .. } => {
321 self.produce_block().await?;
322 Ok(())
323 }
324 }
325 }
326
327 async fn process_next_event(&mut self) -> anyhow::Result<bool> {
330 tokio::select! {
331 _ = self.stop.recv() => {
332 Ok(false)
333 }
334 txpool_event = self.txpool_broadcast.recv() => {
341 self.on_txpool_event(&txpool_event.context("Broadcast receive error")?).await.context("While processing txpool event")?;
342 Ok(true)
343 }
344 at = self.timer.wait() => {
345 self.on_timer(at).await.context("While processing timer event")?;
346 Ok(true)
347 }
348 }
349 }
350
351 async fn init_timers(&mut self) {
352 match self.trigger {
353 Trigger::Never | Trigger::Instant => {}
354 Trigger::Interval { block_time } => {
355 self.timer
356 .set_timeout(block_time, OnConflict::Overwrite)
357 .await;
358 }
359 Trigger::Hybrid { max_block_time, .. } => {
360 self.timer
361 .set_timeout(max_block_time, OnConflict::Overwrite)
362 .await;
363 }
364 }
365 }
366
367 async fn run(mut self) {
369 self.init_timers().await;
370 loop {
371 match self.process_next_event().await {
372 Ok(should_continue) => {
373 if !should_continue {
374 break
375 }
376 }
377 Err(err) => {
378 error!("PoA module encountered an error: {err:?}");
379 }
380 }
381 }
382 }
383}
384
385pub fn seal_block(
386 signing_key: &Option<Secret<SecretKeyWrapper>>,
387 block: &FuelBlock,
388 database: &mut dyn BlockDb,
389) -> anyhow::Result<()> {
390 if let Some(key) = signing_key {
391 let block_hash = block.id();
392 let message = block_hash.into_message();
393
394 let signing_key = key.expose_secret().deref();
396
397 let poa_signature = Signature::sign(signing_key, &message);
398 let seal = FuelBlockConsensus::PoA(FuelBlockPoAConsensus::new(poa_signature));
399 database.seal_block(block_hash, seal)
400 } else {
401 Err(anyhow!("no PoA signing key configured"))
402 }
403}
404
#[cfg(test)]
mod test {
    use super::*;
    use fuel_core_interfaces::{
        common::{
            fuel_crypto::SecretKey,
            fuel_tx::{
                Receipt,
                Transaction,
                TransactionBuilder,
                TxId,
            },
        },
        db::{
            Error as DBError,
            Transactional,
        },
        executor::Error,
        model::{
            ArcPoolTx,
            BlockId,
        },
        txpool::Error::NoMetadata,
    };
    use rand::{
        prelude::StdRng,
        Rng,
        SeedableRng,
    };
    use std::{
        collections::HashSet,
        time::Duration,
    };
    use tokio::time;

    // Mock of the transaction pool port.
    mockall::mock! {
        TxPool {}

        #[async_trait::async_trait]
        impl TransactionPool for TxPool {
            async fn pending_number(&self) -> anyhow::Result<usize>;

            async fn total_consumable_gas(&self) -> anyhow::Result<u64>;

            async fn remove_txs(&mut self, tx_ids: Vec<TxId>) -> anyhow::Result<Vec<ArcPoolTx>>;
        }
    }

    // Mock of the block database port.
    mockall::mock! {
        Database {}

        unsafe impl Sync for Database {}
        unsafe impl Send for Database {}

        impl BlockDb for Database {
            fn block_height(&self) -> anyhow::Result<BlockHeight>;

            fn seal_block(
                &mut self,
                block_id: BlockId,
                consensus: FuelBlockConsensus,
            ) -> anyhow::Result<()>;
        }
    }

    // Mock of the uncommitted database transaction returned by the producer.
    mockall::mock! {
        #[derive(Debug)]
        DatabaseTransaction{}

        impl Transactional for DatabaseTransaction {
            fn commit(self) -> Result<(), DBError>;

            fn commit_box(self: Box<Self>) -> Result<(), DBError>;
        }

        impl fuel_core_interfaces::db::DatabaseTransaction<MockDatabase> for DatabaseTransaction {
            fn database(&self) -> &MockDatabase;

            fn database_mut(&mut self) -> &mut MockDatabase;
        }
    }

    // Mock of the block producer port.
    mockall::mock! {
        BlockProducer {}

        #[async_trait::async_trait]
        impl BlockProducer<MockDatabase> for BlockProducer {
            async fn produce_and_execute_block(
                &self,
                _height: BlockHeight,
                _max_gas: Word,
            ) -> anyhow::Result<UncommittedResult<DBTransaction<MockDatabase>>>;

            async fn dry_run(
                &self,
                _transaction: Transaction,
                _height: Option<BlockHeight>,
                _utxo_validation: Option<bool>,
            ) -> anyhow::Result<Vec<Receipt>>;
        }
    }

    /// Builds a random, unsigned `Create` transaction.
    fn make_tx(rng: &mut StdRng) -> Transaction {
        TransactionBuilder::create(rng.gen(), rng.gen(), vec![])
            .gas_price(rng.gen())
            .gas_limit(rng.gen())
            .finalize_without_signature_as_transaction()
    }

    /// Draws a random secret key from `rng` and wraps it the way `Task` stores it.
    fn random_signing_key(rng: &mut StdRng) -> Option<Secret<SecretKeyWrapper>> {
        let secret_key = SecretKey::random(rng);
        Some(Secret::new(secret_key.into()))
    }

    /// Database mock that always reports block height 1.
    fn db_at_height_one() -> MockDatabase {
        let mut db = MockDatabase::default();
        db.expect_block_height()
            .returning(|| Ok(BlockHeight::from(1u32)));
        db
    }

    /// Txpool mock that reports no consumable gas and no pending transactions.
    fn empty_txpool() -> MockTxPool {
        let mut txpool = MockTxPool::default();
        txpool.expect_total_consumable_gas().returning(|| Ok(0));
        txpool.expect_pending_number().returning(|| Ok(0));
        txpool
    }

    /// Block producer mock that fails the test as soon as production is requested.
    fn block_producer_that_must_not_run() -> MockBlockProducer {
        let mut block_producer = MockBlockProducer::default();
        block_producer
            .expect_produce_and_execute_block()
            .returning(|_, _| panic!("Block production should not be called"));
        block_producer
    }

    /// Every transaction the executor skipped must be handed to
    /// `TransactionPool::remove_txs`, in the same order.
    #[tokio::test]
    async fn remove_skipped_transactions() {
        let mut rng = StdRng::seed_from_u64(2322);
        let signing_key = random_signing_key(&mut rng);

        let (_, stop) = mpsc::channel(1);
        let (_, txpool_broadcast) = broadcast::channel(1);
        let (import_block_events_tx, mut import_block_events_rx) = broadcast::channel(1);
        // Keep a receiver alive so that broadcasting the produced block succeeds.
        tokio::spawn(async move {
            import_block_events_rx.recv().await.unwrap();
        });

        const TX_NUM: usize = 100;
        let skipped_transactions: Vec<_> =
            (0..TX_NUM).map(|_| make_tx(&mut rng)).collect();
        let producer_skipped_txs = skipped_transactions.clone();

        // Enforces: produce -> seal -> commit_box.
        let mut seq = mockall::Sequence::new();

        let mut block_producer = MockBlockProducer::default();
        block_producer
            .expect_produce_and_execute_block()
            .times(1)
            .in_sequence(&mut seq)
            .returning(move |_, _| {
                let mut sealed_db = MockDatabase::default();
                sealed_db
                    .expect_seal_block()
                    .times(1)
                    .in_sequence(&mut seq)
                    .returning(|_, _| Ok(()));

                let mut db_transaction = MockDatabaseTransaction::default();
                db_transaction
                    .expect_database_mut()
                    .times(1)
                    .return_var(sealed_db);

                db_transaction
                    .expect_commit_box()
                    .times(1)
                    .in_sequence(&mut seq)
                    .returning(|| Ok(()));
                db_transaction
                    .expect_commit()
                    .times(0)
                    .in_sequence(&mut seq)
                    .returning(|| Ok(()));

                let skipped = producer_skipped_txs
                    .iter()
                    .cloned()
                    .map(|tx| (tx, Error::OutputAlreadyExists))
                    .collect();

                Ok(UncommittedResult::new(
                    ExecutionResult {
                        block: Default::default(),
                        skipped_transactions: skipped,
                        tx_status: Default::default(),
                    },
                    Box::new(db_transaction),
                ))
            });

        let db = db_at_height_one();

        let mut txpool = MockTxPool::default();
        txpool.expect_remove_txs().returning(move |skipped_ids| {
            let expected_ids: Vec<_> =
                skipped_transactions.iter().map(|tx| tx.id()).collect();

            // All generated transactions must have distinct ids.
            let unique_expected_ids: HashSet<_> = expected_ids.iter().cloned().collect();
            assert_eq!(unique_expected_ids.len(), TX_NUM);

            assert_eq!(skipped_ids.len(), TX_NUM);
            assert_eq!(expected_ids.len(), TX_NUM);
            assert_eq!(expected_ids, skipped_ids);
            Ok(vec![])
        });

        let mut task = Task {
            stop,
            block_gas_limit: 1000000,
            signing_key,
            db,
            block_producer,
            txpool,
            txpool_broadcast,
            import_block_events_tx,
            last_block_created: Instant::now(),
            trigger: Trigger::Instant,
            timer: DeadlineClock::new(),
        };

        assert!(task.produce_block().await.is_ok());
    }

    /// In instant mode no txpool event may trigger production while the pool
    /// reports zero pending transactions.
    #[tokio::test]
    async fn does_not_produce_when_txpool_empty_in_instant_mode() {
        let mut rng = StdRng::seed_from_u64(2322);
        let signing_key = random_signing_key(&mut rng);

        let (_stop_tx, stop) = mpsc::channel(1);
        let (_txpool_tx, txpool_broadcast) = broadcast::channel(1);
        let (import_block_events_tx, mut import_block_events_rx) = broadcast::channel(1);
        tokio::spawn(async move {
            import_block_events_rx.recv().await.unwrap();
        });

        let block_producer = block_producer_that_must_not_run();
        let db = db_at_height_one();
        let txpool = empty_txpool();

        let mut task = Task {
            stop,
            block_gas_limit: 1000000,
            signing_key,
            db,
            block_producer,
            txpool,
            txpool_broadcast,
            import_block_events_tx,
            last_block_created: Instant::now(),
            trigger: Trigger::Instant,
            timer: DeadlineClock::new(),
        };

        let events = [
            TxStatus::Submitted,
            TxStatus::Completed,
            TxStatus::SqueezedOut { reason: NoMetadata },
        ];
        for event in events {
            task.on_txpool_event(&event).await.unwrap();
        }
    }

    /// In hybrid mode an empty txpool must not lead to a block, even after
    /// the idle time has elapsed.
    #[tokio::test(start_paused = true)]
    async fn hybrid_production_doesnt_produce_empty_blocks_when_txpool_is_empty() {
        let mut rng = StdRng::seed_from_u64(2322);
        let signing_key = random_signing_key(&mut rng);

        const TX_IDLE_TIME_MS: u64 = 50u64;

        let (stop_tx, stop) = mpsc::channel(1);
        let (txpool_tx, txpool_broadcast) = broadcast::channel(10);
        let (import_block_events_tx, mut import_block_events_rx) = broadcast::channel(1);
        tokio::spawn(async move {
            let _ = import_block_events_rx.recv().await;
        });

        let block_producer = block_producer_that_must_not_run();
        let db = db_at_height_one();
        let txpool = empty_txpool();

        let task = Task {
            stop,
            block_gas_limit: 1000000,
            signing_key,
            db,
            block_producer,
            txpool,
            txpool_broadcast,
            import_block_events_tx,
            last_block_created: Instant::now(),
            trigger: Trigger::Hybrid {
                min_block_time: Duration::from_millis(100),
                max_tx_idle_time: Duration::from_millis(TX_IDLE_TIME_MS),
                max_block_time: Duration::from_millis(1000),
            },
            timer: DeadlineClock::new(),
        };

        let task_handle = tokio::spawn(task.run());

        let events = [
            TxStatus::Submitted,
            TxStatus::Completed,
            TxStatus::SqueezedOut { reason: NoMetadata },
        ];
        for event in events {
            txpool_tx.send(event).unwrap();
        }

        // Let the idle time pass; a pending transaction would cause a block here.
        time::sleep(Duration::from_millis(TX_IDLE_TIME_MS)).await;

        stop_tx.send(()).await.unwrap();

        // A panic inside the task (e.g. from the producer mock) surfaces here.
        task_handle.await.unwrap();
    }
}