use crate::{
containers::{
dependency::Dependency,
price_sort::TipSort,
time_sort::TimeSort,
},
ports::TxPoolDb,
service::TxStatusChange,
types::*,
Config,
Error,
TxInfo,
};
use fuel_core_types::{
fuel_tx::Transaction,
fuel_types::BlockHeight,
fuel_vm::checked_transaction::{
CheckPredicates,
Checked,
CheckedTransaction,
Checks,
IntoChecked,
},
services::txpool::{
ArcPoolTx,
InsertionResult,
},
tai64::Tai64,
};
use crate::ports::{
GasPriceProvider,
MemoryPool,
};
use fuel_core_metrics::txpool_metrics::txpool_metrics;
use fuel_core_storage::transactional::AtomicView;
use fuel_core_types::{
fuel_tx::{
input::{
coin::{
CoinPredicate,
CoinSigned,
},
message::{
MessageCoinPredicate,
MessageCoinSigned,
MessageDataPredicate,
MessageDataSigned,
},
},
ConsensusParameters,
Input,
},
fuel_vm::{
checked_transaction::CheckPredicateParams,
interpreter::Memory,
},
services::executor::TransactionExecutionStatus,
};
use std::{
cmp::Reverse,
collections::HashMap,
ops::Deref,
sync::Arc,
};
#[cfg(test)]
mod test_helpers;
#[cfg(test)]
mod tests;
#[derive(Debug, Clone)]
pub struct TxPool<ViewProvider> {
by_hash: HashMap<TxId, TxInfo>,
by_tip: TipSort,
by_time: TimeSort,
by_dependency: Dependency,
config: Config,
database: ViewProvider,
}
impl<ViewProvider> TxPool<ViewProvider> {
pub fn new(config: Config, database: ViewProvider) -> Self {
let max_depth = config.max_depth;
Self {
by_hash: HashMap::new(),
by_tip: TipSort::default(),
by_time: TimeSort::default(),
by_dependency: Dependency::new(max_depth, config.utxo_validation),
config,
database,
}
}
#[cfg(test)]
pub fn config(&self) -> &Config {
&self.config
}
#[cfg(test)]
pub fn config_mut(&mut self) -> &mut Config {
&mut self.config
}
pub fn txs(&self) -> &HashMap<TxId, TxInfo> {
&self.by_hash
}
pub fn dependency(&self) -> &Dependency {
&self.by_dependency
}
pub fn sorted_includable(&self) -> impl Iterator<Item = ArcPoolTx> + '_ {
self.by_tip.sort.iter().rev().map(|(_, tx)| tx.clone())
}
pub fn remove_inner(&mut self, tx: &ArcPoolTx) -> Vec<ArcPoolTx> {
self.remove_by_tx_id(&tx.id())
}
pub fn remove_by_tx_id(&mut self, tx_id: &TxId) -> Vec<ArcPoolTx> {
if let Some(tx) = self.remove_tx(tx_id) {
let removed = self
.by_dependency
.recursively_remove_all_dependencies(&self.by_hash, tx.tx().clone());
for remove in removed.iter() {
self.remove_tx(&remove.id());
}
return removed
}
Vec::new()
}
fn remove_tx(&mut self, tx_id: &TxId) -> Option<TxInfo> {
let info = self.by_hash.remove(tx_id);
if let Some(info) = &info {
self.by_time.remove(info);
self.by_tip.remove(info);
}
info
}
pub fn remove_committed_tx(&mut self, tx_id: &TxId) -> Vec<ArcPoolTx> {
self.remove_by_tx_id(tx_id)
}
pub fn find(&self, hashes: &[TxId]) -> Vec<Option<TxInfo>> {
let mut res = Vec::with_capacity(hashes.len());
for hash in hashes {
res.push(self.txs().get(hash).cloned());
}
res
}
pub fn find_one(&self, hash: &TxId) -> Option<TxInfo> {
self.txs().get(hash).cloned()
}
pub fn find_dependent(&self, hashes: &[TxId]) -> Vec<ArcPoolTx> {
let mut seen = HashMap::new();
{
for hash in hashes {
if let Some(tx) = self.txs().get(hash) {
self.dependency().find_dependent(
tx.tx().clone(),
&mut seen,
self.txs(),
);
}
}
}
let mut list: Vec<_> = seen.into_values().collect();
list.sort_by_key(|tx| Reverse(tx.tip()));
list
}
pub fn pending_number(&self) -> usize {
self.by_hash.len()
}
pub fn consumable_gas(&self) -> u64 {
self.by_hash.values().map(|tx| tx.max_gas()).sum()
}
pub fn includable(&mut self) -> impl Iterator<Item = ArcPoolTx> + '_ {
self.sorted_includable()
}
pub fn block_update(
&mut self,
tx_status: &[TransactionExecutionStatus],
) {
for status in tx_status {
let tx_id = status.id;
self.remove_committed_tx(&tx_id);
}
}
pub fn remove(
&mut self,
tx_status_sender: &TxStatusChange,
tx_ids: Vec<(TxId, String)>,
) -> Vec<ArcPoolTx> {
let mut removed = Vec::new();
for (tx_id, reason) in tx_ids.into_iter() {
let rem = self.remove_by_tx_id(&tx_id);
tx_status_sender.send_squeezed_out(tx_id, Error::SqueezedOut(reason.clone()));
for dependent_tx in rem.iter() {
if tx_id != dependent_tx.id() {
tx_status_sender.send_squeezed_out(
dependent_tx.id(),
Error::SqueezedOut(
format!("Parent transaction with {tx_id}, was removed because of the {reason}")
)
);
}
}
removed.extend(rem.into_iter());
}
removed
}
pub fn prune_old_txs(&mut self) -> Vec<ArcPoolTx> {
let Some(deadline) =
tokio::time::Instant::now().checked_sub(self.config.transaction_ttl)
else {
return vec![]
};
let mut result = vec![];
while let Some((oldest_time, oldest_tx)) = self.by_time.lowest() {
let oldest_tx = oldest_tx.clone();
if oldest_time.created() <= &deadline {
let removed = self.remove_inner(&oldest_tx);
result.extend(removed.into_iter());
} else {
break
}
}
result
}
fn check_blacklisting(&self, tx: &PoolTransaction) -> Result<(), Error> {
for input in tx.inputs() {
match input {
Input::CoinSigned(CoinSigned { utxo_id, owner, .. })
| Input::CoinPredicate(CoinPredicate { utxo_id, owner, .. }) => {
if self.config.blacklist.contains_coin(utxo_id) {
return Err(Error::BlacklistedUTXO(*utxo_id))
}
if self.config.blacklist.contains_address(owner) {
return Err(Error::BlacklistedOwner(*owner))
}
}
Input::Contract(contract) => {
if self
.config
.blacklist
.contains_contract(&contract.contract_id)
{
return Err(Error::BlacklistedContract(contract.contract_id))
}
}
Input::MessageCoinSigned(MessageCoinSigned {
nonce,
sender,
recipient,
..
})
| Input::MessageCoinPredicate(MessageCoinPredicate {
nonce,
sender,
recipient,
..
})
| Input::MessageDataSigned(MessageDataSigned {
nonce,
sender,
recipient,
..
})
| Input::MessageDataPredicate(MessageDataPredicate {
nonce,
sender,
recipient,
..
}) => {
if self.config.blacklist.contains_message(nonce) {
return Err(Error::BlacklistedMessage(*nonce))
}
if self.config.blacklist.contains_address(sender) {
return Err(Error::BlacklistedOwner(*sender))
}
if self.config.blacklist.contains_address(recipient) {
return Err(Error::BlacklistedOwner(*recipient))
}
}
}
}
Ok(())
}
}
impl<ViewProvider, View> TxPool<ViewProvider>
where
ViewProvider: AtomicView<View = View>,
View: TxPoolDb,
{
#[cfg(test)]
fn insert_single(
&mut self,
tx: Checked<Transaction>,
) -> Result<InsertionResult, Error> {
let view = self.database.latest_view();
self.insert_inner(tx, &view)
}
#[tracing::instrument(level = "debug", skip_all, fields(tx_id = %tx.id()), ret, err)]
fn insert_inner(
&mut self,
tx: Checked<Transaction>,
view: &View,
) -> Result<InsertionResult, Error> {
let tx: CheckedTransaction = tx.into();
let tx = Arc::new(match tx {
CheckedTransaction::Script(tx) => PoolTransaction::Script(tx),
CheckedTransaction::Create(tx) => PoolTransaction::Create(tx),
CheckedTransaction::Mint(_) => return Err(Error::MintIsDisallowed),
CheckedTransaction::Upgrade(tx) => PoolTransaction::Upgrade(tx),
CheckedTransaction::Upload(tx) => PoolTransaction::Upload(tx),
});
self.check_blacklisting(tx.as_ref())?;
if !tx.is_computed() {
return Err(Error::NoMetadata)
}
if self.by_hash.contains_key(&tx.id()) {
return Err(Error::NotInsertedTxKnown)
}
let mut max_limit_hit = false;
if self.by_hash.len() >= self.config.max_tx {
max_limit_hit = true;
let lowest_tip = self.by_tip.lowest_value().unwrap_or_default();
if lowest_tip >= tx.tip() {
return Err(Error::NotInsertedLimitHit)
}
}
if self.config.metrics {
txpool_metrics()
.tx_size_histogram
.observe(tx.metered_bytes_size() as f64);
}
let rem = self.by_dependency.insert(&self.by_hash, view, &tx)?;
let info = TxInfo::new(tx.clone());
let submitted_time = info.submitted_time();
self.by_tip.insert(&info);
self.by_time.insert(&info);
self.by_hash.insert(tx.id(), info);
let removed = if rem.is_empty() {
if max_limit_hit {
let rem_tx = self.by_tip.lowest_tx().unwrap(); self.remove_inner(&rem_tx);
vec![rem_tx]
} else {
Vec::new()
}
} else {
for rem in rem.iter() {
self.remove_tx(&rem.id());
}
rem
};
Ok(InsertionResult {
inserted: tx,
submitted_time,
removed,
})
}
#[tracing::instrument(level = "info", skip_all)]
pub fn insert(
&mut self,
tx_status_sender: &TxStatusChange,
txs: Vec<Checked<Transaction>>,
) -> Vec<Result<InsertionResult, Error>> {
let mut res = Vec::new();
let view = self.database.latest_view();
for tx in txs.into_iter() {
res.push(self.insert_inner(tx, &view));
}
for ret in res.iter() {
match ret {
Ok(InsertionResult {
removed,
inserted,
submitted_time,
}) => {
for removed in removed {
tx_status_sender.send_squeezed_out(removed.id(), Error::Removed);
}
tx_status_sender.send_submitted(
inserted.id(),
Tai64::from_unix(submitted_time.as_secs() as i64),
);
}
Err(_) => {
}
}
}
res
}
}
pub async fn check_transactions<Provider, MP>(
txs: &[Arc<Transaction>],
current_height: BlockHeight,
utxp_validation: bool,
consensus_params: &ConsensusParameters,
gas_price_provider: &Provider,
memory_pool: Arc<MP>,
) -> Vec<Result<Checked<Transaction>, Error>>
where
Provider: GasPriceProvider,
MP: MemoryPool,
{
let mut checked_txs = Vec::with_capacity(txs.len());
for tx in txs.iter() {
checked_txs.push(
check_single_tx(
tx.deref().clone(),
current_height,
utxp_validation,
consensus_params,
gas_price_provider,
memory_pool.get_memory().await,
)
.await,
);
}
checked_txs
}
pub async fn check_single_tx<GasPrice, M>(
tx: Transaction,
current_height: BlockHeight,
utxo_validation: bool,
consensus_params: &ConsensusParameters,
gas_price_provider: &GasPrice,
memory: M,
) -> Result<Checked<Transaction>, Error>
where
GasPrice: GasPriceProvider,
M: Memory + Send + Sync + 'static,
{
if tx.is_mint() {
return Err(Error::NotSupportedTransactionType)
}
let tx: Checked<Transaction> = if utxo_validation {
let tx = tx
.into_checked_basic(current_height, consensus_params)?
.check_signatures(&consensus_params.chain_id())?;
let parameters = CheckPredicateParams::from(consensus_params);
let tx =
tokio_rayon::spawn_fifo(move || tx.check_predicates(¶meters, memory))
.await?;
debug_assert!(tx.checks().contains(Checks::all()));
tx
} else {
tx.into_checked_basic(current_height, consensus_params)?
};
let gas_price = gas_price_provider
.gas_price(current_height)
.ok_or(Error::GasPriceNotFound(current_height))?;
let tx = verify_tx_min_gas_price(tx, consensus_params, gas_price)?;
Ok(tx)
}
fn verify_tx_min_gas_price(
tx: Checked<Transaction>,
consensus_params: &ConsensusParameters,
gas_price: GasPrice,
) -> Result<Checked<Transaction>, Error> {
let tx: CheckedTransaction = tx.into();
let gas_costs = consensus_params.gas_costs();
let fee_parameters = consensus_params.fee_params();
let read = match tx {
CheckedTransaction::Script(script) => {
let ready = script.into_ready(gas_price, gas_costs, fee_parameters)?;
let (_, checked) = ready.decompose();
CheckedTransaction::Script(checked)
}
CheckedTransaction::Create(create) => {
let ready = create.into_ready(gas_price, gas_costs, fee_parameters)?;
let (_, checked) = ready.decompose();
CheckedTransaction::Create(checked)
}
CheckedTransaction::Upgrade(tx) => {
let ready = tx.into_ready(gas_price, gas_costs, fee_parameters)?;
let (_, checked) = ready.decompose();
CheckedTransaction::Upgrade(checked)
}
CheckedTransaction::Upload(tx) => {
let ready = tx.into_ready(gas_price, gas_costs, fee_parameters)?;
let (_, checked) = ready.decompose();
CheckedTransaction::Upload(checked)
}
CheckedTransaction::Mint(_) => return Err(Error::MintIsDisallowed),
};
Ok(read.into())
}