use crate::{
deadline_clock::{
DeadlineClock,
OnConflict,
},
ports::{
BlockDb,
BlockProducer,
DBTransaction,
},
Config,
Trigger,
};
use anyhow::{
anyhow,
Context,
};
use fuel_core_interfaces::{
block_importer::ImportBlockBroadcast,
common::{
fuel_tx::UniqueIdentifier,
prelude::{
Signature,
Word,
},
secrecy::{
ExposeSecret,
Secret,
},
},
executor::{
ExecutionResult,
UncommittedResult,
},
model::{
BlockHeight,
FuelBlock,
FuelBlockConsensus,
FuelBlockPoAConsensus,
SecretKeyWrapper,
},
poa_coordinator::TransactionPool,
txpool::TxStatus,
};
use parking_lot::Mutex;
use std::{
ops::Deref,
sync::Arc,
};
use tokio::{
sync::{
broadcast,
mpsc,
},
task::JoinHandle,
time::Instant,
};
use tracing::{
error,
warn,
};
/// Handles kept for a PoA task that has been spawned by `Service::start`.
pub struct RunningService {
/// Join handle of the spawned `Task::run` future.
join: JoinHandle<()>,
/// Sender paired with the task's `stop` receiver; `Service::stop` sends on it
/// to make the task's event loop exit.
stop: mpsc::Sender<()>,
}
/// Front-end of the PoA block production service: holds the configuration and,
/// while a task is running, the handles needed to stop it.
pub struct Service {
/// `Some` between a successful `start` and the following `stop`.
running: Mutex<Option<RunningService>>,
/// Clone of the configuration passed to `Service::new`.
config: Config,
}
impl Service {
/// Creates a service that is not running yet and owns a clone of `config`.
pub fn new(config: &Config) -> Self {
    let config = config.clone();
    let running = Mutex::new(None);
    Self { running, config }
}
/// Spawns the PoA task on the tokio runtime and remembers its handles.
///
/// If a task is already registered as running, a warning is logged and
/// nothing else happens. Otherwise a fresh stop channel is created, a
/// [`Task`] is assembled from the arguments and the stored configuration,
/// and its `run` future is spawned.
pub async fn start<D, T, B>(
    &self,
    txpool_broadcast: broadcast::Receiver<TxStatus>,
    txpool: T,
    import_block_events_tx: broadcast::Sender<ImportBlockBroadcast>,
    block_producer: B,
    db: D,
) where
    D: BlockDb + Send + Clone + 'static,
    T: TransactionPool + Send + Sync + 'static,
    B: BlockProducer<D> + 'static,
{
    // The guard is held for the whole function; there is no `.await` below.
    let mut slot = self.running.lock();
    if slot.is_some() {
        warn!("Trying to start a service that is already running");
        return
    }

    let (stop_sender, stop_receiver) = mpsc::channel(1);
    let worker = Task {
        stop: stop_receiver,
        block_gas_limit: self.config.block_gas_limit,
        signing_key: self.config.signing_key.clone(),
        db,
        block_producer,
        txpool_broadcast,
        txpool,
        last_block_created: Instant::now(),
        import_block_events_tx,
        trigger: self.config.trigger,
        timer: DeadlineClock::new(),
    };

    let join = tokio::spawn(worker.run());
    *slot = Some(RunningService {
        join,
        stop: stop_sender,
    });
}
pub async fn stop(&self) -> Option<JoinHandle<()>> {
let maybe_running = self.running.lock().take();
if let Some(running) = maybe_running {
let _ = running.stop.send(()).await;
Some(running.join)
} else {
warn!("Trying to stop a service that is not running");
None
}
}
}
/// State of the PoA event loop: reacts to stop requests, txpool events and
/// timer expirations, and produces blocks according to `trigger`.
pub struct Task<D, T, B>
where
D: BlockDb + Send + Sync,
T: TransactionPool,
B: BlockProducer<D>,
{
/// Receiver polled by the event loop; when it resolves the loop exits.
stop: mpsc::Receiver<()>,
/// Gas limit handed to the block producer; in hybrid mode it is also compared
/// against the txpool's total consumable gas.
block_gas_limit: Word,
/// Key used to sign produced blocks; production fails when it is `None`.
signing_key: Option<Secret<SecretKeyWrapper>>,
/// Database used to read the current block height.
db: D,
/// Component that produces and executes the next block.
block_producer: B,
/// Transaction pool queried for pending work and cleaned of skipped transactions.
txpool: T,
/// Stream of transaction status events coming from the txpool.
txpool_broadcast: broadcast::Receiver<TxStatus>,
/// Channel on which every produced block is announced.
import_block_events_tx: broadcast::Sender<ImportBlockBroadcast>,
/// Updated to `Instant::now()` after each produced block.
last_block_created: Instant,
/// Block production mode.
trigger: Trigger,
/// Deadline timer whose expiration triggers production in the interval and
/// hybrid modes.
timer: DeadlineClock,
}
impl<D, T, B> Task<D, T, B>
where
D: BlockDb + Send,
T: TransactionPool,
B: BlockProducer<D>,
{
async fn signal_produce_block(
&mut self,
) -> anyhow::Result<UncommittedResult<DBTransaction<D>>> {
let current_height = self
.db
.block_height()
.map_err(|err| anyhow::format_err!("db error {err:?}"))?;
let height = BlockHeight::from(current_height.as_usize() + 1);
self.block_producer
.produce_and_execute_block(height, self.block_gas_limit)
.await
}
/// Produces one block, seals and commits it, announces it and re-arms the timer.
///
/// Steps, in order: request a block from the producer, seal it with the
/// signing key inside the uncommitted database transaction, commit that
/// transaction, remove the skipped transactions from the txpool (failures
/// there are only logged), broadcast the block, record the production time
/// and finally schedule the next timeout according to the trigger mode.
///
/// # Errors
///
/// Fails when no signing key is configured, when production, sealing or
/// committing fails, or when the txpool queries of the hybrid mode fail.
///
/// # Panics
///
/// Panics when the block broadcast cannot be sent, and when called while the
/// trigger is `Trigger::Never`.
async fn produce_block(&mut self) -> anyhow::Result<()> {
    anyhow::ensure!(
        self.signing_key.is_some(),
        "unable to produce blocks without a consensus key"
    );

    let (
        ExecutionResult {
            block,
            skipped_transactions: skipped,
            ..
        },
        mut uncommitted_db,
    ) = self.signal_produce_block().await?.into();

    // The seal is written through the transaction so it is committed together
    // with the block.
    seal_block(&self.signing_key, &block, uncommitted_db.database_mut())?;
    uncommitted_db.commit_box()?;

    // Report every skipped transaction and collect its id for removal.
    let skipped_ids: Vec<_> = skipped
        .into_iter()
        .map(|(tx, err)| {
            error!(
                "During block production got invalid transaction {:?} with error {:?}",
                tx, err
            );
            tx.id()
        })
        .collect();
    match self.txpool.remove_txs(skipped_ids).await {
        Ok(_) => {}
        Err(err) => {
            error!(
                "Unable to clean up skipped transaction from `TxPool` with error {:?}",
                err
            );
        }
    }

    let announcement = ImportBlockBroadcast::PendingFuelBlockImported {
        block: Arc::new(block),
    };
    self.import_block_events_tx
        .send(announcement)
        .expect("Failed to import the generated block");

    self.last_block_created = Instant::now();

    match self.trigger {
        Trigger::Never => {
            unreachable!("This mode will never produce blocks");
        }
        Trigger::Instant => {}
        Trigger::Interval { block_time } => {
            self.timer.set_timeout(block_time, OnConflict::Min).await;
        }
        Trigger::Hybrid {
            max_block_time,
            min_block_time,
            max_tx_idle_time,
        } => {
            self.timer
                .set_timeout(max_block_time, OnConflict::Min)
                .await;

            // Pick an additional, shorter timeout when work is already waiting.
            let gas_waiting = self.txpool.total_consumable_gas().await?;
            let extra_timeout = if gas_waiting > self.block_gas_limit {
                Some(min_block_time)
            } else if self.txpool.pending_number().await? > 0 {
                Some(max_tx_idle_time)
            } else {
                None
            };
            if let Some(timeout) = extra_timeout {
                self.timer.set_timeout(timeout, OnConflict::Min).await;
            }
        }
    }

    Ok(())
}
/// Reacts to a transaction status event coming from the txpool.
///
/// Only `TxStatus::Submitted` has an effect:
/// * `Instant`: a block is produced when the pool reports pending transactions.
/// * `Never` / `Interval`: nothing happens.
/// * `Hybrid`: a block is produced when the pool holds more gas than the block
///   gas limit and `min_block_time` has passed since the last block; otherwise,
///   if transactions are pending, the `max_tx_idle_time` timeout is scheduled.
///
/// # Errors
///
/// Propagates txpool query errors and block production errors.
async fn on_txpool_event(&mut self, txpool_event: &TxStatus) -> anyhow::Result<()> {
    match txpool_event {
        TxStatus::Completed | TxStatus::SqueezedOut { .. } => return Ok(()),
        TxStatus::Submitted => {}
    }

    match self.trigger {
        Trigger::Never | Trigger::Interval { .. } => {}
        Trigger::Instant => {
            if self.txpool.pending_number().await? > 0 {
                self.produce_block().await?;
            }
        }
        Trigger::Hybrid {
            max_tx_idle_time,
            min_block_time,
            ..
        } => {
            let gas_waiting = self.txpool.total_consumable_gas().await?;
            // `&&` keeps the time comparison from running unless the block is full.
            let should_produce = gas_waiting > self.block_gas_limit
                && self.last_block_created + min_block_time < Instant::now();
            if should_produce {
                self.produce_block().await?;
            } else if self.txpool.pending_number().await? > 0 {
                self.timer
                    .set_timeout(max_tx_idle_time, OnConflict::Min)
                    .await;
            }
        }
    }

    Ok(())
}
/// Handles an expiration of the deadline timer by producing a block.
///
/// The expiration instant `_at` is not used.
///
/// # Panics
///
/// Panics in the `Instant` and `Never` modes, which never set the timer.
// NOTE(review): when `produce_block` fails, nothing in this function re-arms
// the timer — confirm whether that is intended.
async fn on_timer(&mut self, _at: Instant) -> anyhow::Result<()> {
    match self.trigger {
        Trigger::Interval { .. } | Trigger::Hybrid { .. } => self.produce_block().await,
        Trigger::Instant | Trigger::Never => {
            unreachable!("Timer is never set in this mode")
        }
    }
}
/// Waits for the next event (stop request, txpool event or timer expiration)
/// and handles it.
///
/// Returns `Ok(true)` when the event loop should keep running and `Ok(false)`
/// when it should exit. The loop exits when the stop receiver resolves or
/// when the txpool broadcast channel has been closed.
///
/// # Errors
///
/// Returns an error when the txpool receiver lagged behind the sender, or
/// when handling a txpool or timer event fails.
async fn process_next_event(&mut self) -> anyhow::Result<bool> {
    tokio::select! {
        _ = self.stop.recv() => {
            Ok(false)
        }
        txpool_event = self.txpool_broadcast.recv() => {
            match txpool_event {
                Ok(status) => {
                    self.on_txpool_event(&status)
                        .await
                        .context("While processing txpool event")?;
                    Ok(true)
                }
                // A closed channel resolves immediately on every `recv`, so
                // reporting it as an ordinary error would make the caller's
                // loop spin and flood the log. Stop the task instead.
                Err(broadcast::error::RecvError::Closed) => {
                    warn!("TxPool status channel is closed, stopping the PoA task");
                    Ok(false)
                }
                // Lagging is recoverable: report it and keep running.
                Err(err @ broadcast::error::RecvError::Lagged(_)) => {
                    Err(err).context("Broadcast receive error")
                }
            }
        }
        at = self.timer.wait() => {
            self.on_timer(at).await.context("While processing timer event")?;
            Ok(true)
        }
    }
}
/// Arms the timer for the first block.
///
/// `Interval` uses `block_time`, `Hybrid` uses `max_block_time`; the `Never`
/// and `Instant` modes leave the timer untouched.
async fn init_timers(&mut self) {
    let first_timeout = match self.trigger {
        Trigger::Interval { block_time } => Some(block_time),
        Trigger::Hybrid { max_block_time, .. } => Some(max_block_time),
        Trigger::Never | Trigger::Instant => None,
    };
    if let Some(timeout) = first_timeout {
        self.timer
            .set_timeout(timeout, OnConflict::Overwrite)
            .await;
    }
}
/// Runs the event loop until an event asks it to stop.
///
/// Errors from individual events are logged and do not end the loop.
async fn run(mut self) {
    self.init_timers().await;
    loop {
        match self.process_next_event().await {
            Ok(true) => {}
            Ok(false) => break,
            Err(err) => {
                error!("PoA module encountered an error: {err:?}");
            }
        }
    }
}
}
/// Signs the id of `block` with the PoA signing key and stores the resulting
/// seal through `database`.
///
/// # Errors
///
/// Fails when `signing_key` is `None`, or with the error returned by
/// `database.seal_block`.
pub fn seal_block(
    signing_key: &Option<Secret<SecretKeyWrapper>>,
    block: &FuelBlock,
    database: &mut dyn BlockDb,
) -> anyhow::Result<()> {
    let key = signing_key
        .as_ref()
        .ok_or_else(|| anyhow!("no PoA signing key configured"))?;

    let block_id = block.id();
    let message = block_id.into_message();
    let secret = key.expose_secret().deref();
    let signature = Signature::sign(secret, &message);

    database.seal_block(
        block_id,
        FuelBlockConsensus::PoA(FuelBlockPoAConsensus::new(signature)),
    )
}
#[cfg(test)]
mod test {
use super::*;
use fuel_core_interfaces::{
common::{
fuel_crypto::SecretKey,
fuel_tx::{
Receipt,
Transaction,
TransactionBuilder,
TxId,
},
},
db::{
Error as DBError,
Transactional,
},
executor::Error,
model::{
ArcPoolTx,
BlockId,
},
txpool::Error::NoMetadata,
};
use rand::{
prelude::StdRng,
Rng,
SeedableRng,
};
use std::{
collections::HashSet,
time::Duration,
};
use tokio::time;
// Mock of the `TransactionPool` port; the tests below use it as `MockTxPool`.
mockall::mock! {
TxPool {}
#[async_trait::async_trait]
impl TransactionPool for TxPool {
async fn pending_number(&self) -> anyhow::Result<usize>;
async fn total_consumable_gas(&self) -> anyhow::Result<u64>;
async fn remove_txs(&mut self, tx_ids: Vec<TxId>) -> anyhow::Result<Vec<ArcPoolTx>>;
}
}
// Mock of the `BlockDb` port; the tests below use it as `MockDatabase`.
// `Sync` and `Send` are declared through the unsafe impls in the mock
// definition — presumably because the generated mock is not automatically
// `Send + Sync`; confirm before relying on it across threads.
mockall::mock! {
Database {}
unsafe impl Sync for Database {}
unsafe impl Send for Database {}
impl BlockDb for Database {
fn block_height(&self) -> anyhow::Result<BlockHeight>;
fn seal_block(
&mut self,
block_id: BlockId,
consensus: FuelBlockConsensus,
) -> anyhow::Result<()>;
}
}
// Mock of a database transaction over `MockDatabase`; the tests below use it
// as `MockDatabaseTransaction`. It mocks both `Transactional` (commit) and
// `DatabaseTransaction` (access to the wrapped database).
mockall::mock! {
#[derive(Debug)]
DatabaseTransaction{}
impl Transactional for DatabaseTransaction {
fn commit(self) -> Result<(), DBError>;
fn commit_box(self: Box<Self>) -> Result<(), DBError>;
}
impl fuel_core_interfaces::db::DatabaseTransaction<MockDatabase> for DatabaseTransaction {
fn database(&self) -> &MockDatabase;
fn database_mut(&mut self) -> &mut MockDatabase;
}
}
// Mock of the `BlockProducer` port specialised for `MockDatabase`; the tests
// below use it as `MockBlockProducer`.
mockall::mock! {
BlockProducer {}
#[async_trait::async_trait]
impl BlockProducer<MockDatabase> for BlockProducer {
async fn produce_and_execute_block(
&self,
_height: BlockHeight,
_max_gas: Word,
) -> anyhow::Result<UncommittedResult<DBTransaction<MockDatabase>>>;
async fn dry_run(
&self,
_transaction: Transaction,
_height: Option<BlockHeight>,
_utxo_validation: Option<bool>,
) -> anyhow::Result<Vec<Receipt>>;
}
}
/// Builds a create transaction whose builder inputs, gas price and gas limit
/// are all drawn from `rng`, finalized without a signature.
fn make_tx(rng: &mut StdRng) -> Transaction {
    // Values are drawn in the same order as the builder calls below.
    let mut builder = TransactionBuilder::create(rng.gen(), rng.gen(), vec![]);
    let builder = builder.gas_price(rng.gen());
    let builder = builder.gas_limit(rng.gen());
    builder.finalize_without_signature_as_transaction()
}
// Checks that the transactions reported as skipped by the block producer are
// handed to `TransactionPool::remove_txs` after the block has been sealed and
// committed.
#[tokio::test]
async fn remove_skipped_transactions() {
let mut rng = StdRng::seed_from_u64(2322);
let secret_key = SecretKey::random(&mut rng);
// The event loop is not run in this test, so the sending halves of the stop
// and txpool channels are discarded right away.
let (_, stop) = mpsc::channel(1);
let (_, txpool_broadcast) = broadcast::channel(1);
let (import_block_events_tx, mut import_block_receiver_tx) =
broadcast::channel(1);
// Keep a receiver around for the block announcement; `produce_block` panics
// when sending the announcement fails.
tokio::spawn(async move {
import_block_receiver_tx.recv().await.unwrap();
});
const TX_NUM: usize = 100;
let skipped_transactions: Vec<_> =
(0..TX_NUM).map(|_| make_tx(&mut rng)).collect();
let mock_skipped_txs = skipped_transactions.clone();
// Shared sequence: production, then sealing, then `commit_box` must happen in
// this order.
let mut seq = mockall::Sequence::new();
let mut block_producer = MockBlockProducer::default();
block_producer
.expect_produce_and_execute_block()
.times(1)
.in_sequence(&mut seq)
.returning(move |_, _| {
// Database handed out by the transaction; it must receive exactly one seal.
let mut db = MockDatabase::default();
db.expect_seal_block()
.times(1)
.in_sequence(&mut seq)
.returning(|_, _| Ok(()));
let mut db_transaction = MockDatabaseTransaction::default();
db_transaction.expect_database_mut().times(1).return_var(db);
db_transaction
.expect_commit_box()
.times(1)
.in_sequence(&mut seq)
.returning(|| Ok(()));
// The by-value `commit` must never be used.
db_transaction
.expect_commit()
.times(0)
.in_sequence(&mut seq)
.returning(|| Ok(()));
// Every generated transaction is reported as skipped, each paired with
// `Error::OutputAlreadyExists`.
Ok(UncommittedResult::new(
ExecutionResult {
block: Default::default(),
skipped_transactions: mock_skipped_txs
.clone()
.into_iter()
.map(|tx| (tx, Error::OutputAlreadyExists))
.collect(),
tx_status: Default::default(),
},
Box::new(db_transaction),
))
});
let mut db = MockDatabase::default();
db.expect_block_height()
.returning(|| Ok(BlockHeight::from(1u32)));
let mut txpool = MockTxPool::default();
// The ids passed for removal must match the skipped transactions: same
// count, same order, and all distinct.
txpool.expect_remove_txs().returning(move |skipped_ids| {
let skipped_transactions: Vec<_> =
skipped_transactions.iter().map(|tx| tx.id()).collect();
let expected_skipped_ids_set: HashSet<_> =
skipped_transactions.clone().into_iter().collect();
assert_eq!(expected_skipped_ids_set.len(), TX_NUM);
assert_eq!(skipped_ids.len(), TX_NUM);
assert_eq!(skipped_transactions.len(), TX_NUM);
assert_eq!(skipped_transactions, skipped_ids);
Ok(vec![])
});
let mut task = Task {
stop,
block_gas_limit: 1000000,
signing_key: Some(Secret::new(secret_key.into())),
db,
block_producer,
txpool,
txpool_broadcast,
import_block_events_tx,
last_block_created: Instant::now(),
trigger: Trigger::Instant,
timer: DeadlineClock::new(),
};
assert!(task.produce_block().await.is_ok());
}
// In instant mode no txpool event may trigger block production while the pool
// reports zero pending transactions.
#[tokio::test]
async fn does_not_produce_when_txpool_empty_in_instant_mode() {
    let mut rng = StdRng::seed_from_u64(2322);
    let secret_key = SecretKey::random(&mut rng);

    let (_stop_tx, stop) = mpsc::channel(1);
    let (_txpool_tx, txpool_broadcast) = broadcast::channel(1);
    let (import_block_events_tx, mut import_block_receiver_tx) = broadcast::channel(1);
    tokio::spawn(async move {
        import_block_receiver_tx.recv().await.unwrap();
    });

    // Any attempt to produce a block fails the test.
    let mut block_producer = MockBlockProducer::default();
    block_producer
        .expect_produce_and_execute_block()
        .returning(|_, _| panic!("Block production should not be called"));

    let mut db = MockDatabase::default();
    db.expect_block_height()
        .returning(|| Ok(BlockHeight::from(1u32)));

    // An empty pool: no gas to consume and nothing pending.
    let mut txpool = MockTxPool::default();
    txpool.expect_total_consumable_gas().returning(|| Ok(0));
    txpool.expect_pending_number().returning(|| Ok(0));

    let mut task = Task {
        stop,
        block_gas_limit: 1000000,
        signing_key: Some(Secret::new(secret_key.into())),
        db,
        block_producer,
        txpool,
        txpool_broadcast,
        import_block_events_tx,
        last_block_created: Instant::now(),
        trigger: Trigger::Instant,
        timer: DeadlineClock::new(),
    };

    let events = [
        TxStatus::Submitted,
        TxStatus::Completed,
        TxStatus::SqueezedOut { reason: NoMetadata },
    ];
    for event in &events {
        task.on_txpool_event(event).await.unwrap();
    }
}
// Runs the full event loop in hybrid mode with an empty txpool and checks that
// no block is produced while txpool events arrive and the idle time passes.
// The test starts with tokio's clock paused (`start_paused = true`).
#[tokio::test(start_paused = true)]
async fn hybrid_production_doesnt_produce_empty_blocks_when_txpool_is_empty() {
let mut rng = StdRng::seed_from_u64(2322);
let secret_key = SecretKey::random(&mut rng);
const TX_IDLE_TIME_MS: u64 = 50u64;
let (stop_tx, stop) = mpsc::channel(1);
let (txpool_tx, txpool_broadcast) = broadcast::channel(10);
let (import_block_events_tx, mut import_block_receiver_tx) =
broadcast::channel(1);
// Keep a receiver for block announcements alive; its result is ignored.
tokio::spawn(async move {
let _ = import_block_receiver_tx.recv().await;
});
// Any attempt to produce a block panics inside the spawned task.
let mut block_producer = MockBlockProducer::default();
block_producer
.expect_produce_and_execute_block()
.returning(|_, _| panic!("Block production should not be called"));
let mut db = MockDatabase::default();
db.expect_block_height()
.returning(|| Ok(BlockHeight::from(1u32)));
// An empty pool: no gas to consume and nothing pending.
let mut txpool = MockTxPool::default();
txpool.expect_total_consumable_gas().returning(|| Ok(0));
txpool.expect_pending_number().returning(|| Ok(0));
let task = Task {
stop,
block_gas_limit: 1000000,
signing_key: Some(Secret::new(secret_key.into())),
db,
block_producer,
txpool,
txpool_broadcast,
import_block_events_tx,
last_block_created: Instant::now(),
trigger: Trigger::Hybrid {
min_block_time: Duration::from_millis(100),
max_tx_idle_time: Duration::from_millis(TX_IDLE_TIME_MS),
max_block_time: Duration::from_millis(1000),
},
timer: DeadlineClock::new(),
};
let jh = tokio::spawn(task.run());
// Feed one event of every kind to the running task.
txpool_tx.send(TxStatus::Submitted).unwrap();
txpool_tx.send(TxStatus::Completed).unwrap();
txpool_tx
.send(TxStatus::SqueezedOut { reason: NoMetadata })
.unwrap();
// Wait for the idle time (well below the 1000 ms `max_block_time`), then stop.
time::sleep(Duration::from_millis(TX_IDLE_TIME_MS)).await;
stop_tx.send(()).await.unwrap();
// Joining fails if the task panicked, i.e. if block production was attempted.
jh.await.unwrap();
}
}