use crate::{
database::Database,
service::adapters::BlockImporterAdapter,
};
use fuel_core_producer::ports::BlockProducerDatabase;
use fuel_core_services::{
stream::BoxStream,
RunnableService,
RunnableTask,
ServiceRunner,
SharedMutex,
StateWatcher,
};
use fuel_core_storage::{
not_found,
tables::ConsensusParametersVersions,
Result as StorageResult,
StorageAsRef,
};
use fuel_core_txpool::ports::BlockImporter;
use fuel_core_types::{
blockchain::header::ConsensusParametersVersion,
fuel_tx::ConsensusParameters,
services::block_importer::SharedImportResult,
};
use futures::StreamExt;
use std::{
collections::HashMap,
fmt::Debug,
sync::Arc,
};
#[derive(Clone, Debug)]
pub struct SharedState {
latest_consensus_parameters_version: SharedMutex<ConsensusParametersVersion>,
consensus_parameters:
SharedMutex<HashMap<ConsensusParametersVersion, Arc<ConsensusParameters>>>,
database: Database,
}
pub struct Task {
blocks_events: BoxStream<SharedImportResult>,
shared_state: SharedState,
}
impl Debug for Task {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Task")
.field("shared_state", &self.shared_state)
.finish()
}
}
impl SharedState {
fn new(database: Database) -> Self {
let genesis_version = 0;
Self {
latest_consensus_parameters_version: SharedMutex::new(genesis_version),
consensus_parameters: Default::default(),
database,
}
}
fn cache_consensus_parameters(
&self,
version: ConsensusParametersVersion,
) -> StorageResult<Arc<ConsensusParameters>> {
let consensus_parameters = self
.database
.storage::<ConsensusParametersVersions>()
.get(&version)?
.ok_or(not_found!(ConsensusParametersVersions))?
.into_owned();
let consensus_parameters = Arc::new(consensus_parameters);
self.consensus_parameters
.lock()
.insert(version, consensus_parameters.clone());
Ok(consensus_parameters)
}
pub fn get_consensus_parameters(
&self,
version: &ConsensusParametersVersion,
) -> StorageResult<Arc<ConsensusParameters>> {
{
let consensus_parameters = self.consensus_parameters.lock();
if let Some(parameters) = consensus_parameters.get(version) {
return Ok(parameters.clone());
}
}
self.cache_consensus_parameters(*version)
}
pub fn latest_consensus_parameters(&self) -> Arc<ConsensusParameters> {
let version = *self.latest_consensus_parameters_version.lock();
self.get_consensus_parameters(&version)
.expect("The latest consensus parameters always are available unless this function was called before regenesis.")
}
}
#[async_trait::async_trait]
impl RunnableTask for Task {
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let should_continue;
tokio::select! {
biased;
_ = watcher.while_started() => {
should_continue = false;
}
Some(event) = self.blocks_events.next() => {
let new_version = event
.sealed_block
.entity
.header()
.application()
.consensus_parameters_version;
if new_version > *self.shared_state.latest_consensus_parameters_version.lock() {
match self.shared_state.cache_consensus_parameters(new_version) {
Ok(_) => {
*self.shared_state.latest_consensus_parameters_version.lock() = new_version;
}
Err(err) => {
tracing::error!("Failed to cache consensus parameters: {:?}", err);
should_continue = false;
return Ok(should_continue)
}
}
}
should_continue = true;
}
}
Ok(should_continue)
}
async fn shutdown(self) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait::async_trait]
impl RunnableService for Task {
const NAME: &'static str = "ConsensusParametersProviderTask";
type SharedData = SharedState;
type Task = Self;
type TaskParams = ();
fn shared_data(&self) -> Self::SharedData {
self.shared_state.clone()
}
async fn into_task(
self,
_: &StateWatcher,
_: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
let latest_consensus_parameters_version = self
.shared_state
.database
.latest_consensus_parameters_version()?;
self.shared_state
.cache_consensus_parameters(latest_consensus_parameters_version)?;
*self.shared_state.latest_consensus_parameters_version.lock() =
latest_consensus_parameters_version;
Ok(self)
}
}
pub fn new_service(
database: Database,
importer: &BlockImporterAdapter,
) -> ServiceRunner<Task> {
let blocks_events = importer.block_events();
ServiceRunner::new(Task {
blocks_events,
shared_state: SharedState::new(database),
})
}
#[cfg(test)]
mod tests {
use crate::{
database::Database,
service::adapters::consensus_parameters_provider::{
SharedState,
Task,
},
};
use fuel_core_services::{
stream::IntoBoxStream,
RunnableService,
RunnableTask,
StateWatcher,
};
use fuel_core_storage::{
tables::ConsensusParametersVersions,
transactional::IntoTransaction,
StorageAsMut,
};
use fuel_core_types::{
blockchain::{
block::Block,
header::ConsensusParametersVersion,
SealedBlock,
},
fuel_tx::ConsensusParameters,
services::block_importer::{
ImportResult,
SharedImportResult,
},
};
use futures::stream;
fn add_consensus_parameters(
database: Database,
consensus_parameters_version: ConsensusParametersVersion,
consensus_parameters: &ConsensusParameters,
) -> Database {
let mut database = database.into_transaction();
database
.storage_as_mut::<ConsensusParametersVersions>()
.insert(&consensus_parameters_version, consensus_parameters)
.unwrap();
database.commit().unwrap()
}
#[tokio::test]
async fn latest_consensus_parameters_works() {
let version = 0;
let consensus_parameters = Default::default();
let database =
add_consensus_parameters(Database::default(), version, &consensus_parameters);
let state = SharedState::new(database);
let fetched_parameters = state.latest_consensus_parameters();
assert_eq!(fetched_parameters.as_ref(), &consensus_parameters);
}
#[tokio::test]
async fn into_task_works_with_non_empty_database() {
let version = 0;
let consensus_parameters = Default::default();
let non_empty_database =
add_consensus_parameters(Database::default(), version, &consensus_parameters);
let task = Task {
blocks_events: stream::empty().into_boxed(),
shared_state: SharedState::new(non_empty_database),
};
let result = task.into_task(&Default::default(), ()).await;
result.expect("Initialization should succeed because database contains consensus parameters.");
}
#[tokio::test]
async fn into_task_fails_when_no_parameters() {
let empty_database = Database::default();
let task = Task {
blocks_events: stream::empty().into_boxed(),
shared_state: SharedState::new(empty_database),
};
let result = task.into_task(&Default::default(), ()).await;
result.expect_err(
"Initialization should fails because of lack of consensus parameters",
);
}
fn result_with_new_version(
version: ConsensusParametersVersion,
) -> SharedImportResult {
let mut block = Block::default();
block
.header_mut()
.application_mut()
.consensus_parameters_version = version;
let sealed_block = SealedBlock {
entity: block,
consensus: Default::default(),
};
std::sync::Arc::new(ImportResult::new_from_local(
sealed_block,
Default::default(),
Default::default(),
))
}
#[tokio::test]
async fn run_updates_the_latest_consensus_parameters_from_imported_block() {
use futures::StreamExt;
let old_version = 0;
let new_version = 1234;
let old_consensus_parameters = ConsensusParameters::default();
let mut new_consensus_parameters = ConsensusParameters::default();
new_consensus_parameters.set_privileged_address([123; 32].into());
assert_ne!(old_consensus_parameters, new_consensus_parameters);
let (block_sender, block_receiver) =
tokio::sync::broadcast::channel::<SharedImportResult>(1);
let database_with_old_parameters = add_consensus_parameters(
Database::default(),
old_version,
&old_consensus_parameters,
);
let mut task = Task {
blocks_events: IntoBoxStream::into_boxed(
tokio_stream::wrappers::BroadcastStream::new(block_receiver)
.filter_map(|r| futures::future::ready(r.ok())),
),
shared_state: SharedState::new(database_with_old_parameters.clone()),
}
.into_task(&Default::default(), ())
.await
.unwrap();
assert_eq!(
task.shared_state.latest_consensus_parameters().as_ref(),
&old_consensus_parameters
);
add_consensus_parameters(
database_with_old_parameters,
new_version,
&new_consensus_parameters,
);
let result_with_new_version = result_with_new_version(new_version);
let _ = block_sender.send(result_with_new_version);
task.run(&mut StateWatcher::started()).await.unwrap();
assert_eq!(
task.shared_state.latest_consensus_parameters().as_ref(),
&new_consensus_parameters
);
}
}