fuel_poa_coordinator/
service.rs

1use crate::{
2    deadline_clock::{
3        DeadlineClock,
4        OnConflict,
5    },
6    ports::{
7        BlockDb,
8        BlockProducer,
9        DBTransaction,
10    },
11    Config,
12    Trigger,
13};
14use anyhow::{
15    anyhow,
16    Context,
17};
18use fuel_core_interfaces::{
19    block_importer::ImportBlockBroadcast,
20    common::{
21        fuel_tx::UniqueIdentifier,
22        prelude::{
23            Signature,
24            Word,
25        },
26        secrecy::{
27            ExposeSecret,
28            Secret,
29        },
30    },
31    executor::{
32        ExecutionResult,
33        UncommittedResult,
34    },
35    model::{
36        BlockHeight,
37        FuelBlock,
38        FuelBlockConsensus,
39        FuelBlockPoAConsensus,
40        SecretKeyWrapper,
41    },
42    poa_coordinator::TransactionPool,
43    txpool::TxStatus,
44};
45use parking_lot::Mutex;
46use std::{
47    ops::Deref,
48    sync::Arc,
49};
50use tokio::{
51    sync::{
52        broadcast,
53        mpsc,
54    },
55    task::JoinHandle,
56    time::Instant,
57};
58use tracing::{
59    error,
60    warn,
61};
62
/// Handles for a spawned PoA event loop: one to await its completion and one
/// to ask it to shut down.
pub struct RunningService {
    /// Join handle of the spawned task running the event loop.
    join: JoinHandle<()>,
    /// Sender side of the stop channel; sending `()` asks the loop to exit.
    stop: mpsc::Sender<()>,
}
67
/// PoA coordinator service: owns the configuration and, while started, the
/// handles of the background task.
pub struct Service {
    /// `Some` while a task is running, `None` otherwise. Guarded by a mutex
    /// because `start` and `stop` only take `&self`.
    running: Mutex<Option<RunningService>>,
    /// Private copy of the configuration the service was created with.
    config: Config,
}
72
73impl Service {
74    pub fn new(config: &Config) -> Self {
75        Self {
76            running: Mutex::new(None),
77            config: config.clone(),
78        }
79    }
80
81    pub async fn start<D, T, B>(
82        &self,
83        txpool_broadcast: broadcast::Receiver<TxStatus>,
84        txpool: T,
85        import_block_events_tx: broadcast::Sender<ImportBlockBroadcast>,
86        block_producer: B,
87        db: D,
88    ) where
89        D: BlockDb + Send + Clone + 'static,
90        T: TransactionPool + Send + Sync + 'static,
91        B: BlockProducer<D> + 'static,
92    {
93        let mut running = self.running.lock();
94
95        if running.is_some() {
96            warn!("Trying to start a service that is already running");
97            return
98        }
99
100        let (stop_tx, stop_rx) = mpsc::channel(1);
101
102        let task = Task {
103            stop: stop_rx,
104            block_gas_limit: self.config.block_gas_limit,
105            signing_key: self.config.signing_key.clone(),
106            db,
107            block_producer,
108            txpool_broadcast,
109            txpool,
110            last_block_created: Instant::now(),
111            import_block_events_tx,
112            trigger: self.config.trigger,
113            timer: DeadlineClock::new(),
114        };
115
116        *running = Some(RunningService {
117            join: tokio::spawn(task.run()),
118            stop: stop_tx,
119        });
120    }
121
122    pub async fn stop(&self) -> Option<JoinHandle<()>> {
123        let maybe_running = self.running.lock().take();
124        if let Some(running) = maybe_running {
125            // Ignore possible send error, as the JoinHandle will report errors anyway
126            let _ = running.stop.send(()).await;
127            Some(running.join)
128        } else {
129            warn!("Trying to stop a service that is not running");
130            None
131        }
132    }
133}
134
/// State owned by the PoA event loop that `Service::start` spawns.
// NOTE(review): this definition asks for `D: Sync` while the `impl` block only
// asks for `D: Send`; presumably `BlockDb` already implies `Sync` — confirm.
pub struct Task<D, T, B>
where
    D: BlockDb + Send + Sync,
    T: TransactionPool,
    B: BlockProducer<D>,
{
    /// Receiving end of the stop channel; the event loop exits when this
    /// yields.
    stop: mpsc::Receiver<()>,
    /// Gas limit handed to the block producer and compared against the
    /// txpool's consumable gas in hybrid mode.
    block_gas_limit: Word,
    /// Key used to sign (seal) produced blocks; block production fails when
    /// it is `None`.
    signing_key: Option<Secret<SecretKeyWrapper>>,
    /// Database queried for the current block height.
    db: D,
    /// Component that builds and executes the next block.
    block_producer: B,
    /// Transaction pool queried for pending counts / consumable gas and asked
    /// to drop skipped transactions.
    txpool: T,
    /// Stream of txpool status events that drive the triggers.
    txpool_broadcast: broadcast::Receiver<TxStatus>,
    /// Channel on which freshly produced blocks are announced.
    import_block_events_tx: broadcast::Sender<ImportBlockBroadcast>,
    /// Last block creation time. When starting up, this is initialized
    /// to `Instant::now()`, which delays the first block on startup for
    /// a bit, but doesn't cause any other issues.
    last_block_created: Instant,
    /// Block production mode selected in the configuration.
    trigger: Trigger,
    /// Deadline clock, used by the triggers
    timer: DeadlineClock,
}
157
158impl<D, T, B> Task<D, T, B>
159where
160    D: BlockDb + Send,
161    T: TransactionPool,
162    B: BlockProducer<D>,
163{
164    // Request the block producer to make a new block, and return it when ready
165    async fn signal_produce_block(
166        &mut self,
167    ) -> anyhow::Result<UncommittedResult<DBTransaction<D>>> {
168        let current_height = self
169            .db
170            .block_height()
171            .map_err(|err| anyhow::format_err!("db error {err:?}"))?;
172        let height = BlockHeight::from(current_height.as_usize() + 1);
173
174        self.block_producer
175            .produce_and_execute_block(height, self.block_gas_limit)
176            .await
177    }
178
179    async fn produce_block(&mut self) -> anyhow::Result<()> {
180        // verify signing key is set
181        if self.signing_key.is_none() {
182            return Err(anyhow!("unable to produce blocks without a consensus key"))
183        }
184
185        // Ask the block producer to create the block
186        let (
187            ExecutionResult {
188                block,
189                skipped_transactions,
190                ..
191            },
192            mut db_transaction,
193        ) = self.signal_produce_block().await?.into();
194
195        // sign the block and seal it
196        seal_block(&self.signing_key, &block, db_transaction.database_mut())?;
197        db_transaction.commit_box()?;
198
199        let mut tx_ids_to_remove = Vec::with_capacity(skipped_transactions.len());
200        for (tx, err) in skipped_transactions {
201            error!(
202                "During block production got invalid transaction {:?} with error {:?}",
203                tx, err
204            );
205            tx_ids_to_remove.push(tx.id());
206        }
207
208        if let Err(err) = self.txpool.remove_txs(tx_ids_to_remove).await {
209            error!(
210                "Unable to clean up skipped transaction from `TxPool` with error {:?}",
211                err
212            );
213        };
214
215        // Send the block back to the txpool
216        // TODO: this probably must be done differently with multi-node configuration
217        self.import_block_events_tx
218            .send(ImportBlockBroadcast::PendingFuelBlockImported {
219                block: Arc::new(block),
220            })
221            .expect("Failed to import the generated block");
222
223        // Update last block time
224        self.last_block_created = Instant::now();
225
226        // Set timer for the next block
227        match self.trigger {
228            Trigger::Never => {
229                unreachable!("This mode will never produce blocks");
230            }
231            Trigger::Instant => {}
232            Trigger::Interval { block_time } => {
233                // TODO: instead of sleeping for `block_time`, subtract the time we used for processing
234                self.timer.set_timeout(block_time, OnConflict::Min).await;
235            }
236            Trigger::Hybrid {
237                max_block_time,
238                min_block_time,
239                max_tx_idle_time,
240            } => {
241                self.timer
242                    .set_timeout(max_block_time, OnConflict::Min)
243                    .await;
244
245                let consumable_gas = self.txpool.total_consumable_gas().await?;
246
247                // If txpool still has more than a full block of transactions available,
248                // produce new block in min_block_time.
249                if consumable_gas > self.block_gas_limit {
250                    self.timer
251                        .set_timeout(min_block_time, OnConflict::Min)
252                        .await;
253                } else if self.txpool.pending_number().await? > 0 {
254                    // If we still have available txs, reduce the timeout to max idle time
255                    self.timer
256                        .set_timeout(max_tx_idle_time, OnConflict::Min)
257                        .await;
258                }
259            }
260        }
261
262        Ok(())
263    }
264
265    async fn on_txpool_event(&mut self, txpool_event: &TxStatus) -> anyhow::Result<()> {
266        match txpool_event {
267            TxStatus::Submitted => match self.trigger {
268                Trigger::Instant => {
269                    let pending_number = self.txpool.pending_number().await?;
270                    // skip production if there are no pending transactions
271                    if pending_number > 0 {
272                        self.produce_block().await?;
273                    }
274                    Ok(())
275                }
276                Trigger::Never | Trigger::Interval { .. } => Ok(()),
277                Trigger::Hybrid {
278                    max_tx_idle_time,
279                    min_block_time,
280                    ..
281                } => {
282                    let consumable_gas = self.txpool.total_consumable_gas().await?;
283
284                    // If we have over one full block of transactions and min_block_time
285                    // has expired, start block production immediately
286                    if consumable_gas > self.block_gas_limit
287                        && self.last_block_created + min_block_time < Instant::now()
288                    {
289                        self.produce_block().await?;
290                    } else if self.txpool.pending_number().await? > 0 {
291                        // We have at least one transaction, so tx_max_idle_time is the limit
292                        self.timer
293                            .set_timeout(max_tx_idle_time, OnConflict::Min)
294                            .await;
295                    }
296
297                    Ok(())
298                }
299            },
300            TxStatus::Completed => Ok(()), // This has been processed already
301            TxStatus::SqueezedOut { .. } => {
302                // TODO: If this is the only tx, set timer deadline to last_block_time + max_block_time
303                Ok(())
304            }
305        }
306    }
307
308    async fn on_timer(&mut self, _at: Instant) -> anyhow::Result<()> {
309        match self.trigger {
310            Trigger::Instant | Trigger::Never => {
311                unreachable!("Timer is never set in this mode");
312            }
313            // In the Interval mode the timer expires only when a new block should be created.
314            // In the Hybrid mode the timer can be either:
315            // 1. min_block_time expired after it was set when a block
316            //    would have been produced too soon
317            // 2. max_tx_idle_time expired after a tx has arrived
318            // 3. max_block_time expired
319            // => we produce a new block in any case
320            Trigger::Interval { .. } | Trigger::Hybrid { .. } => {
321                self.produce_block().await?;
322                Ok(())
323            }
324        }
325    }
326
327    /// Processes the next incoming event. Called by the main event loop.
328    /// Returns Ok(false) if the event loop should stop.
329    async fn process_next_event(&mut self) -> anyhow::Result<bool> {
330        tokio::select! {
331            _ = self.stop.recv() => {
332                Ok(false)
333            }
334            // TODO: This should likely be refactored to use something like tokio::sync::Notify.
335            //       Otherwise, if a bunch of txs are submitted at once and all the txs are included
336            //       into the first block production trigger, we'll still call the event handler
337            //       for each tx after they've already been included into a block.
338            //       The poa service also doesn't care about events unrelated to new tx submissions,
339            //       and shouldn't be awoken when txs are completed or squeezed out of the pool.
340            txpool_event = self.txpool_broadcast.recv() => {
341                self.on_txpool_event(&txpool_event.context("Broadcast receive error")?).await.context("While processing txpool event")?;
342                Ok(true)
343            }
344            at = self.timer.wait() => {
345                self.on_timer(at).await.context("While processing timer event")?;
346                Ok(true)
347            }
348        }
349    }
350
351    async fn init_timers(&mut self) {
352        match self.trigger {
353            Trigger::Never | Trigger::Instant => {}
354            Trigger::Interval { block_time } => {
355                self.timer
356                    .set_timeout(block_time, OnConflict::Overwrite)
357                    .await;
358            }
359            Trigger::Hybrid { max_block_time, .. } => {
360                self.timer
361                    .set_timeout(max_block_time, OnConflict::Overwrite)
362                    .await;
363            }
364        }
365    }
366
367    /// Start event loop
368    async fn run(mut self) {
369        self.init_timers().await;
370        loop {
371            match self.process_next_event().await {
372                Ok(should_continue) => {
373                    if !should_continue {
374                        break
375                    }
376                }
377                Err(err) => {
378                    error!("PoA module encountered an error: {err:?}");
379                }
380            }
381        }
382    }
383}
384
385pub fn seal_block(
386    signing_key: &Option<Secret<SecretKeyWrapper>>,
387    block: &FuelBlock,
388    database: &mut dyn BlockDb,
389) -> anyhow::Result<()> {
390    if let Some(key) = signing_key {
391        let block_hash = block.id();
392        let message = block_hash.into_message();
393
394        // The length of the secret is checked
395        let signing_key = key.expose_secret().deref();
396
397        let poa_signature = Signature::sign(signing_key, &message);
398        let seal = FuelBlockConsensus::PoA(FuelBlockPoAConsensus::new(poa_signature));
399        database.seal_block(block_hash, seal)
400    } else {
401        Err(anyhow!("no PoA signing key configured"))
402    }
403}
404
/// Unit tests for the PoA task, driven entirely by `mockall` mocks of the
/// txpool, database, database transaction and block producer.
#[cfg(test)]
mod test {
    use super::*;
    use fuel_core_interfaces::{
        common::{
            fuel_crypto::SecretKey,
            fuel_tx::{
                Receipt,
                Transaction,
                TransactionBuilder,
                TxId,
            },
        },
        db::{
            Error as DBError,
            Transactional,
        },
        executor::Error,
        model::{
            ArcPoolTx,
            BlockId,
        },
        txpool::Error::NoMetadata,
    };
    use rand::{
        prelude::StdRng,
        Rng,
        SeedableRng,
    };
    use std::{
        collections::HashSet,
        time::Duration,
    };
    use tokio::time;

