#![recursion_limit="256"]
#![warn(missing_docs)]
#![warn(unused_extern_crates)]
mod api;
pub mod error;
mod revalidation;
#[cfg(any(feature = "test-helpers", test))]
pub mod testing;
pub use sc_transaction_graph as txpool;
pub use crate::api::{FullChainApi, LightChainApi};
use std::{collections::HashMap, sync::Arc, pin::Pin};
use futures::{Future, FutureExt, future::ready};
use parking_lot::Mutex;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic},
};
use sp_transaction_pool::{
TransactionPool, PoolStatus, ImportNotificationStream, TxHash, TransactionFor,
TransactionStatusStreamFor, MaintainedTransactionPool, PoolFuture, ChainEvent,
};
use wasm_timer::Instant;
/// Basic transaction pool that can be customized by supplying a `PoolApi`.
///
/// Bundles the underlying transaction graph pool with the chain API and the
/// state used to decide when transactions are revalidated during maintenance.
pub struct BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
{
/// Underlying transaction graph pool; submissions and queries are forwarded to it.
pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
/// Chain API used during maintenance to resolve block numbers and fetch block bodies.
api: Arc<PoolApi>,
/// Decides, for every new block, whether to revalidate and/or resubmit transactions.
revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
/// Queue that receives the hashes of ready transactions that should be revalidated.
revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
}
/// Memory-usage reporting for the pool (not compiled when `target_os = "unknown"`).
#[cfg(not(target_os = "unknown"))]
impl<PoolApi, Block> parity_util_mem::MallocSizeOf for BasicPool<PoolApi, Block>
where
PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
PoolApi::Hash: parity_util_mem::MallocSizeOf,
Block: BlockT,
{
/// Returns the size reported by the inner transaction graph pool.
fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
// Only the inner pool is measured; the other fields are not counted.
self.pool.size_of(ops)
}
}
/// Type of revalidation performed by a [`BasicPool`] during maintenance.
///
/// The type is a plain, field-less selector, so it is cheap to copy, compare
/// and print in logs or configuration dumps.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RevalidationType {
    /// Light revalidation.
    ///
    /// No background task is created. Ready transactions are revalidated only
    /// periodically (after a number of blocks or an amount of time has passed)
    /// and transactions from retracted blocks are not resubmitted.
    Light,
    /// Full revalidation.
    ///
    /// A background revalidation task is created. On every new block the pool
    /// resubmits transactions from retracted blocks and queues the ready
    /// transactions for revalidation.
    Full,
}
impl<PoolApi, Block> BasicPool<PoolApi, Block>
where
    Block: BlockT,
    PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash> + 'static,
{
    /// Creates a pool that uses full revalidation.
    ///
    /// Shorthand for [`Self::with_revalidation_type`] with
    /// `RevalidationType::Full`; see there for the meaning of the returned tuple.
    pub fn new(
        options: sc_transaction_graph::Options,
        pool_api: Arc<PoolApi>,
    ) -> (Self, Option<Pin<Box<dyn Future<Output=()> + Send>>>) {
        Self::with_revalidation_type(options, pool_api, RevalidationType::Full)
    }

    /// Creates a pool with the requested revalidation type.
    ///
    /// Returns the pool together with an optional background future. The
    /// future is `Some` only for `RevalidationType::Full`, where it is produced
    /// by the background revalidation queue; for `RevalidationType::Light` it
    /// is `None`.
    pub fn with_revalidation_type(
        options: sc_transaction_graph::Options,
        pool_api: Arc<PoolApi>,
        revalidation_type: RevalidationType,
    ) -> (Self, Option<Pin<Box<dyn Future<Output=()> + Send>>>) {
        let pool = Arc::new(sc_transaction_graph::Pool::new(options, Arc::clone(&pool_api)));

        // Everything that depends on the revalidation type is decided in one place:
        // the queue flavour, the optional background task and the strategy.
        let (queue, background_task, strategy) = match revalidation_type {
            RevalidationType::Light => {
                let queue = revalidation::RevalidationQueue::new(
                    Arc::clone(&pool_api),
                    Arc::clone(&pool),
                );
                (queue, None, RevalidationStrategy::Light(RevalidationStatus::NotScheduled))
            },
            RevalidationType::Full => {
                let (queue, worker) = revalidation::RevalidationQueue::new_background(
                    Arc::clone(&pool_api),
                    Arc::clone(&pool),
                );
                (queue, Some(worker), RevalidationStrategy::Always)
            },
        };

        let basic_pool = Self {
            pool,
            api: pool_api,
            revalidation_strategy: Arc::new(Mutex::new(strategy)),
            revalidation_queue: Arc::new(queue),
        };
        (basic_pool, background_task)
    }

    /// Gives access to the underlying transaction graph pool.
    pub fn pool(&self) -> &Arc<sc_transaction_graph::Pool<PoolApi>> {
        &self.pool
    }
}
impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
where
    Block: BlockT,
    PoolApi: 'static + sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
{
    type Block = PoolApi::Block;
    type Hash = sc_transaction_graph::ExHash<PoolApi>;
    type InPoolTransaction = sc_transaction_graph::base_pool::Transaction<TxHash<Self>, TransactionFor<Self>>;
    type Error = PoolApi::Error;

    /// Submits a batch of transactions to the inner pool at block `at`.
    ///
    /// The returned future resolves to one result per submitted transaction.
    fn submit_at(
        &self,
        at: &BlockId<Self::Block>,
        xts: Vec<TransactionFor<Self>>,
    ) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
        // Copy the block id and share the pool so the future owns everything it needs.
        let target = *at;
        let graph = Arc::clone(&self.pool);
        async move { graph.submit_at(&target, xts, false).await }.boxed()
    }

    /// Submits a single transaction to the inner pool at block `at`.
    fn submit_one(
        &self,
        at: &BlockId<Self::Block>,
        xt: TransactionFor<Self>,
    ) -> PoolFuture<TxHash<Self>, Self::Error> {
        let target = *at;
        let graph = Arc::clone(&self.pool);
        async move { graph.submit_one(&target, xt).await }.boxed()
    }

    /// Submits a single transaction and resolves to a boxed stream of its
    /// status updates.
    fn submit_and_watch(
        &self,
        at: &BlockId<Self::Block>,
        xt: TransactionFor<Self>,
    ) -> PoolFuture<Box<TransactionStatusStreamFor<Self>>, Self::Error> {
        let target = *at;
        let graph = Arc::clone(&self.pool);
        async move {
            let submitted = graph.submit_and_watch(&target, xt).await;
            // On success, turn the watcher into the boxed stream expected by the trait.
            submitted.map(|watcher| Box::new(watcher.into_stream()) as _)
        }.boxed()
    }

    /// Asks the validated pool to remove the given transactions as invalid
    /// and returns what it reports back.
    fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
        let validated = self.pool.validated_pool();
        validated.remove_invalid(hashes)
    }

    /// Reports the status of the validated pool.
    fn status(&self) -> PoolStatus {
        let validated = self.pool.validated_pool();
        validated.status()
    }

    /// Returns a boxed iterator over the ready transactions of the validated pool.
    fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>> {
        let ready_transactions = self.pool.validated_pool().ready();
        Box::new(ready_transactions)
    }

    /// Returns the import notification stream of the validated pool.
    fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
        let validated = self.pool.validated_pool();
        validated.import_notification_stream()
    }

    /// Computes the hash of a transaction as the inner pool does.
    fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
        self.pool.hash_of(xt)
    }

    /// Forwards the propagation report (transaction hash to list of strings)
    /// to the validated pool.
    fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
        let validated = self.pool.validated_pool();
        validated.on_broadcasted(propagations)
    }

    /// Looks up a ready transaction by hash in the validated pool.
    fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
        let validated = self.pool.validated_pool();
        validated.ready_by_hash(hash)
    }
}
/// Progress of the periodic revalidation schedule tracked by the light strategy.
#[cfg_attr(test, derive(Debug))]
enum RevalidationStatus<N> {
/// Nothing is planned; the next `next_required` call creates a schedule.
NotScheduled,
/// Revalidation becomes due once the instant has passed or the block number
/// has been reached. A criterion that is `None` is never satisfied.
Scheduled(Option<Instant>, Option<N>),
/// Revalidation has been reported as due and the status was not cleared yet.
InProgress,
}
/// How the pool reacts to a new block during maintenance.
enum RevalidationStrategy<N> {
/// Revalidate and resubmit on every new block.
Always,
/// Revalidate only when the tracked schedule reports it as due; never resubmit.
Light(RevalidationStatus<N>),
}
/// Actions to perform for one new-block event, as decided by the revalidation strategy.
struct RevalidationAction {
/// Whether the ready transactions should be queued for revalidation.
revalidate: bool,
/// Whether transactions from retracted blocks should be resubmitted to the pool.
resubmit: bool,
}
impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
    /// Resets the schedule of the light strategy; does nothing for `Always`.
    pub fn clear(&mut self) {
        match self {
            Self::Light(status) => status.clear(),
            Self::Always => {},
        }
    }

    /// Decides what has to be done for the block with number `block`.
    ///
    /// * `Always` requests both revalidation and resubmission.
    /// * `Light` never requests resubmission and requests revalidation only
    ///   when its schedule, driven by the two optional periods, says it is due.
    pub fn next(
        &mut self,
        block: N,
        revalidate_time_period: Option<std::time::Duration>,
        revalidate_block_period: Option<N>,
    ) -> RevalidationAction {
        let (revalidate, resubmit) = match self {
            Self::Always => (true, true),
            Self::Light(status) => {
                let due = status.next_required(
                    block,
                    revalidate_time_period,
                    revalidate_block_period,
                );
                (due, false)
            },
        };
        RevalidationAction { revalidate, resubmit }
    }
}
impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
pub fn clear(&mut self) {
*self = Self::NotScheduled;
}
pub fn next_required(
&mut self,
block: N,
revalidate_time_period: Option<std::time::Duration>,
revalidate_block_period: Option<N>,
) -> bool {
match *self {
Self::NotScheduled => {
*self = Self::Scheduled(
revalidate_time_period.map(|period| Instant::now() + period),
revalidate_block_period.map(|period| block + period),
);
false
}
Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false)
|| revalidate_at_block.map(|at| block >= at).unwrap_or(false);
if is_required {
*self = Self::InProgress;
}
is_required
}
Self::InProgress => false,
}
}
}
impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: 'static + sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
{
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output=()> + Send>> {
match event {
ChainEvent::NewBlock { id, retracted, .. } => {
let id = id.clone();
let pool = self.pool.clone();
let api = self.api.clone();
let block_number = match api.block_id_to_number(&id) {
Ok(Some(number)) => number,
_ => {
log::trace!(target: "txqueue", "Skipping chain event - no number for that block {:?}", id);
return Box::pin(ready(()));
}
};
let next_action = self.revalidation_strategy.lock().next(
block_number,
Some(std::time::Duration::from_secs(60)),
Some(20.into()),
);
let revalidation_strategy = self.revalidation_strategy.clone();
let retracted = retracted.clone();
let revalidation_queue = self.revalidation_queue.clone();
async move {
if !pool.validated_pool().status().is_empty() {
let hashes = api.block_body(&id).await
.unwrap_or_else(|e| {
log::warn!("Prune known transactions: error request {:?}!", e);
None
})
.unwrap_or_default()
.into_iter()
.map(|tx| pool.hash_of(&tx))
.collect::<Vec<_>>();
if let Err(e) = pool.prune_known(&id, &hashes) {
log::error!("Cannot prune known in the pool {:?}!", e);
}
}
if next_action.resubmit {
let mut resubmit_transactions = Vec::new();
for retracted_hash in retracted {
pool.validated_pool().on_block_retracted(retracted_hash.clone());
let block_transactions = api.block_body(&BlockId::hash(retracted_hash.clone())).await
.unwrap_or_else(|e| {
log::warn!("Failed to fetch block body {:?}!", e);
None
})
.unwrap_or_default()
.into_iter()
.filter(|tx| tx.is_signed().unwrap_or(true));
resubmit_transactions.extend(block_transactions);
}
if let Err(e) = pool.submit_at(&id, resubmit_transactions, true).await {
log::debug!(
target: "txpool",
"[{:?}] Error re-submitting transactions: {:?}", id, e
)
}
}
if next_action.revalidate {
let hashes = pool.validated_pool().ready().map(|tx| tx.hash.clone()).collect();
revalidation_queue.revalidate_later(block_number, hashes).await;
}
revalidation_strategy.lock().clear();
}.boxed()
}
ChainEvent::Finalized { hash } => {
let pool = self.pool.clone();
async move {
if let Err(e) = pool.validated_pool().on_block_finalized(hash).await {
log::warn!(
target: "txpool",
"Error [{}] occurred while attempting to notify watchers of finalization {}",
e, hash
)
}
}.boxed()
}
}
}
}