use crate::{
containers::{
dependency::Dependency,
price_sort::PriceSort,
time_sort::TimeSort,
},
ports::TxPoolDb,
service::TxStatusChange,
types::*,
Config,
Error,
TxInfo,
};
use fuel_core_metrics::txpool_metrics::TXPOOL_METRICS;
use fuel_core_types::{
fuel_tx::{
Chargeable,
Transaction,
UniqueIdentifier,
},
fuel_vm::checked_transaction::{
CheckedTransaction,
IntoChecked,
},
services::txpool::{
ArcPoolTx,
InsertionResult,
},
};
use std::{
cmp::Reverse,
collections::HashMap,
ops::Deref,
sync::Arc,
};
/// In-memory transaction pool.
///
/// Every pooled transaction is stored once in `by_hash` and mirrored into the
/// secondary indexes (`by_gas_price`, `by_time`, `by_dependency`); the methods
/// of this type insert into and remove from those indexes together.
#[derive(Debug, Clone)]
pub struct TxPool<DB> {
// Primary storage: pooled transactions keyed by transaction id.
by_hash: HashMap<TxId, TxInfo>,
// Price index; queried for the lowest-priced entry and iterated for block inclusion.
by_gas_price: PriceSort,
// Time index; queried for the oldest entry when pruning by TTL.
by_time: TimeSort,
// Dependency tracker, built from `config.max_depth` and `config.utxo_validation`.
by_dependency: Dependency,
// Pool configuration (limits, minimum gas price, TTL, metrics switch, chain config).
config: Config,
// Database handle; read for the current block height and passed to the dependency tracker.
database: DB,
}
impl<DB> TxPool<DB>
where
DB: TxPoolDb,
{
/// Creates an empty pool backed by `database`.
///
/// The dependency tracker is configured from `config.max_depth` and
/// `config.utxo_validation`; all other indexes start out empty.
pub fn new(config: Config, database: DB) -> Self {
    // Built first, while `config` can still be read field by field.
    let by_dependency = Dependency::new(config.max_depth, config.utxo_validation);
    let by_gas_price = PriceSort::default();
    let by_time = TimeSort::default();

    Self {
        by_hash: HashMap::default(),
        by_gas_price,
        by_time,
        by_dependency,
        config,
        database,
    }
}
pub fn txs(&self) -> &HashMap<TxId, TxInfo> {
&self.by_hash
}
pub fn dependency(&self) -> &Dependency {
&self.by_dependency
}
/// Validates `tx` and inserts it into every index of the pool.
///
/// Steps, in order:
/// 1. reject mint transactions and transactions below the minimum gas price;
/// 2. convert into a checked transaction (full check when `utxo_validation`
///    is enabled, basic check otherwise);
/// 3. reject transactions without computed metadata, above the block gas
///    limit, or already present in the pool;
/// 4. when the pool is full, reject unless the new price is strictly higher
///    than the lowest price currently pooled;
/// 5. register the transaction with the dependency tracker and the indexes;
/// 6. remove whatever had to make room for it.
///
/// # Returns
/// The inserted transaction together with every transaction that left the
/// pool because of this insertion.
///
/// # Errors
/// Any database error, any check error, or the `Error` variant matching the
/// failed validation step above.
#[tracing::instrument(level = "info", skip_all, fields(tx_id = %tx.id()), ret, err)]
fn insert_inner(
    &mut self,
    tx: Arc<Transaction>,
) -> anyhow::Result<InsertionResult> {
    let current_height = self.database.current_block_height()?;

    if tx.is_mint() {
        return Err(Error::NotSupportedTransactionType.into())
    }

    // The gas price must be at least the configured minimum.
    self.verify_tx_min_gas_price(&tx)?;

    let tx: CheckedTransaction = if self.config.utxo_validation {
        tx.deref()
            .clone()
            .into_checked(
                current_height.into(),
                &self.config.chain_config.transaction_parameters,
                &self.config.chain_config.gas_costs,
            )?
            .into()
    } else {
        tx.deref()
            .clone()
            .into_checked_basic(
                current_height.into(),
                &self.config.chain_config.transaction_parameters,
            )?
            .into()
    };

    let tx = Arc::new(match tx {
        CheckedTransaction::Script(script) => PoolTransaction::Script(script),
        CheckedTransaction::Create(create) => PoolTransaction::Create(create),
        // Mint was rejected above; report an error rather than panic if that
        // guard is ever bypassed.
        CheckedTransaction::Mint(_) => {
            return Err(Error::NotSupportedTransactionType.into())
        }
    });

    if !tx.is_computed() {
        return Err(Error::NoMetadata.into())
    }

    // The transaction must fit into a single block.
    if tx.max_gas() > self.config.chain_config.block_gas_limit {
        return Err(Error::NotInsertedMaxGasLimit {
            tx_gas: tx.max_gas(),
            block_limit: self.config.chain_config.block_gas_limit,
        }
        .into())
    }

    if self.by_hash.contains_key(&tx.id()) {
        return Err(Error::NotInsertedTxKnown.into())
    }

    // When the pool is full the newcomer is only accepted if it can push out
    // the lowest-priced transaction, i.e. its price is strictly higher.
    let max_limit_hit = self.by_hash.len() >= self.config.max_tx;
    if max_limit_hit {
        let lowest_price = self.by_gas_price.lowest_value().unwrap_or_default();
        if lowest_price >= tx.price() {
            return Err(Error::NotInsertedLimitHit.into())
        }
    }

    if self.config.metrics {
        // NOTE(review): `verify_tx_min_gas_price` already observes the gas
        // price when metrics are enabled, so accepted transactions appear to
        // be counted twice in this histogram - confirm whether that is intended.
        TXPOOL_METRICS
            .gas_price_histogram
            .observe(tx.price() as f64);
        TXPOOL_METRICS
            .tx_size_histogram
            .observe(tx.metered_bytes_size() as f64);
    }

    // Register with the dependency tracker; `rem` holds the transactions it
    // reports as removed by this insertion.
    let rem = self
        .by_dependency
        .insert(&self.by_hash, &self.database, &tx)?;

    let info = TxInfo::new(tx.clone());
    self.by_gas_price.insert(&info);
    self.by_time.insert(&info);
    self.by_hash.insert(tx.id(), info);

    let removed = if !rem.is_empty() {
        // The dependency tracker already removed something, so the size limit
        // needs no further handling; drop those entries from the other indexes.
        for collided in rem.iter() {
            self.remove_tx(&collided.id());
        }
        rem
    } else if max_limit_hit {
        let lowest = self
            .by_gas_price
            .lowest_tx()
            .expect("price index cannot be empty: a transaction was inserted just above");
        // `remove_inner` also removes transactions depending on `lowest`.
        // Report all of them, otherwise they leave the pool without any
        // status notification being sent by `insert`.
        // NOTE(review): if the new transaction depends on `lowest` it may be
        // part of this list as well - confirm how callers should treat that.
        let mut evicted = self.remove_inner(&lowest);
        if !evicted.iter().any(|gone| gone.id() == lowest.id()) {
            evicted.insert(0, lowest);
        }
        evicted
    } else {
        Vec::new()
    };

    Ok(InsertionResult {
        inserted: tx,
        removed,
    })
}
/// Iterates over the pooled transactions by walking the price index in
/// reverse order, yielding a clone of each transaction handle.
pub fn sorted_includable(&self) -> impl Iterator<Item = ArcPoolTx> + '_ {
    let reversed_price_index = self.by_gas_price.sort.iter().rev();
    reversed_price_index.map(|(_key, pool_tx)| pool_tx.clone())
}
/// Removes `tx` from the pool by looking it up through its id.
///
/// Returns whatever [`Self::remove_by_tx_id`] reports as removed.
pub fn remove_inner(&mut self, tx: &ArcPoolTx) -> Vec<ArcPoolTx> {
    let tx_id = tx.id();
    self.remove_by_tx_id(&tx_id)
}
/// Removes the transaction with id `tx_id` and everything the dependency
/// tracker reports as removed along with it.
///
/// Returns the list produced by the dependency tracker, or an empty list
/// when `tx_id` is not in the pool.
pub fn remove_by_tx_id(&mut self, tx_id: &TxId) -> Vec<ArcPoolTx> {
    match self.remove_tx(tx_id) {
        None => Vec::new(),
        Some(info) => {
            let removed = self
                .by_dependency
                .recursively_remove_all_dependencies(&self.by_hash, info.tx().clone());
            // Keep the hash, time and price indexes in sync with the tracker.
            for gone in &removed {
                self.remove_tx(&gone.id());
            }
            removed
        }
    }
}
/// Removes a single transaction from the hash, time and price indexes.
///
/// The dependency tracker is not touched here. Returns the removed entry,
/// or `None` when `tx_id` is not in the pool.
fn remove_tx(&mut self, tx_id: &TxId) -> Option<TxInfo> {
    let removed = self.by_hash.remove(tx_id)?;
    self.by_time.remove(&removed);
    self.by_gas_price.remove(&removed);
    Some(removed)
}
pub fn remove_committed_tx(&mut self, tx_id: &TxId) -> Vec<ArcPoolTx> {
self.remove_by_tx_id(tx_id)
}
fn verify_tx_min_gas_price(&mut self, tx: &Transaction) -> Result<(), Error> {
let price = match tx {
Transaction::Script(script) => script.price(),
Transaction::Create(create) => create.price(),
Transaction::Mint(_) => unreachable!(),
};
if self.config.metrics {
TXPOOL_METRICS.gas_price_histogram.observe(price as f64);
}
if price < self.config.min_gas_price {
return Err(Error::NotInsertedGasPriceTooLow)
}
Ok(())
}
#[tracing::instrument(level = "info", skip_all)]
pub fn insert(
&mut self,
tx_status_sender: &TxStatusChange,
txs: &[Arc<Transaction>],
) -> Vec<anyhow::Result<InsertionResult>> {
let mut res = Vec::new();
for tx in txs.iter() {
res.push(self.insert_inner(tx.clone()))
}
for ret in res.iter() {
match ret {
Ok(InsertionResult { removed, inserted }) => {
for removed in removed {
tx_status_sender.send_squeezed_out(removed.id(), Error::Removed);
}
tx_status_sender.send_submitted(inserted.id());
}
Err(_) => {
}
}
}
res
}
/// Looks up every id in `hashes`.
///
/// The result has one entry per input id, in the same order; ids that are
/// not in the pool yield `None`.
pub fn find(&self, hashes: &[TxId]) -> Vec<Option<TxInfo>> {
    hashes
        .iter()
        .map(|hash| self.by_hash.get(hash).cloned())
        .collect()
}
/// Returns a clone of the pool entry for `hash`, or `None` if it is not pooled.
pub fn find_one(&self, hash: &TxId) -> Option<TxInfo> {
    let entry = self.by_hash.get(hash);
    entry.cloned()
}
/// Collects what the dependency tracker's `find_dependent` gathers for the
/// pooled transactions named in `hashes`, sorted by descending price.
///
/// Ids that are not in the pool are skipped.
pub fn find_dependent(&self, hashes: &[TxId]) -> Vec<ArcPoolTx> {
    let pool = self.txs();
    let tracker = self.dependency();

    // Filled in by the dependency tracker; shared across all lookups.
    let mut seen = HashMap::new();
    for info in hashes.iter().filter_map(|hash| pool.get(hash)) {
        tracker.find_dependent(info.tx().clone(), &mut seen, pool);
    }

    let mut by_price_desc: Vec<_> = seen.into_values().collect();
    by_price_desc.sort_by_key(|pool_tx| Reverse(pool_tx.price()));
    by_price_desc
}
/// Number of transactions currently held in the pool.
pub fn pending_number(&self) -> usize {
    self.txs().len()
}
/// Sum of the gas limits of all pooled transactions.
///
/// The sum saturates at `u64::MAX` instead of overflowing, so a pool holding
/// very large limits can neither panic (debug builds) nor wrap around to a
/// misleadingly small value (release builds).
pub fn consumable_gas(&self) -> u64 {
    self.by_hash
        .values()
        .fold(0u64, |total, tx| total.saturating_add(tx.limit()))
}
pub fn includable(&mut self) -> impl Iterator<Item = ArcPoolTx> + '_ {
self.sorted_includable()
}
/// Processes the transactions committed in a block.
///
/// For each id, in order, a completion status is sent and the transaction is
/// then removed from the pool via [`Self::remove_committed_tx`]. The list
/// returned by that removal is discarded, so nothing else is announced here.
pub fn block_update(
    &mut self,
    tx_status_sender: &TxStatusChange,
    transactions: &[TxId],
) {
    for &committed in transactions {
        tx_status_sender.send_complete(committed);
        self.remove_committed_tx(&committed);
    }
}
/// Removes the given transactions from the pool.
///
/// For each id, in order, the transaction is removed via
/// [`Self::remove_by_tx_id`] and a squeezed-out status with `Error::Removed`
/// is sent for that id, whether or not anything was actually removed.
///
/// Returns the concatenation of the lists returned by the individual removals.
pub fn remove(
    &mut self,
    tx_status_sender: &TxStatusChange,
    tx_ids: &[TxId],
) -> Vec<ArcPoolTx> {
    tx_ids
        .iter()
        .flat_map(|tx_id| {
            let dropped = self.remove_by_tx_id(tx_id);
            tx_status_sender.send_squeezed_out(*tx_id, Error::Removed);
            dropped
        })
        .collect()
}
/// Removes every transaction whose creation time is at or before
/// `now - config.transaction_ttl`.
///
/// Entries are taken from the time index oldest first, and the loop stops at
/// the first one that is still within the TTL.
///
/// Returns the concatenation of the lists returned by the individual removals.
pub fn prune_old_txs(&mut self) -> Vec<ArcPoolTx> {
    // `Instant - Duration` panics when the result cannot be represented,
    // which a large TTL can trigger. In that case the deadline lies before
    // any instant a transaction could have been created at, so nothing has
    // expired.
    let Some(deadline) =
        tokio::time::Instant::now().checked_sub(self.config.transaction_ttl)
    else {
        return Vec::new()
    };

    let mut result = Vec::new();
    while let Some((oldest_time, oldest_tx)) = self.by_time.lowest() {
        if oldest_time.created() > &deadline {
            break
        }
        // Clone the handle so the borrow of the time index ends before the
        // pool is mutated.
        let oldest_tx = oldest_tx.clone();
        result.extend(self.remove_inner(&oldest_tx));
    }
    result
}
}
#[cfg(test)]
mod test_helpers;
#[cfg(test)]
mod tests;