    // Mock of the transaction pool port.
    mockall::mock! {
        TxPool {}

        #[async_trait::async_trait]
        impl TransactionPool for TxPool {
            async fn pending_number(&self) -> anyhow::Result<usize>;

            async fn total_consumable_gas(&self) -> anyhow::Result<u64>;

            async fn remove_txs(&mut self, tx_ids: Vec<TxId>) -> anyhow::Result<Vec<ArcPoolTx>>;
        }
    }

    // Mock of the block database port.
    mockall::mock! {
        Database {}

        unsafe impl Sync for Database {}
        unsafe impl Send for Database {}

        impl BlockDb for Database {
            fn block_height(&self) -> anyhow::Result<BlockHeight>;

            fn seal_block(
                &mut self,
                block_id: BlockId,
                consensus: FuelBlockConsensus,
            ) -> anyhow::Result<()>;
        }
    }

    // Mock of the uncommitted database transaction returned by the producer.
    mockall::mock! {
        #[derive(Debug)]
        DatabaseTransaction{}

        impl Transactional for DatabaseTransaction {
            fn commit(self) -> Result<(), DBError>;

            fn commit_box(self: Box<Self>) -> Result<(), DBError>;
        }

        impl fuel_core_interfaces::db::DatabaseTransaction<MockDatabase> for DatabaseTransaction {
            fn database(&self) -> &MockDatabase;

            fn database_mut(&mut self) -> &mut MockDatabase;
        }
    }

