sc_transaction_pool/graph/
pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{common::log_xt::log_xt_trace, LOG_TARGET};
20use futures::{channel::mpsc::Receiver, Future};
21use indexmap::IndexMap;
22use sc_transaction_pool_api::error;
23use sp_blockchain::{HashAndNumber, TreeRoute};
24use sp_runtime::{
25	generic::BlockId,
26	traits::{self, Block as BlockT, SaturatedConversion},
27	transaction_validity::{
28		TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
29	},
30};
31use std::{
32	collections::HashMap,
33	sync::Arc,
34	time::{Duration, Instant},
35};
36
37use super::{
38	base_pool as base,
39	validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
40	watcher::Watcher,
41};
42
43/// Modification notification event stream type;
44pub type EventStream<H> = Receiver<H>;
45
46/// Block hash type for a pool.
47pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
48/// Extrinsic hash type for a pool.
49pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
50/// Extrinsic type for a pool (reference counted).
51pub type ExtrinsicFor<A> = Arc<<<A as ChainApi>::Block as traits::Block>::Extrinsic>;
52/// Extrinsic type for a pool (raw data).
53pub type RawExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
54/// Block number type for the ChainApi
55pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
56/// A type of transaction stored in the pool
57pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
58/// A type of validated transaction stored in the pool.
59pub type ValidatedTransactionFor<A> =
60	ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
61
62/// Concrete extrinsic validation and query logic.
63pub trait ChainApi: Send + Sync {
64	/// Block type.
65	type Block: BlockT;
66	/// Error type.
67	type Error: From<error::Error> + error::IntoPoolError;
68	/// Validate transaction future.
69	type ValidationFuture: Future<Output = Result<TransactionValidity, Self::Error>> + Send + Unpin;
70	/// Body future (since block body might be remote)
71	type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>>
72		+ Unpin
73		+ Send
74		+ 'static;
75
76	/// Asynchronously verify extrinsic at given block.
77	fn validate_transaction(
78		&self,
79		at: <Self::Block as BlockT>::Hash,
80		source: TransactionSource,
81		uxt: ExtrinsicFor<Self>,
82	) -> Self::ValidationFuture;
83
84	/// Synchronously verify given extrinsic at given block.
85	///
86	/// Validates a transaction by calling into the runtime. Same as `validate_transaction` but
87	/// blocks the current thread when performing validation.
88	fn validate_transaction_blocking(
89		&self,
90		at: <Self::Block as BlockT>::Hash,
91		source: TransactionSource,
92		uxt: ExtrinsicFor<Self>,
93	) -> Result<TransactionValidity, Self::Error>;
94
95	/// Returns a block number given the block id.
96	fn block_id_to_number(
97		&self,
98		at: &BlockId<Self::Block>,
99	) -> Result<Option<NumberFor<Self>>, Self::Error>;
100
101	/// Returns a block hash given the block id.
102	fn block_id_to_hash(
103		&self,
104		at: &BlockId<Self::Block>,
105	) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
106
107	/// Returns hash and encoding length of the extrinsic.
108	fn hash_and_length(&self, uxt: &RawExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
109
110	/// Returns a block body given the block.
111	fn block_body(&self, at: <Self::Block as BlockT>::Hash) -> Self::BodyFuture;
112
113	/// Returns a block header given the block id.
114	fn block_header(
115		&self,
116		at: <Self::Block as BlockT>::Hash,
117	) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
118
119	/// Compute a tree-route between two blocks. See [`TreeRoute`] for more details.
120	fn tree_route(
121		&self,
122		from: <Self::Block as BlockT>::Hash,
123		to: <Self::Block as BlockT>::Hash,
124	) -> Result<TreeRoute<Self::Block>, Self::Error>;
125
126	/// Resolves block number by id.
127	fn resolve_block_number(
128		&self,
129		at: <Self::Block as BlockT>::Hash,
130	) -> Result<NumberFor<Self>, Self::Error> {
131		self.block_id_to_number(&BlockId::Hash(at)).and_then(|number| {
132			number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
133		})
134	}
135}
136
137/// Pool configuration options.
138#[derive(Debug, Clone)]
139pub struct Options {
140	/// Ready queue limits.
141	pub ready: base::Limit,
142	/// Future queue limits.
143	pub future: base::Limit,
144	/// Reject future transactions.
145	pub reject_future_transactions: bool,
146	/// How long the extrinsic is banned for.
147	pub ban_time: Duration,
148}
149
150impl Default for Options {
151	fn default() -> Self {
152		Self {
153			ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
154			future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
155			reject_future_transactions: false,
156			ban_time: Duration::from_secs(60 * 30),
157		}
158	}
159}
160
/// Selects whether the pool's ban list is consulted for a transaction
/// before that transaction is verified.
#[derive(Copy, Clone)]
enum CheckBannedBeforeVerify {
	/// Banned transactions are not ignored by the pre-verification check.
	Yes,
	/// The pre-verification check ignores bans.
	No,
}
168
169/// Extrinsics pool that performs validation.
170pub struct Pool<B: ChainApi> {
171	validated_pool: Arc<ValidatedPool<B>>,
172}
173
174impl<B: ChainApi> Pool<B> {
175	/// Create a new transaction pool.
176	pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
177		Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
178	}
179
180	/// Imports a bunch of unverified extrinsics to the pool
181	pub async fn submit_at(
182		&self,
183		at: &HashAndNumber<B::Block>,
184		source: TransactionSource,
185		xts: impl IntoIterator<Item = ExtrinsicFor<B>>,
186	) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
187		let xts = xts.into_iter().map(|xt| (source, xt));
188		let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await;
189		self.validated_pool.submit(validated_transactions.into_values())
190	}
191
192	/// Resubmit the given extrinsics to the pool.
193	///
194	/// This does not check if a transaction is banned, before we verify it again.
195	pub async fn resubmit_at(
196		&self,
197		at: &HashAndNumber<B::Block>,
198		source: TransactionSource,
199		xts: impl IntoIterator<Item = ExtrinsicFor<B>>,
200	) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
201		let xts = xts.into_iter().map(|xt| (source, xt));
202		let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await;
203		self.validated_pool.submit(validated_transactions.into_values())
204	}
205
206	/// Imports one unverified extrinsic to the pool
207	pub async fn submit_one(
208		&self,
209		at: &HashAndNumber<B::Block>,
210		source: TransactionSource,
211		xt: ExtrinsicFor<B>,
212	) -> Result<ExtrinsicHash<B>, B::Error> {
213		let res = self.submit_at(at, source, std::iter::once(xt)).await.pop();
214		res.expect("One extrinsic passed; one result returned; qed")
215	}
216
217	/// Import a single extrinsic and starts to watch its progress in the pool.
218	pub async fn submit_and_watch(
219		&self,
220		at: &HashAndNumber<B::Block>,
221		source: TransactionSource,
222		xt: ExtrinsicFor<B>,
223	) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
224		let (_, tx) = self
225			.verify_one(at.hash, at.number, source, xt, CheckBannedBeforeVerify::Yes)
226			.await;
227		self.validated_pool.submit_and_watch(tx)
228	}
229
230	/// Resubmit some transaction that were validated elsewhere.
231	pub fn resubmit(
232		&self,
233		revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
234	) {
235		let now = Instant::now();
236		self.validated_pool.resubmit(revalidated_transactions);
237		log::trace!(
238			target: LOG_TARGET,
239			"Resubmitted. Took {} ms. Status: {:?}",
240			now.elapsed().as_millis(),
241			self.validated_pool.status()
242		);
243	}
244
245	/// Prunes known ready transactions.
246	///
247	/// Used to clear the pool from transactions that were part of recently imported block.
248	/// The main difference from the `prune` is that we do not revalidate any transactions
249	/// and ignore unknown passed hashes.
250	pub fn prune_known(&self, at: &HashAndNumber<B::Block>, hashes: &[ExtrinsicHash<B>]) {
251		// Get details of all extrinsics that are already in the pool
252		let in_pool_tags =
253			self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
254
255		// Prune all transactions that provide given tags
256		let prune_status = self.validated_pool.prune_tags(in_pool_tags);
257		let pruned_transactions =
258			hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
259		self.validated_pool.fire_pruned(at, pruned_transactions);
260	}
261
262	/// Prunes ready transactions.
263	///
264	/// Used to clear the pool from transactions that were part of recently imported block.
265	/// To perform pruning we need the tags that each extrinsic provides and to avoid calling
266	/// into runtime too often we first look up all extrinsics that are in the pool and get
267	/// their provided tags from there. Otherwise we query the runtime at the `parent` block.
268	pub async fn prune(
269		&self,
270		at: &HashAndNumber<B::Block>,
271		parent: <B::Block as BlockT>::Hash,
272		extrinsics: &[RawExtrinsicFor<B>],
273	) {
274		log::debug!(
275			target: LOG_TARGET,
276			"Starting pruning of block {:?} (extrinsics: {})",
277			at,
278			extrinsics.len()
279		);
280		// Get details of all extrinsics that are already in the pool
281		let in_pool_hashes =
282			extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
283		let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
284
285		// Zip the ones from the pool with the full list (we get pairs `(Extrinsic,
286		// Option<Vec<Tag>>)`)
287		let all = extrinsics.iter().zip(in_pool_tags.into_iter());
288		let mut validated_counter: usize = 0;
289
290		let mut future_tags = Vec::new();
291		for (extrinsic, in_pool_tags) in all {
292			match in_pool_tags {
293				// reuse the tags for extrinsics that were found in the pool
294				Some(tags) => future_tags.extend(tags),
295				// if it's not found in the pool query the runtime at parent block
296				// to get validity info and tags that the extrinsic provides.
297				None => {
298					// Avoid validating block txs if the pool is empty
299					if !self.validated_pool.status().is_empty() {
300						validated_counter = validated_counter + 1;
301						let validity = self
302							.validated_pool
303							.api()
304							.validate_transaction(
305								parent,
306								TransactionSource::InBlock,
307								Arc::from(extrinsic.clone()),
308							)
309							.await;
310
311						log::trace!(target: LOG_TARGET,"[{:?}] prune::revalidated {:?}", self.validated_pool.api().hash_and_length(&extrinsic.clone()).0, validity);
312
313						if let Ok(Ok(validity)) = validity {
314							future_tags.extend(validity.provides);
315						}
316					} else {
317						log::trace!(
318							target: LOG_TARGET,
319							"txpool is empty, skipping validation for block {at:?}",
320						);
321					}
322				},
323			}
324		}
325
326		log::trace!(target: LOG_TARGET,"prune: validated_counter:{validated_counter}");
327
328		self.prune_tags(at, future_tags, in_pool_hashes).await
329	}
330
331	/// Prunes ready transactions that provide given list of tags.
332	///
333	/// Given tags are assumed to be always provided now, so all transactions
334	/// in the Future Queue that require that particular tag (and have other
335	/// requirements satisfied) are promoted to Ready Queue.
336	///
337	/// Moreover for each provided tag we remove transactions in the pool that:
338	/// 1. Provide that tag directly
339	/// 2. Are a dependency of pruned transaction.
340	///
341	/// Returns transactions that have been removed from the pool and must be reverified
342	/// before reinserting to the pool.
343	///
344	/// By removing predecessor transactions as well we might actually end up
345	/// pruning too much, so all removed transactions are reverified against
346	/// the runtime (`validate_transaction`) to make sure they are invalid.
347	///
348	/// However we avoid revalidating transactions that are contained within
349	/// the second parameter of `known_imported_hashes`. These transactions
350	/// (if pruned) are not revalidated and become temporarily banned to
351	/// prevent importing them in the (near) future.
352	pub async fn prune_tags(
353		&self,
354		at: &HashAndNumber<B::Block>,
355		tags: impl IntoIterator<Item = Tag>,
356		known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
357	) {
358		log::trace!(target: LOG_TARGET, "Pruning at {:?}", at);
359		// Prune all transactions that provide given tags
360		let prune_status = self.validated_pool.prune_tags(tags);
361
362		// Make sure that we don't revalidate extrinsics that were part of the recently
363		// imported block. This is especially important for UTXO-like chains cause the
364		// inputs are pruned so such transaction would go to future again.
365		self.validated_pool
366			.ban(&Instant::now(), known_imported_hashes.clone().into_iter());
367
368		// Try to re-validate pruned transactions since some of them might be still valid.
369		// note that `known_imported_hashes` will be rejected here due to temporary ban.
370		let pruned_transactions =
371			prune_status.pruned.into_iter().map(|tx| (tx.source, tx.data.clone()));
372
373		let reverified_transactions =
374			self.verify(at, pruned_transactions, CheckBannedBeforeVerify::Yes).await;
375
376		let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect();
377
378		log::trace!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions: {}", &at, reverified_transactions.len());
379		log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "[{:?}] Resubmitting transaction: {:?}");
380
381		// And finally - submit reverified transactions back to the pool
382		self.validated_pool.resubmit_pruned(
383			&at,
384			known_imported_hashes,
385			pruned_hashes,
386			reverified_transactions.into_values().collect(),
387		)
388	}
389
390	/// Returns transaction hash
391	pub fn hash_of(&self, xt: &RawExtrinsicFor<B>) -> ExtrinsicHash<B> {
392		self.validated_pool.api().hash_and_length(xt).0
393	}
394
395	/// Returns future that validates a bunch of transactions at given block.
396	async fn verify(
397		&self,
398		at: &HashAndNumber<B::Block>,
399		xts: impl IntoIterator<Item = (TransactionSource, ExtrinsicFor<B>)>,
400		check: CheckBannedBeforeVerify,
401	) -> IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>> {
402		let HashAndNumber { number, hash } = *at;
403
404		let res = futures::future::join_all(
405			xts.into_iter()
406				.map(|(source, xt)| self.verify_one(hash, number, source, xt, check)),
407		)
408		.await
409		.into_iter()
410		.collect::<IndexMap<_, _>>();
411
412		res
413	}
414
415	/// Returns future that validates single transaction at given block.
416	async fn verify_one(
417		&self,
418		block_hash: <B::Block as BlockT>::Hash,
419		block_number: NumberFor<B>,
420		source: TransactionSource,
421		xt: ExtrinsicFor<B>,
422		check: CheckBannedBeforeVerify,
423	) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
424		let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
425
426		let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
427		if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
428			return (hash, ValidatedTransaction::Invalid(hash, err))
429		}
430
431		let validation_result = self
432			.validated_pool
433			.api()
434			.validate_transaction(block_hash, source, xt.clone())
435			.await;
436
437		let status = match validation_result {
438			Ok(status) => status,
439			Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
440		};
441
442		let validity = match status {
443			Ok(validity) =>
444				if validity.provides.is_empty() {
445					ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
446				} else {
447					ValidatedTransaction::valid_at(
448						block_number.saturated_into::<u64>(),
449						hash,
450						source,
451						xt,
452						bytes,
453						validity,
454					)
455				},
456			Err(TransactionValidityError::Invalid(e)) =>
457				ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
458			Err(TransactionValidityError::Unknown(e)) =>
459				ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
460		};
461
462		(hash, validity)
463	}
464
465	/// Get a reference to the underlying validated pool.
466	pub fn validated_pool(&self) -> &ValidatedPool<B> {
467		&self.validated_pool
468	}
469
470	/// Clears the recently pruned transactions in validated pool.
471	pub fn clear_recently_pruned(&mut self) {
472		self.validated_pool.pool.write().clear_recently_pruned();
473	}
474}
475
476impl<B: ChainApi> Pool<B> {
477	/// Deep clones the pool.
478	///
479	/// Must be called on purpose: it duplicates all the internal structures.
480	pub fn deep_clone(&self) -> Self {
481		let other: ValidatedPool<B> = (*self.validated_pool).clone();
482		Self { validated_pool: Arc::from(other) }
483	}
484}
485
486#[cfg(test)]
487mod tests {
488	use super::{super::base_pool::Limit, *};
489	use crate::common::tests::{pool, uxt, TestApi, INVALID_NONCE};
490	use assert_matches::assert_matches;
491	use codec::Encode;
492	use futures::executor::block_on;
493	use parking_lot::Mutex;
494	use sc_transaction_pool_api::TransactionStatus;
495	use sp_runtime::transaction_validity::TransactionSource;
496	use std::{collections::HashMap, time::Instant};
497	use substrate_test_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
498	use substrate_test_runtime_client::AccountKeyring::{Alice, Bob};
499
500	const SOURCE: TransactionSource = TransactionSource::External;
501
502	#[test]
503	fn should_validate_and_import_transaction() {
504		// given
505		let (pool, api) = pool();
506
507		// when
508		let hash = block_on(
509			pool.submit_one(
510				&api.expect_hash_and_number(0),
511				SOURCE,
512				uxt(Transfer {
513					from: Alice.into(),
514					to: AccountId::from_h256(H256::from_low_u64_be(2)),
515					amount: 5,
516					nonce: 0,
517				})
518				.into(),
519			),
520		)
521		.unwrap();
522
523		// then
524		assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
525	}
526
527	#[test]
528	fn submit_at_preserves_order() {
529		sp_tracing::try_init_simple();
530		// given
531		let (pool, api) = pool();
532
533		let txs = (0..10)
534			.map(|i| {
535				uxt(Transfer {
536					from: Alice.into(),
537					to: AccountId::from_h256(H256::from_low_u64_be(i)),
538					amount: 5,
539					nonce: i,
540				})
541				.into()
542			})
543			.collect::<Vec<_>>();
544
545		let initial_hashes = txs.iter().map(|t| api.hash_and_length(t).0).collect::<Vec<_>>();
546
547		// when
548		let txs = txs.into_iter().map(|x| Arc::from(x)).collect::<Vec<_>>();
549		let hashes = block_on(pool.submit_at(&api.expect_hash_and_number(0), SOURCE, txs));
550		log::debug!("--> {hashes:#?}");
551
552		// then
553		hashes.into_iter().zip(initial_hashes.into_iter()).for_each(
554			|(result_hash, initial_hash)| {
555				assert_eq!(result_hash.unwrap(), initial_hash);
556			},
557		);
558	}
559
560	#[test]
561	fn should_reject_if_temporarily_banned() {
562		// given
563		let (pool, api) = pool();
564		let uxt = uxt(Transfer {
565			from: Alice.into(),
566			to: AccountId::from_h256(H256::from_low_u64_be(2)),
567			amount: 5,
568			nonce: 0,
569		});
570
571		// when
572		pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
573		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()));
574		assert_eq!(pool.validated_pool().status().ready, 0);
575		assert_eq!(pool.validated_pool().status().future, 0);
576
577		// then
578		assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
579	}
580
581	#[test]
582	fn should_reject_unactionable_transactions() {
583		// given
584		let api = Arc::new(TestApi::default());
585		let pool = Pool::new(
586			Default::default(),
587			// the node does not author blocks
588			false.into(),
589			api.clone(),
590		);
591
592		// after validation `IncludeData` will be set to non-propagable (validate_transaction mock)
593		let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
594
595		// when
596		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()));
597
598		// then
599		assert_matches!(res.unwrap_err(), error::Error::Unactionable);
600	}
601
602	#[test]
603	fn should_notify_about_pool_events() {
604		let (stream, hash0, hash1) = {
605			// given
606			let (pool, api) = pool();
607			let han_of_block0 = api.expect_hash_and_number(0);
608			let stream = pool.validated_pool().import_notification_stream();
609
610			// when
611			let hash0 = block_on(
612				pool.submit_one(
613					&han_of_block0,
614					SOURCE,
615					uxt(Transfer {
616						from: Alice.into(),
617						to: AccountId::from_h256(H256::from_low_u64_be(2)),
618						amount: 5,
619						nonce: 0,
620					})
621					.into(),
622				),
623			)
624			.unwrap();
625			let hash1 = block_on(
626				pool.submit_one(
627					&han_of_block0,
628					SOURCE,
629					uxt(Transfer {
630						from: Alice.into(),
631						to: AccountId::from_h256(H256::from_low_u64_be(2)),
632						amount: 5,
633						nonce: 1,
634					})
635					.into(),
636				),
637			)
638			.unwrap();
639			// future doesn't count
640			let _hash = block_on(
641				pool.submit_one(
642					&han_of_block0,
643					SOURCE,
644					uxt(Transfer {
645						from: Alice.into(),
646						to: AccountId::from_h256(H256::from_low_u64_be(2)),
647						amount: 5,
648						nonce: 3,
649					})
650					.into(),
651				),
652			)
653			.unwrap();
654
655			assert_eq!(pool.validated_pool().status().ready, 2);
656			assert_eq!(pool.validated_pool().status().future, 1);
657
658			(stream, hash0, hash1)
659		};
660
661		// then
662		let mut it = futures::executor::block_on_stream(stream);
663		assert_eq!(it.next(), Some(hash0));
664		assert_eq!(it.next(), Some(hash1));
665		assert_eq!(it.next(), None);
666	}
667
668	#[test]
669	fn should_clear_stale_transactions() {
670		// given
671		let (pool, api) = pool();
672		let han_of_block0 = api.expect_hash_and_number(0);
673		let hash1 = block_on(
674			pool.submit_one(
675				&han_of_block0,
676				SOURCE,
677				uxt(Transfer {
678					from: Alice.into(),
679					to: AccountId::from_h256(H256::from_low_u64_be(2)),
680					amount: 5,
681					nonce: 0,
682				})
683				.into(),
684			),
685		)
686		.unwrap();
687		let hash2 = block_on(
688			pool.submit_one(
689				&han_of_block0,
690				SOURCE,
691				uxt(Transfer {
692					from: Alice.into(),
693					to: AccountId::from_h256(H256::from_low_u64_be(2)),
694					amount: 5,
695					nonce: 1,
696				})
697				.into(),
698			),
699		)
700		.unwrap();
701		let hash3 = block_on(
702			pool.submit_one(
703				&han_of_block0,
704				SOURCE,
705				uxt(Transfer {
706					from: Alice.into(),
707					to: AccountId::from_h256(H256::from_low_u64_be(2)),
708					amount: 5,
709					nonce: 3,
710				})
711				.into(),
712			),
713		)
714		.unwrap();
715
716		// when
717		pool.validated_pool.clear_stale(&api.expect_hash_and_number(5));
718
719		// then
720		assert_eq!(pool.validated_pool().ready().count(), 0);
721		assert_eq!(pool.validated_pool().status().future, 0);
722		assert_eq!(pool.validated_pool().status().ready, 0);
723		// make sure they are temporarily banned as well
724		assert!(pool.validated_pool.is_banned(&hash1));
725		assert!(pool.validated_pool.is_banned(&hash2));
726		assert!(pool.validated_pool.is_banned(&hash3));
727	}
728
729	#[test]
730	fn should_ban_mined_transactions() {
731		// given
732		let (pool, api) = pool();
733		let hash1 = block_on(
734			pool.submit_one(
735				&api.expect_hash_and_number(0),
736				SOURCE,
737				uxt(Transfer {
738					from: Alice.into(),
739					to: AccountId::from_h256(H256::from_low_u64_be(2)),
740					amount: 5,
741					nonce: 0,
742				})
743				.into(),
744			),
745		)
746		.unwrap();
747
748		// when
749		block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1]));
750
751		// then
752		assert!(pool.validated_pool.is_banned(&hash1));
753	}
754
755	#[test]
756	fn should_limit_futures() {
757		sp_tracing::try_init_simple();
758
759		let xt = uxt(Transfer {
760			from: Alice.into(),
761			to: AccountId::from_h256(H256::from_low_u64_be(2)),
762			amount: 5,
763			nonce: 1,
764		});
765
766		// given
767		let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
768
769		let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
770
771		let api = Arc::new(TestApi::default());
772		let pool = Pool::new(options, true.into(), api.clone());
773
774		let hash1 =
775			block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into())).unwrap();
776		assert_eq!(pool.validated_pool().status().future, 1);
777
778		// when
779		let hash2 = block_on(
780			pool.submit_one(
781				&api.expect_hash_and_number(0),
782				SOURCE,
783				uxt(Transfer {
784					from: Bob.into(),
785					to: AccountId::from_h256(H256::from_low_u64_be(2)),
786					amount: 5,
787					nonce: 10,
788				})
789				.into(),
790			),
791		)
792		.unwrap();
793
794		// then
795		assert_eq!(pool.validated_pool().status().future, 1);
796		assert!(pool.validated_pool.is_banned(&hash1));
797		assert!(!pool.validated_pool.is_banned(&hash2));
798	}
799
800	#[test]
801	fn should_error_if_reject_immediately() {
802		// given
803		let limit = Limit { count: 100, total_bytes: 10 };
804
805		let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
806
807		let api = Arc::new(TestApi::default());
808		let pool = Pool::new(options, true.into(), api.clone());
809
810		// when
811		block_on(
812			pool.submit_one(
813				&api.expect_hash_and_number(0),
814				SOURCE,
815				uxt(Transfer {
816					from: Alice.into(),
817					to: AccountId::from_h256(H256::from_low_u64_be(2)),
818					amount: 5,
819					nonce: 1,
820				})
821				.into(),
822			),
823		)
824		.unwrap_err();
825
826		// then
827		assert_eq!(pool.validated_pool().status().ready, 0);
828		assert_eq!(pool.validated_pool().status().future, 0);
829	}
830
831	#[test]
832	fn should_reject_transactions_with_no_provides() {
833		// given
834		let (pool, api) = pool();
835
836		// when
837		let err = block_on(
838			pool.submit_one(
839				&api.expect_hash_and_number(0),
840				SOURCE,
841				uxt(Transfer {
842					from: Alice.into(),
843					to: AccountId::from_h256(H256::from_low_u64_be(2)),
844					amount: 5,
845					nonce: INVALID_NONCE,
846				})
847				.into(),
848			),
849		)
850		.unwrap_err();
851
852		// then
853		assert_eq!(pool.validated_pool().status().ready, 0);
854		assert_eq!(pool.validated_pool().status().future, 0);
855		assert_matches!(err, error::Error::NoTagsProvided);
856	}
857
858	mod listener {
859		use super::*;
860
861		#[test]
862		fn should_trigger_ready_and_finalized() {
863			// given
864			let (pool, api) = pool();
865			let watcher = block_on(
866				pool.submit_and_watch(
867					&api.expect_hash_and_number(0),
868					SOURCE,
869					uxt(Transfer {
870						from: Alice.into(),
871						to: AccountId::from_h256(H256::from_low_u64_be(2)),
872						amount: 5,
873						nonce: 0,
874					})
875					.into(),
876				),
877			)
878			.unwrap();
879			assert_eq!(pool.validated_pool().status().ready, 1);
880			assert_eq!(pool.validated_pool().status().future, 0);
881
882			let han_of_block2 = api.expect_hash_and_number(2);
883
884			// when
885			block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![]));
886			assert_eq!(pool.validated_pool().status().ready, 0);
887			assert_eq!(pool.validated_pool().status().future, 0);
888
889			// then
890			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
891			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
892			assert_eq!(
893				stream.next(),
894				Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
895			);
896		}
897
898		#[test]
899		fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
900			// given
901			let (pool, api) = pool();
902			let watcher = block_on(
903				pool.submit_and_watch(
904					&api.expect_hash_and_number(0),
905					SOURCE,
906					uxt(Transfer {
907						from: Alice.into(),
908						to: AccountId::from_h256(H256::from_low_u64_be(2)),
909						amount: 5,
910						nonce: 0,
911					})
912					.into(),
913				),
914			)
915			.unwrap();
916			assert_eq!(pool.validated_pool().status().ready, 1);
917			assert_eq!(pool.validated_pool().status().future, 0);
918
919			let han_of_block2 = api.expect_hash_and_number(2);
920
921			// when
922			block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![*watcher.hash()]));
923			assert_eq!(pool.validated_pool().status().ready, 0);
924			assert_eq!(pool.validated_pool().status().future, 0);
925
926			// then
927			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
928			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
929			assert_eq!(
930				stream.next(),
931				Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
932			);
933		}
934
935		#[test]
936		fn should_trigger_future_and_ready_after_promoted() {
937			// given
938			let (pool, api) = pool();
939			let han_of_block0 = api.expect_hash_and_number(0);
940
941			let watcher = block_on(
942				pool.submit_and_watch(
943					&han_of_block0,
944					SOURCE,
945					uxt(Transfer {
946						from: Alice.into(),
947						to: AccountId::from_h256(H256::from_low_u64_be(2)),
948						amount: 5,
949						nonce: 1,
950					})
951					.into(),
952				),
953			)
954			.unwrap();
955			assert_eq!(pool.validated_pool().status().ready, 0);
956			assert_eq!(pool.validated_pool().status().future, 1);
957
958			// when
959			block_on(
960				pool.submit_one(
961					&han_of_block0,
962					SOURCE,
963					uxt(Transfer {
964						from: Alice.into(),
965						to: AccountId::from_h256(H256::from_low_u64_be(2)),
966						amount: 5,
967						nonce: 0,
968					})
969					.into(),
970				),
971			)
972			.unwrap();
973			assert_eq!(pool.validated_pool().status().ready, 2);
974
975			// then
976			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
977			assert_eq!(stream.next(), Some(TransactionStatus::Future));
978			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
979		}
980
#[test]
fn should_trigger_invalid_and_ban() {
	// given
	let (pool, api) = pool();
	let transfer = uxt(Transfer {
		from: Alice.into(),
		to: AccountId::from_h256(H256::from_low_u64_be(2)),
		amount: 5,
		nonce: 0,
	});
	let at = api.expect_hash_and_number(0);
	let watcher = block_on(pool.submit_and_watch(&at, SOURCE, transfer.into())).unwrap();
	assert_eq!(pool.validated_pool().status().ready, 1);

	// when
	// The watched transaction is handed to `remove_invalid`.
	let reported = [*watcher.hash()];
	pool.validated_pool.remove_invalid(&reported);

	// then
	// The watcher sees `Ready`, then `Invalid`, and then the stream ends.
	// NOTE(review): despite the test name, the ban itself is not asserted here.
	let mut events = futures::executor::block_on_stream(watcher.into_stream());
	assert_eq!(events.next(), Some(TransactionStatus::Ready));
	assert_eq!(events.next(), Some(TransactionStatus::Invalid));
	assert_eq!(events.next(), None);
}
1005
#[test]
fn should_trigger_broadcasted() {
	// given
	let (pool, api) = pool();
	let transfer = uxt(Transfer {
		from: Alice.into(),
		to: AccountId::from_h256(H256::from_low_u64_be(2)),
		amount: 5,
		nonce: 0,
	});
	let at = api.expect_hash_and_number(0);
	let watcher = block_on(pool.submit_and_watch(&at, SOURCE, transfer.into())).unwrap();
	assert_eq!(pool.validated_pool().status().ready, 1);

	// when
	// Report the watched transaction as broadcast to three peers.
	let peers = vec!["a".into(), "b".into(), "c".into()];
	let propagated = HashMap::from([(*watcher.hash(), peers.clone())]);
	pool.validated_pool().on_broadcasted(propagated);

	// then
	// The watcher sees `Ready` followed by `Broadcast` carrying the same peer list.
	let mut events = futures::executor::block_on_stream(watcher.into_stream());
	assert_eq!(events.next(), Some(TransactionStatus::Ready));
	assert_eq!(events.next(), Some(TransactionStatus::Broadcast(peers)));
}
1032
#[test]
fn should_trigger_dropped_older() {
	// given
	// Both the ready and the future queue are limited to a single transaction.
	let single_slot = || Limit { count: 1, total_bytes: 1000 };
	let options = Options { ready: single_slot(), future: single_slot(), ..Default::default() };

	let api = Arc::new(TestApi::default());
	let pool = Pool::new(options, true.into(), api.clone());

	let older = uxt(Transfer {
		from: Alice.into(),
		to: AccountId::from_h256(H256::from_low_u64_be(2)),
		amount: 5,
		nonce: 0,
	});
	let watcher =
		block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, older.into()))
			.unwrap();
	assert_eq!(pool.validated_pool().status().ready, 1);

	// when
	// A second transaction is submitted at block 1; the ready count stays at one.
	let newer = uxt(Transfer {
		from: Bob.into(),
		to: AccountId::from_h256(H256::from_low_u64_be(1)),
		amount: 4,
		nonce: 1,
	});
	block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, newer.into())).unwrap();
	assert_eq!(pool.validated_pool().status().ready, 1);

	// then
	// The watcher of the first transaction sees `Ready` followed by `Dropped`.
	let mut events = futures::executor::block_on_stream(watcher.into_stream());
	assert_eq!(events.next(), Some(TransactionStatus::Ready));
	assert_eq!(events.next(), Some(TransactionStatus::Dropped));
}
1069
#[test]
fn should_trigger_dropped_lower_priority() {
	// Creates a fresh pool (and its test API) whose ready and future queues are both
	// limited to `count` transactions.
	let pool_limited_to = |count| {
		let limit = Limit { count, total_bytes: 1000 };
		let options = Options { ready: limit.clone(), future: limit, ..Default::default() };

		let api = Arc::new(TestApi::default());
		let pool = Pool::new(options, true.into(), api.clone());
		(pool, api)
	};

	// Case 1: the pool holds one transaction and a lower-priority one is submitted.
	{
		// given
		let (pool, api) = pool_limited_to(1);

		// after validation `IncludeData` will have priority set to 9001
		// (validate_transaction mock)
		let high_priority = ExtrinsicBuilder::new_include_data(Vec::new()).build();
		block_on(pool.submit_one(
			&api.expect_hash_and_number(0),
			SOURCE,
			high_priority.into(),
		))
		.unwrap();
		assert_eq!(pool.validated_pool().status().ready, 1);

		// when
		// after validation `Transfer` will have priority set to 4 (validate_transaction
		// mock)
		let low_priority = uxt(Transfer {
			from: Bob.into(),
			to: AccountId::from_h256(H256::from_low_u64_be(1)),
			amount: 4,
			nonce: 1,
		});
		let result = block_on(pool.submit_one(
			&api.expect_hash_and_number(1),
			SOURCE,
			low_priority.into(),
		));

		// then
		// The submission itself fails with `ImmediatelyDropped`.
		assert!(matches!(
			result,
			Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
		));
	}

	// Case 2: the pool holds two transactions and a third one is submitted.
	{
		// given
		let (pool, api) = pool_limited_to(2);
		let at = api.expect_hash_and_number(0);

		// after validation `IncludeData` will have priority set to 9001
		// (validate_transaction mock)
		let include_data = ExtrinsicBuilder::new_include_data(Vec::new()).build();
		block_on(pool.submit_and_watch(&at, SOURCE, include_data.into())).unwrap();
		assert_eq!(pool.validated_pool().status().ready, 1);

		// after validation `Transfer` will have priority set to 4 (validate_transaction
		// mock)
		let transfer = uxt(Transfer {
			from: Alice.into(),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 0,
		});
		let watcher = block_on(pool.submit_and_watch(&at, SOURCE, transfer.into())).unwrap();
		assert_eq!(pool.validated_pool().status().ready, 2);

		// when
		// after validation `Store` will have priority set to 9001 (validate_transaction
		// mock)
		let indexed_call = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
		block_on(pool.submit_one(
			&api.expect_hash_and_number(1),
			SOURCE,
			indexed_call.into(),
		))
		.unwrap();
		assert_eq!(pool.validated_pool().status().ready, 2);

		// then
		// The watched `Transfer` sees `Ready` followed by `Dropped`.
		let mut events = futures::executor::block_on_stream(watcher.into_stream());
		assert_eq!(events.next(), Some(TransactionStatus::Ready));
		assert_eq!(events.next(), Some(TransactionStatus::Dropped));
	}
}
1147
#[test]
fn should_handle_pruning_in_the_middle_of_import() {
	// given
	// `done_*` signals that the background submission has finished; `release_*` is
	// installed as the API's `delay` and is used below to release the held-back
	// verification.
	let (done_tx, done_rx) = std::sync::mpsc::sync_channel(0);
	let (release_tx, release_rx) = std::sync::mpsc::sync_channel(1);
	let mut api = TestApi::default();
	api.delay = Arc::new(Mutex::new(release_rx.into()));
	let api = Arc::new(api);
	let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));

	let han_of_block0 = api.expect_hash_and_number(0);

	// when
	// With `nonce: 1` this transaction would normally end up in the future queue.
	let dependent_xt = uxt(Transfer {
		from: Alice.into(),
		to: AccountId::from_h256(H256::from_low_u64_be(2)),
		amount: 5,
		nonce: 1,
	});

	// Submit it from a background thread and report back once the submission is done.
	std::thread::spawn({
		let pool = Arc::clone(&pool);
		let at = han_of_block0.clone();
		move || {
			block_on(pool.submit_one(&at, SOURCE, dependent_xt.into())).unwrap();
			done_tx.send(()).unwrap();
		}
	});

	// Before the transaction above is imported, import the one it depends on.
	let required_xt = uxt(Transfer {
		from: Alice.into(),
		to: AccountId::from_h256(H256::from_low_u64_be(2)),
		amount: 4,
		nonce: 0,
	});
	block_on(pool.submit_one(&han_of_block0, SOURCE, required_xt.into())).unwrap();
	assert_eq!(pool.validated_pool().status().ready, 1);

	// A block import now prunes the tag provided by `required_xt` (TestApi is using just
	// nonce as u8) while the background transaction has not finished verification yet.
	let pruned_tags = vec![vec![0_u8]];
	block_on(pool.prune_tags(&api.expect_hash_and_number(1), pruned_tags, vec![]));
	assert_eq!(pool.validated_pool().status().ready, 0);

	// Releasing the verification of the background transaction: it still has something
	// in `requires`, but it should go to ready directly, since the transaction it
	// depends on was imported correctly.
	release_tx.send(()).unwrap();

	// then
	done_rx.recv().unwrap(); // wait for the background submission to finish
	assert_eq!(pool.validated_pool().status().ready, 1);
	assert_eq!(pool.validated_pool().status().future, 0);
}
1206	}
1207}