use crate::{
database::Database,
fuel_core_graphql_api::ports::ConsensusModulePort,
service::adapters::{
BlockImporterAdapter,
BlockProducerAdapter,
PoAAdapter,
TxPoolAdapter,
},
};
use anyhow::anyhow;
use fuel_core_poa::{
ports::{
BlockImporter,
TransactionPool,
},
service::SharedState,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::transactional::StorageTransaction;
use fuel_core_types::{
blockchain::primitives::BlockHeight,
fuel_asm::Word,
fuel_tx::{
Receipt,
Transaction,
TxId,
},
services::{
block_importer::UncommittedResult as UncommittedImporterResult,
executor::UncommittedResult,
txpool::{
ArcPoolTx,
TxStatus,
},
},
tai64::Tai64,
};
impl PoAAdapter {
pub fn new(shared_state: Option<SharedState>) -> Self {
Self { shared_state }
}
}
#[async_trait::async_trait]
impl ConsensusModulePort for PoAAdapter {
async fn manual_produce_block(
&self,
block_times: Vec<Option<Tai64>>,
) -> anyhow::Result<()> {
self.shared_state
.as_ref()
.ok_or(anyhow!("The block production is disabled"))?
.manually_produce_block(block_times)
.await
}
}
impl TransactionPool for TxPoolAdapter {
fn pending_number(&self) -> usize {
self.service.pending_number()
}
fn total_consumable_gas(&self) -> u64 {
self.service.total_consumable_gas()
}
fn remove_txs(&self, ids: Vec<TxId>) -> Vec<ArcPoolTx> {
self.service.remove_txs(ids)
}
fn transaction_status_events(&self) -> BoxStream<TxStatus> {
use tokio_stream::{
wrappers::BroadcastStream,
StreamExt,
};
Box::pin(
BroadcastStream::new(self.service.tx_status_subscribe())
.filter_map(|result| result.ok()),
)
}
}
#[async_trait::async_trait]
impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter {
type Database = Database;
async fn produce_and_execute_block(
&self,
height: BlockHeight,
block_time: Option<Tai64>,
max_gas: Word,
) -> anyhow::Result<UncommittedResult<StorageTransaction<Database>>> {
self.block_producer
.produce_and_execute_block(height, block_time, max_gas)
.await
}
async fn dry_run(
&self,
transaction: Transaction,
height: Option<BlockHeight>,
utxo_validation: Option<bool>,
) -> anyhow::Result<Vec<Receipt>> {
self.block_producer
.dry_run(transaction, height, utxo_validation)
.await
}
}
impl BlockImporter for BlockImporterAdapter {
type Database = Database;
fn commit_result(
&self,
result: UncommittedImporterResult<StorageTransaction<Self::Database>>,
) -> anyhow::Result<()> {
self.block_importer
.commit_result(result)
.map_err(Into::into)
}
}