    // Mock of the block producer port.
    mockall::mock! {
        BlockProducer {}

        #[async_trait::async_trait]
        impl BlockProducer<MockDatabase> for BlockProducer {
            async fn produce_and_execute_block(
                &self,
                _height: BlockHeight,
                _max_gas: Word,
            ) -> anyhow::Result<UncommittedResult<DBTransaction<MockDatabase>>>;

            async fn dry_run(
                &self,
                _transaction: Transaction,
                _height: Option<BlockHeight>,
                _utxo_validation: Option<bool>,
            ) -> anyhow::Result<Vec<Receipt>>;
        }
    }

    /// Builds an unsigned create transaction from random values.
    fn make_tx(rng: &mut StdRng) -> Transaction {
        TransactionBuilder::create(rng.gen(), rng.gen(), vec![])
            .gas_price(rng.gen())
            .gas_limit(rng.gen())
            .finalize_without_signature_as_transaction()
    }

    #[tokio::test]
    async fn remove_skipped_transactions() {
        // The test verifies that if `BlockProducer` returns skipped transactions, they would
        // be propagated to `TxPool` for removal.
        let mut rng = StdRng::seed_from_u64(2322);
        let secret_key = SecretKey::random(&mut rng);

        let (_, stop) = mpsc::channel(1);
        let (_, txpool_broadcast) = broadcast::channel(1);
        let (import_block_events_tx, mut import_block_receiver_tx) =
            broadcast::channel(1);
        // Keep a receiver alive so that broadcasting the produced block succeeds.
        tokio::spawn(async move {
            import_block_receiver_tx.recv().await.unwrap();
        });

        const TX_NUM: usize = 100;
        let skipped_transactions: Vec<_> =
            (0..TX_NUM).map(|_| make_tx(&mut rng)).collect();

        let mock_skipped_txs = skipped_transactions.clone();

        let mut seq = mockall::Sequence::new();

        let mut block_producer = MockBlockProducer::default();
        block_producer
            .expect_produce_and_execute_block()
            .times(1)
            .in_sequence(&mut seq)
            .returning(move |_, _| {
                let mut db = MockDatabase::default();
                // We expect that `seal_block` should be called 1 time after `produce_and_execute_block`.
                db.expect_seal_block()
                    .times(1)
                    .in_sequence(&mut seq)
                    .returning(|_, _| Ok(()));

                let mut db_transaction = MockDatabaseTransaction::default();
                db_transaction.expect_database_mut().times(1).return_var(db);

                // Check that `commit` is called after `seal_block`.
                db_transaction
                    .expect_commit_box()
                    // Verifies that `commit_box` has been called.
                    .times(1)
                    .in_sequence(&mut seq)
                    .returning(|| Ok(()));
                db_transaction
                    .expect_commit()
                    // TODO: After removing `commit_box` set `times(1)`
                    .times(0)
                    .in_sequence(&mut seq)
                    .returning(|| Ok(()));
                Ok(UncommittedResult::new(
                    ExecutionResult {
                        block: Default::default(),
                        skipped_transactions: mock_skipped_txs
                            .clone()
                            .into_iter()
                            .map(|tx| (tx, Error::OutputAlreadyExists))
                            .collect(),
                        tx_status: Default::default(),
                    },
                    Box::new(db_transaction),
                ))
            });

        let mut db = MockDatabase::default();
        db.expect_block_height()
            .returning(|| Ok(BlockHeight::from(1u32)));

        let mut txpool = MockTxPool::default();
        // This test was created specifically for this check.
        txpool.expect_remove_txs().returning(move |skipped_ids| {
            // Transform transactions into ids.
            let skipped_transactions: Vec<_> =
                skipped_transactions.iter().map(|tx| tx.id()).collect();

            // Check that all transactions are unique.
            let expected_skipped_ids_set: HashSet<_> =
                skipped_transactions.clone().into_iter().collect();
            assert_eq!(expected_skipped_ids_set.len(), TX_NUM);

            // Check that `TxPool::remove_txs` was called with the same ids in the same order.
            assert_eq!(skipped_ids.len(), TX_NUM);
            assert_eq!(skipped_transactions.len(), TX_NUM);
            assert_eq!(skipped_transactions, skipped_ids);
            Ok(vec![])
        });

        let mut task = Task {
            stop,
            block_gas_limit: 1000000,
            signing_key: Some(Secret::new(secret_key.into())),
            db,
            block_producer,
            txpool,
            txpool_broadcast,
            import_block_events_tx,
            last_block_created: Instant::now(),
            trigger: Trigger::Instant,
            timer: DeadlineClock::new(),
        };

        assert!(task.produce_block().await.is_ok());
    }

    #[tokio::test]
    async fn does_not_produce_when_txpool_empty_in_instant_mode() {
        // verify the PoA service doesn't trigger empty blocks to be produced when there are
        // irrelevant updates from the txpool
        let mut rng = StdRng::seed_from_u64(2322);
        let secret_key = SecretKey::random(&mut rng);

        let (_stop_tx, stop) = mpsc::channel(1);
        let (_txpool_tx, txpool_broadcast) = broadcast::channel(1);
        let (import_block_events_tx, mut import_block_receiver_tx) =
            broadcast::channel(1);
        tokio::spawn(async move {
            import_block_receiver_tx.recv().await.unwrap();
        });

        let mut block_producer = MockBlockProducer::default();

        block_producer
            .expect_produce_and_execute_block()
            .returning(|_, _| panic!("Block production should not be called"));

        let mut db = MockDatabase::default();
        db.expect_block_height()
            .returning(|| Ok(BlockHeight::from(1u32)));

        // An empty pool: no gas to consume and nothing pending.
        let mut txpool = MockTxPool::default();
        txpool.expect_total_consumable_gas().returning(|| Ok(0));
        txpool.expect_pending_number().returning(|| Ok(0));

        let mut task = Task {
            stop,
            block_gas_limit: 1000000,
            signing_key: Some(Secret::new(secret_key.into())),
            db,
            block_producer,
            txpool,
            txpool_broadcast,
            import_block_events_tx,
            last_block_created: Instant::now(),
            trigger: Trigger::Instant,
            timer: DeadlineClock::new(),
        };

        // simulate some txpool events to see if any block production is erroneously triggered
        task.on_txpool_event(&TxStatus::Submitted).await.unwrap();
        task.on_txpool_event(&TxStatus::Completed).await.unwrap();
        task.on_txpool_event(&TxStatus::SqueezedOut { reason: NoMetadata })
            .await
            .unwrap();
    }

    #[tokio::test(start_paused = true)]
    async fn hybrid_production_doesnt_produce_empty_blocks_when_txpool_is_empty() {
        // verify the PoA service doesn't alter the hybrid block timing when
        // receiving txpool events if txpool is actually empty
        let mut rng = StdRng::seed_from_u64(2322);
        let secret_key = SecretKey::random(&mut rng);

        const TX_IDLE_TIME_MS: u64 = 50u64;

        let (stop_tx, stop) = mpsc::channel(1);
        let (txpool_tx, txpool_broadcast) = broadcast::channel(10);
        let (import_block_events_tx, mut import_block_receiver_tx) =
            broadcast::channel(1);
        tokio::spawn(async move {
            let _ = import_block_receiver_tx.recv().await;
        });

        let mut block_producer = MockBlockProducer::default();

        block_producer
            .expect_produce_and_execute_block()
            .returning(|_, _| panic!("Block production should not be called"));

        let mut db = MockDatabase::default();
        db.expect_block_height()
            .returning(|| Ok(BlockHeight::from(1u32)));

        // An empty pool: no gas to consume and nothing pending.
        let mut txpool = MockTxPool::default();
        txpool.expect_total_consumable_gas().returning(|| Ok(0));
        txpool.expect_pending_number().returning(|| Ok(0));

        let task = Task {
            stop,
            block_gas_limit: 1000000,
            signing_key: Some(Secret::new(secret_key.into())),
            db,
            block_producer,
            txpool,
            txpool_broadcast,
            import_block_events_tx,
            last_block_created: Instant::now(),
            trigger: Trigger::Hybrid {
                min_block_time: Duration::from_millis(100),
                max_tx_idle_time: Duration::from_millis(TX_IDLE_TIME_MS),
                max_block_time: Duration::from_millis(1000),
            },
            timer: DeadlineClock::new(),
        };

        let jh = tokio::spawn(task.run());

        // simulate some txpool events to see if any block production is erroneously triggered
        txpool_tx.send(TxStatus::Submitted).unwrap();
        txpool_tx.send(TxStatus::Completed).unwrap();
        txpool_tx
            .send(TxStatus::SqueezedOut { reason: NoMetadata })
            .unwrap();

        // Wait strictly longer than max_tx_idle_time (but far less than
        // max_block_time). If pending txs > 0 is not checked, the idle timer
        // expires inside this window and triggers block production. Waiting
        // for exactly max_tx_idle_time would let that timer expire at the same
        // instant as the stop signal below, so the regression could be missed.
        time::sleep(Duration::from_millis(TX_IDLE_TIME_MS * 2)).await;

        // send stop
        stop_tx.send(()).await.unwrap();

        // await shutdown and capture any errors
        jh.await.unwrap();
    }
}