sc_transaction_pool/graph/
base_pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! A basic version of the dependency graph.
20//!
21//! For a more full-featured pool, have a look at the `pool` module.
22
23use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc};
24
25use crate::LOG_TARGET;
26use log::{trace, warn};
27use sc_transaction_pool_api::{error, InPoolTransaction, PoolStatus};
28use serde::Serialize;
29use sp_core::hexdisplay::HexDisplay;
30use sp_runtime::{
31	traits::Member,
32	transaction_validity::{
33		TransactionLongevity as Longevity, TransactionPriority as Priority,
34		TransactionSource as Source, TransactionTag as Tag,
35	},
36};
37
38use super::{
39	future::{FutureTransactions, WaitingTransaction},
40	ready::{BestIterator, ReadyTransactions, TransactionRef},
41};
42
43/// Successful import result.
44#[derive(Debug, PartialEq, Eq)]
45pub enum Imported<Hash, Ex> {
46	/// Transaction was successfully imported to Ready queue.
47	Ready {
48		/// Hash of transaction that was successfully imported.
49		hash: Hash,
50		/// Transactions that got promoted from the Future queue.
51		promoted: Vec<Hash>,
52		/// Transactions that failed to be promoted from the Future queue and are now discarded.
53		failed: Vec<Hash>,
54		/// Transactions removed from the Ready pool (replaced).
55		removed: Vec<Arc<Transaction<Hash, Ex>>>,
56	},
57	/// Transaction was successfully imported to Future queue.
58	Future {
59		/// Hash of transaction that was successfully imported.
60		hash: Hash,
61	},
62}
63
64impl<Hash, Ex> Imported<Hash, Ex> {
65	/// Returns the hash of imported transaction.
66	pub fn hash(&self) -> &Hash {
67		use self::Imported::*;
68		match *self {
69			Ready { ref hash, .. } => hash,
70			Future { ref hash, .. } => hash,
71		}
72	}
73}
74
75/// Status of pruning the queue.
76#[derive(Debug)]
77pub struct PruneStatus<Hash, Ex> {
78	/// A list of imports that satisfying the tag triggered.
79	pub promoted: Vec<Imported<Hash, Ex>>,
80	/// A list of transactions that failed to be promoted and now are discarded.
81	pub failed: Vec<Hash>,
82	/// A list of transactions that got pruned from the ready queue.
83	pub pruned: Vec<Arc<Transaction<Hash, Ex>>>,
84}
85
86/// Immutable transaction
87#[derive(PartialEq, Eq, Clone)]
88pub struct Transaction<Hash, Extrinsic> {
89	/// Raw extrinsic representing that transaction.
90	pub data: Extrinsic,
91	/// Number of bytes encoding of the transaction requires.
92	pub bytes: usize,
93	/// Transaction hash (unique)
94	pub hash: Hash,
95	/// Transaction priority (higher = better)
96	pub priority: Priority,
97	/// At which block the transaction becomes invalid?
98	pub valid_till: Longevity,
99	/// Tags required by the transaction.
100	pub requires: Vec<Tag>,
101	/// Tags that this transaction provides.
102	pub provides: Vec<Tag>,
103	/// Should that transaction be propagated.
104	pub propagate: bool,
105	/// Source of that transaction.
106	pub source: Source,
107}
108
109impl<Hash, Extrinsic> AsRef<Extrinsic> for Transaction<Hash, Extrinsic> {
110	fn as_ref(&self) -> &Extrinsic {
111		&self.data
112	}
113}
114
115impl<Hash, Extrinsic> InPoolTransaction for Transaction<Hash, Extrinsic> {
116	type Transaction = Extrinsic;
117	type Hash = Hash;
118
119	fn data(&self) -> &Extrinsic {
120		&self.data
121	}
122
123	fn hash(&self) -> &Hash {
124		&self.hash
125	}
126
127	fn priority(&self) -> &Priority {
128		&self.priority
129	}
130
131	fn longevity(&self) -> &Longevity {
132		&self.valid_till
133	}
134
135	fn requires(&self) -> &[Tag] {
136		&self.requires
137	}
138
139	fn provides(&self) -> &[Tag] {
140		&self.provides
141	}
142
143	fn is_propagable(&self) -> bool {
144		self.propagate
145	}
146}
147
148impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
149	/// Explicit transaction clone.
150	///
151	/// Transaction should be cloned only if absolutely necessary && we want
152	/// every reason to be commented. That's why we `Transaction` is not `Clone`,
153	/// but there's explicit `duplicate` method.
154	pub fn duplicate(&self) -> Self {
155		Self {
156			data: self.data.clone(),
157			bytes: self.bytes,
158			hash: self.hash.clone(),
159			priority: self.priority,
160			source: self.source,
161			valid_till: self.valid_till,
162			requires: self.requires.clone(),
163			provides: self.provides.clone(),
164			propagate: self.propagate,
165		}
166	}
167}
168
169impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic>
170where
171	Hash: fmt::Debug,
172	Extrinsic: fmt::Debug,
173{
174	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
175		let join_tags = |tags: &[Tag]| {
176			tags.iter()
177				.map(|tag| HexDisplay::from(tag).to_string())
178				.collect::<Vec<_>>()
179				.join(", ")
180		};
181
182		write!(fmt, "Transaction {{ ")?;
183		write!(fmt, "hash: {:?}, ", &self.hash)?;
184		write!(fmt, "priority: {:?}, ", &self.priority)?;
185		write!(fmt, "valid_till: {:?}, ", &self.valid_till)?;
186		write!(fmt, "bytes: {:?}, ", &self.bytes)?;
187		write!(fmt, "propagate: {:?}, ", &self.propagate)?;
188		write!(fmt, "source: {:?}, ", &self.source)?;
189		write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
190		write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
191		write!(fmt, "data: {:?}", &self.data)?;
192		write!(fmt, "}}")?;
193		Ok(())
194	}
195}
196
197/// Store last pruned tags for given number of invocations.
198const RECENTLY_PRUNED_TAGS: usize = 2;
199
200/// Transaction pool.
201///
202/// Builds a dependency graph for all transactions in the pool and returns
203/// the ones that are currently ready to be executed.
204///
205/// General note:
206/// If function returns some transactions it usually means that importing them
207/// as-is for the second time will fail or produce unwanted results.
208/// Most likely it is required to revalidate them and recompute set of
209/// required tags.
210#[derive(Clone, Debug)]
211pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
212	reject_future_transactions: bool,
213	future: FutureTransactions<Hash, Ex>,
214	ready: ReadyTransactions<Hash, Ex>,
215	/// Store recently pruned tags (for last two invocations).
216	///
217	/// This is used to make sure we don't accidentally put
218	/// transactions to future in case they were just stuck in verification.
219	recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
220	recently_pruned_index: usize,
221}
222
223impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for BasePool<Hash, Ex> {
224	fn default() -> Self {
225		Self::new(false)
226	}
227}
228
229impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
230	/// Create new pool given reject_future_transactions flag.
231	pub fn new(reject_future_transactions: bool) -> Self {
232		Self {
233			reject_future_transactions,
234			future: Default::default(),
235			ready: Default::default(),
236			recently_pruned: Default::default(),
237			recently_pruned_index: 0,
238		}
239	}
240
241	/// Clears buffer keeping recently pruned transaction.
242	pub fn clear_recently_pruned(&mut self) {
243		self.recently_pruned = Default::default();
244		self.recently_pruned_index = 0;
245	}
246
247	/// Temporary enables future transactions, runs closure and then restores
248	/// `reject_future_transactions` flag back to previous value.
249	///
250	/// The closure accepts the mutable reference to the pool and original value
251	/// of the `reject_future_transactions` flag.
252	pub(crate) fn with_futures_enabled<T>(
253		&mut self,
254		closure: impl FnOnce(&mut Self, bool) -> T,
255	) -> T {
256		let previous = self.reject_future_transactions;
257		self.reject_future_transactions = false;
258		let return_value = closure(self, previous);
259		self.reject_future_transactions = previous;
260		return_value
261	}
262
263	/// Returns if the transaction for the given hash is already imported.
264	pub fn is_imported(&self, tx_hash: &Hash) -> bool {
265		self.future.contains(tx_hash) || self.ready.contains(tx_hash)
266	}
267
268	/// Imports transaction to the pool.
269	///
270	/// The pool consists of two parts: Future and Ready.
271	/// The former contains transactions that require some tags that are not yet provided by
272	/// other transactions in the pool.
273	/// The latter contains transactions that have all the requirements satisfied and are
274	/// ready to be included in the block.
275	pub fn import(&mut self, tx: Transaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
276		if self.is_imported(&tx.hash) {
277			return Err(error::Error::AlreadyImported(Box::new(tx.hash)))
278		}
279
280		let tx = WaitingTransaction::new(tx, self.ready.provided_tags(), &self.recently_pruned);
281		trace!(
282			target: LOG_TARGET,
283			"[{:?}] Importing {:?} to {}",
284			tx.transaction.hash,
285			tx,
286			if tx.is_ready() { "ready" } else { "future" }
287		);
288
289		// If all tags are not satisfied import to future.
290		if !tx.is_ready() {
291			if self.reject_future_transactions {
292				return Err(error::Error::RejectedFutureTransaction)
293			}
294
295			let hash = tx.transaction.hash.clone();
296			self.future.import(tx);
297			return Ok(Imported::Future { hash })
298		}
299
300		self.import_to_ready(tx)
301	}
302
303	/// Imports transaction to ready queue.
304	///
305	/// NOTE the transaction has to have all requirements satisfied.
306	fn import_to_ready(
307		&mut self,
308		tx: WaitingTransaction<Hash, Ex>,
309	) -> error::Result<Imported<Hash, Ex>> {
310		let hash = tx.transaction.hash.clone();
311		let mut promoted = vec![];
312		let mut failed = vec![];
313		let mut removed = vec![];
314
315		let mut first = true;
316		let mut to_import = vec![tx];
317
318		// take first transaction from the list
319		while let Some(tx) = to_import.pop() {
320			// find transactions in Future that it unlocks
321			to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));
322
323			// import this transaction
324			let current_hash = tx.transaction.hash.clone();
325			match self.ready.import(tx) {
326				Ok(mut replaced) => {
327					if !first {
328						promoted.push(current_hash);
329					}
330					// The transactions were removed from the ready pool. We might attempt to
331					// re-import them.
332					removed.append(&mut replaced);
333				},
334				// transaction failed to be imported.
335				Err(e) =>
336					if first {
337						trace!(target: LOG_TARGET, "[{:?}] Error importing: {:?}", current_hash, e);
338						return Err(e)
339					} else {
340						failed.push(current_hash);
341					},
342			}
343			first = false;
344		}
345
346		// An edge case when importing transaction caused
347		// some future transactions to be imported and that
348		// future transactions pushed out current transaction.
349		// This means that there is a cycle and the transactions should
350		// be moved back to future, since we can't resolve it.
351		if removed.iter().any(|tx| tx.hash == hash) {
352			// We still need to remove all transactions that we promoted
353			// since they depend on each other and will never get to the best iterator.
354			self.ready.remove_subtree(&promoted);
355
356			trace!(target: LOG_TARGET, "[{:?}] Cycle detected, bailing.", hash);
357			return Err(error::Error::CycleDetected)
358		}
359
360		Ok(Imported::Ready { hash, promoted, failed, removed })
361	}
362
363	/// Returns an iterator over ready transactions in the pool.
364	pub fn ready(&self) -> BestIterator<Hash, Ex> {
365		self.ready.get()
366	}
367
368	/// Returns an iterator over future transactions in the pool.
369	pub fn futures(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
370		self.future.all()
371	}
372
373	/// Returns pool transactions given list of hashes.
374	///
375	/// Includes both ready and future pool. For every hash in the `hashes`
376	/// iterator an `Option` is produced (so the resulting `Vec` always have the same length).
377	pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
378		let ready = self.ready.by_hashes(hashes);
379		let future = self.future.by_hashes(hashes);
380
381		ready.into_iter().zip(future).map(|(a, b)| a.or(b)).collect()
382	}
383
384	/// Returns pool transaction by hash.
385	pub fn ready_by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
386		self.ready.by_hash(hash)
387	}
388
389	/// Makes sure that the transactions in the queues stay within provided limits.
390	///
391	/// Removes and returns worst transactions from the queues and all transactions that depend on
392	/// them. Technically the worst transaction should be evaluated by computing the entire pending
393	/// set. We use a simplified approach to remove transactions with the lowest priority first or
394	/// those that occupy the pool for the longest time in case priority is the same.
395	pub fn enforce_limits(
396		&mut self,
397		ready: &Limit,
398		future: &Limit,
399	) -> Vec<Arc<Transaction<Hash, Ex>>> {
400		let mut removed = vec![];
401
402		while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
403			// find the worst transaction
404			let worst = self.ready.fold::<TransactionRef<Hash, Ex>, _>(|worst, current| {
405				let transaction = &current.transaction;
406				worst
407					.map(|worst| {
408						// Here we don't use `TransactionRef`'s ordering implementation because
409						// while it prefers priority like need here, it also prefers older
410						// transactions for inclusion purposes and limit enforcement needs to prefer
411						// newer transactions instead and drop the older ones.
412						match worst.transaction.priority.cmp(&transaction.transaction.priority) {
413							Ordering::Less => worst,
414							Ordering::Equal =>
415								if worst.insertion_id > transaction.insertion_id {
416									transaction.clone()
417								} else {
418									worst
419								},
420							Ordering::Greater => transaction.clone(),
421						}
422					})
423					.or_else(|| Some(transaction.clone()))
424			});
425
426			if let Some(worst) = worst {
427				removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
428			} else {
429				break
430			}
431		}
432
433		while future.is_exceeded(self.future.len(), self.future.bytes()) {
434			// find the worst transaction
435			let worst = self.future.fold(|worst, current| match worst {
436				None => Some(current.clone()),
437				Some(ref tx) if tx.imported_at > current.imported_at => Some(current.clone()),
438				other => other,
439			});
440
441			if let Some(worst) = worst {
442				removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
443			} else {
444				break
445			}
446		}
447
448		removed
449	}
450
451	/// Removes all transactions represented by the hashes and all other transactions
452	/// that depend on them.
453	///
454	/// Returns a list of actually removed transactions.
455	/// NOTE some transactions might still be valid, but were just removed because
456	/// they were part of a chain, you may attempt to re-import them later.
457	/// NOTE If you want to remove ready transactions that were already used,
458	/// and you don't want them to be stored in the pool use `prune_tags` method.
459	pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
460		let mut removed = self.ready.remove_subtree(hashes);
461		removed.extend(self.future.remove(hashes));
462		removed
463	}
464
465	/// Removes and returns all transactions from the future queue.
466	pub fn clear_future(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
467		self.future.clear()
468	}
469
470	/// Prunes transactions that provide given list of tags.
471	///
472	/// This will cause all transactions (both ready and future) that provide these tags to be
473	/// removed from the pool, but unlike `remove_subtree`, dependent transactions are not touched.
474	/// Additional transactions from future queue might be promoted to ready if you satisfy tags
475	/// that the pool didn't previously know about.
476	pub fn prune_tags(&mut self, tags: impl IntoIterator<Item = Tag>) -> PruneStatus<Hash, Ex> {
477		let mut to_import = vec![];
478		let mut pruned = vec![];
479		let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index];
480		self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS;
481		recently_pruned.clear();
482
483		let tags = tags.into_iter().collect::<Vec<_>>();
484		let futures_removed = self.future.prune_tags(&tags);
485
486		for tag in tags {
487			// make sure to promote any future transactions that could be unlocked
488			to_import.append(&mut self.future.satisfy_tags(std::iter::once(&tag)));
489			// and actually prune transactions in ready queue
490			pruned.append(&mut self.ready.prune_tags(tag.clone()));
491			// store the tags for next submission
492			recently_pruned.insert(tag);
493		}
494
495		let mut promoted = vec![];
496		let mut failed = vec![];
497		for tx in futures_removed {
498			failed.push(tx.hash.clone());
499		}
500
501		for tx in to_import {
502			let hash = tx.transaction.hash.clone();
503			match self.import_to_ready(tx) {
504				Ok(res) => promoted.push(res),
505				Err(e) => {
506					warn!(
507						target: LOG_TARGET,
508						"[{:?}] Failed to promote during pruning: {:?}", hash, e,
509					);
510					failed.push(hash)
511				},
512			}
513		}
514
515		PruneStatus { pruned, failed, promoted }
516	}
517
518	/// Get pool status.
519	pub fn status(&self) -> PoolStatus {
520		PoolStatus {
521			ready: self.ready.len(),
522			ready_bytes: self.ready.bytes(),
523			future: self.future.len(),
524			future_bytes: self.future.bytes(),
525		}
526	}
527}
528
/// Limits applied to a queue.
#[derive(Clone, Debug)]
pub struct Limit {
	/// Maximal number of transactions in the queue.
	pub count: usize,
	/// Maximal total size of the encodings of all transactions in the queue.
	pub total_bytes: usize,
}

impl Limit {
	/// Returns `true` when `count` or `bytes` is strictly greater than its limit.
	///
	/// Values equal to the limit are still within bounds.
	pub fn is_exceeded(&self, count: usize, bytes: usize) -> bool {
		let too_many = count > self.count;
		let too_big = bytes > self.total_bytes;
		too_many || too_big
	}
}
544
545#[cfg(test)]
546mod tests {
547	use super::*;
548
549	type Hash = u64;
550
551	fn pool() -> BasePool<Hash, Vec<u8>> {
552		BasePool::default()
553	}
554
555	fn default_tx() -> Transaction<Hash, Vec<u8>> {
556		Transaction {
557			data: vec![],
558			bytes: 1,
559			hash: 1u64,
560			priority: 5u64,
561			valid_till: 64u64,
562			requires: vec![],
563			provides: vec![],
564			propagate: true,
565			source: Source::External,
566		}
567	}
568
569	#[test]
570	fn prune_for_ready_works() {
571		// given
572		let mut pool = pool();
573
574		// when
575		pool.import(Transaction {
576			data: vec![1u8].into(),
577			provides: vec![vec![2]],
578			..default_tx().clone()
579		})
580		.unwrap();
581
582		// then
583		assert_eq!(pool.ready().count(), 1);
584		assert_eq!(pool.ready.len(), 1);
585
586		let result = pool.prune_tags(vec![vec![2]]);
587		assert_eq!(pool.ready().count(), 0);
588		assert_eq!(pool.ready.len(), 0);
589		assert_eq!(result.pruned.len(), 1);
590		assert_eq!(result.failed.len(), 0);
591		assert_eq!(result.promoted.len(), 0);
592	}
593
594	#[test]
595	fn prune_for_future_works() {
596		// given
597		let mut pool = pool();
598
599		// when
600		pool.import(Transaction {
601			data: vec![1u8].into(),
602			requires: vec![vec![1]],
603			provides: vec![vec![2]],
604			hash: 0xaa,
605			..default_tx().clone()
606		})
607		.unwrap();
608
609		// then
610		assert_eq!(pool.futures().count(), 1);
611		assert_eq!(pool.future.len(), 1);
612
613		let result = pool.prune_tags(vec![vec![2]]);
614		assert_eq!(pool.ready().count(), 0);
615		assert_eq!(pool.ready.len(), 0);
616		assert_eq!(pool.futures().count(), 0);
617		assert_eq!(pool.future.len(), 0);
618
619		assert_eq!(result.pruned.len(), 0);
620		assert_eq!(result.failed.len(), 1);
621		assert_eq!(result.failed[0], 0xaa);
622		assert_eq!(result.promoted.len(), 0);
623	}
624
625	#[test]
626	fn should_import_transaction_to_ready() {
627		// given
628		let mut pool = pool();
629
630		// when
631		pool.import(Transaction {
632			data: vec![1u8].into(),
633			provides: vec![vec![1]],
634			..default_tx().clone()
635		})
636		.unwrap();
637
638		// then
639		assert_eq!(pool.ready().count(), 1);
640		assert_eq!(pool.ready.len(), 1);
641	}
642
643	#[test]
644	fn should_not_import_same_transaction_twice() {
645		// given
646		let mut pool = pool();
647
648		// when
649		pool.import(Transaction {
650			data: vec![1u8].into(),
651			provides: vec![vec![1]],
652			..default_tx().clone()
653		})
654		.unwrap();
655		pool.import(Transaction {
656			data: vec![1u8].into(),
657			provides: vec![vec![1]],
658			..default_tx().clone()
659		})
660		.unwrap_err();
661
662		// then
663		assert_eq!(pool.ready().count(), 1);
664		assert_eq!(pool.ready.len(), 1);
665	}
666
667	#[test]
668	fn should_import_transaction_to_future_and_promote_it_later() {
669		// given
670		let mut pool = pool();
671
672		// when
673		pool.import(Transaction {
674			data: vec![1u8].into(),
675			requires: vec![vec![0]],
676			provides: vec![vec![1]],
677			..default_tx().clone()
678		})
679		.unwrap();
680		assert_eq!(pool.ready().count(), 0);
681		assert_eq!(pool.ready.len(), 0);
682		pool.import(Transaction {
683			data: vec![2u8].into(),
684			hash: 2,
685			provides: vec![vec![0]],
686			..default_tx().clone()
687		})
688		.unwrap();
689
690		// then
691		assert_eq!(pool.ready().count(), 2);
692		assert_eq!(pool.ready.len(), 2);
693	}
694
695	#[test]
696	fn should_promote_a_subgraph() {
697		// given
698		let mut pool = pool();
699
700		// when
701		pool.import(Transaction {
702			data: vec![1u8].into(),
703			requires: vec![vec![0]],
704			provides: vec![vec![1]],
705			..default_tx().clone()
706		})
707		.unwrap();
708		pool.import(Transaction {
709			data: vec![3u8].into(),
710			hash: 3,
711			requires: vec![vec![2]],
712			..default_tx().clone()
713		})
714		.unwrap();
715		pool.import(Transaction {
716			data: vec![2u8].into(),
717			hash: 2,
718			requires: vec![vec![1]],
719			provides: vec![vec![3], vec![2]],
720			..default_tx().clone()
721		})
722		.unwrap();
723		pool.import(Transaction {
724			data: vec![4u8].into(),
725			hash: 4,
726			priority: 1_000u64,
727			requires: vec![vec![3], vec![4]],
728			..default_tx().clone()
729		})
730		.unwrap();
731		assert_eq!(pool.ready().count(), 0);
732		assert_eq!(pool.ready.len(), 0);
733
734		let res = pool
735			.import(Transaction {
736				data: vec![5u8].into(),
737				hash: 5,
738				provides: vec![vec![0], vec![4]],
739				..default_tx().clone()
740			})
741			.unwrap();
742
743		// then
744		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
745
746		assert_eq!(it.next(), Some(5));
747		assert_eq!(it.next(), Some(1));
748		assert_eq!(it.next(), Some(2));
749		assert_eq!(it.next(), Some(4));
750		assert_eq!(it.next(), Some(3));
751		assert_eq!(it.next(), None);
752		assert_eq!(
753			res,
754			Imported::Ready {
755				hash: 5,
756				promoted: vec![1, 2, 3, 4],
757				failed: vec![],
758				removed: vec![],
759			}
760		);
761	}
762
763	#[test]
764	fn should_handle_a_cycle() {
765		// given
766		let mut pool = pool();
767		pool.import(Transaction {
768			data: vec![1u8].into(),
769			requires: vec![vec![0]],
770			provides: vec![vec![1]],
771			..default_tx().clone()
772		})
773		.unwrap();
774		pool.import(Transaction {
775			data: vec![3u8].into(),
776			hash: 3,
777			requires: vec![vec![1]],
778			provides: vec![vec![2]],
779			..default_tx().clone()
780		})
781		.unwrap();
782		assert_eq!(pool.ready().count(), 0);
783		assert_eq!(pool.ready.len(), 0);
784
785		// when
786		pool.import(Transaction {
787			data: vec![2u8].into(),
788			hash: 2,
789			requires: vec![vec![2]],
790			provides: vec![vec![0]],
791			..default_tx().clone()
792		})
793		.unwrap();
794
795		// then
796		{
797			let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
798			assert_eq!(it.next(), None);
799		}
800		// all transactions occupy the Future queue - it's fine
801		assert_eq!(pool.future.len(), 3);
802
803		// let's close the cycle with one additional transaction
804		let res = pool
805			.import(Transaction {
806				data: vec![4u8].into(),
807				hash: 4,
808				priority: 50u64,
809				provides: vec![vec![0]],
810				..default_tx().clone()
811			})
812			.unwrap();
813		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
814		assert_eq!(it.next(), Some(4));
815		assert_eq!(it.next(), Some(1));
816		assert_eq!(it.next(), Some(3));
817		assert_eq!(it.next(), None);
818		assert_eq!(
819			res,
820			Imported::Ready { hash: 4, promoted: vec![1, 3], failed: vec![2], removed: vec![] }
821		);
822		assert_eq!(pool.future.len(), 0);
823	}
824
825	#[test]
826	fn should_handle_a_cycle_with_low_priority() {
827		// given
828		let mut pool = pool();
829		pool.import(Transaction {
830			data: vec![1u8].into(),
831			requires: vec![vec![0]],
832			provides: vec![vec![1]],
833			..default_tx().clone()
834		})
835		.unwrap();
836		pool.import(Transaction {
837			data: vec![3u8].into(),
838			hash: 3,
839			requires: vec![vec![1]],
840			provides: vec![vec![2]],
841			..default_tx().clone()
842		})
843		.unwrap();
844		assert_eq!(pool.ready().count(), 0);
845		assert_eq!(pool.ready.len(), 0);
846
847		// when
848		pool.import(Transaction {
849			data: vec![2u8].into(),
850			hash: 2,
851			requires: vec![vec![2]],
852			provides: vec![vec![0]],
853			..default_tx().clone()
854		})
855		.unwrap();
856
857		// then
858		{
859			let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
860			assert_eq!(it.next(), None);
861		}
862		// all transactions occupy the Future queue - it's fine
863		assert_eq!(pool.future.len(), 3);
864
865		// let's close the cycle with one additional transaction
866		let err = pool
867			.import(Transaction {
868				data: vec![4u8].into(),
869				hash: 4,
870				priority: 1u64, // lower priority than Tx(2)
871				provides: vec![vec![0]],
872				..default_tx().clone()
873			})
874			.unwrap_err();
875		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
876		assert_eq!(it.next(), None);
877		assert_eq!(pool.ready.len(), 0);
878		assert_eq!(pool.future.len(), 0);
879		if let error::Error::CycleDetected = err {
880		} else {
881			assert!(false, "Invalid error kind: {:?}", err);
882		}
883	}
884
885	#[test]
886	fn should_remove_invalid_transactions() {
887		// given
888		let mut pool = pool();
889		pool.import(Transaction {
890			data: vec![5u8].into(),
891			hash: 5,
892			provides: vec![vec![0], vec![4]],
893			..default_tx().clone()
894		})
895		.unwrap();
896		pool.import(Transaction {
897			data: vec![1u8].into(),
898			requires: vec![vec![0]],
899			provides: vec![vec![1]],
900			..default_tx().clone()
901		})
902		.unwrap();
903		pool.import(Transaction {
904			data: vec![3u8].into(),
905			hash: 3,
906			requires: vec![vec![2]],
907			..default_tx().clone()
908		})
909		.unwrap();
910		pool.import(Transaction {
911			data: vec![2u8].into(),
912			hash: 2,
913			requires: vec![vec![1]],
914			provides: vec![vec![3], vec![2]],
915			..default_tx().clone()
916		})
917		.unwrap();
918		pool.import(Transaction {
919			data: vec![4u8].into(),
920			hash: 4,
921			priority: 1_000u64,
922			requires: vec![vec![3], vec![4]],
923			..default_tx().clone()
924		})
925		.unwrap();
926		// future
927		pool.import(Transaction {
928			data: vec![6u8].into(),
929			hash: 6,
930			priority: 1_000u64,
931			requires: vec![vec![11]],
932			..default_tx().clone()
933		})
934		.unwrap();
935		assert_eq!(pool.ready().count(), 5);
936		assert_eq!(pool.future.len(), 1);
937
938		// when
939		pool.remove_subtree(&[6, 1]);
940
941		// then
942		assert_eq!(pool.ready().count(), 1);
943		assert_eq!(pool.future.len(), 0);
944	}
945
946	#[test]
947	fn should_prune_ready_transactions() {
948		// given
949		let mut pool = pool();
950		// future (waiting for 0)
951		pool.import(Transaction {
952			data: vec![5u8].into(),
953			hash: 5,
954			requires: vec![vec![0]],
955			provides: vec![vec![100]],
956			..default_tx().clone()
957		})
958		.unwrap();
959		// ready
960		pool.import(Transaction {
961			data: vec![1u8].into(),
962			provides: vec![vec![1]],
963			..default_tx().clone()
964		})
965		.unwrap();
966		pool.import(Transaction {
967			data: vec![2u8].into(),
968			hash: 2,
969			requires: vec![vec![2]],
970			provides: vec![vec![3]],
971			..default_tx().clone()
972		})
973		.unwrap();
974		pool.import(Transaction {
975			data: vec![3u8].into(),
976			hash: 3,
977			requires: vec![vec![1]],
978			provides: vec![vec![2]],
979			..default_tx().clone()
980		})
981		.unwrap();
982		pool.import(Transaction {
983			data: vec![4u8].into(),
984			hash: 4,
985			priority: 1_000u64,
986			requires: vec![vec![3], vec![2]],
987			provides: vec![vec![4]],
988			..default_tx().clone()
989		})
990		.unwrap();
991
992		assert_eq!(pool.ready().count(), 4);
993		assert_eq!(pool.future.len(), 1);
994
995		// when
996		let result = pool.prune_tags(vec![vec![0], vec![2]]);
997
998		// then
999		assert_eq!(result.pruned.len(), 2);
1000		assert_eq!(result.failed.len(), 0);
1001		assert_eq!(
1002			result.promoted[0],
1003			Imported::Ready { hash: 5, promoted: vec![], failed: vec![], removed: vec![] }
1004		);
1005		assert_eq!(result.promoted.len(), 1);
1006		assert_eq!(pool.future.len(), 0);
1007		assert_eq!(pool.ready.len(), 3);
1008		assert_eq!(pool.ready().count(), 3);
1009	}
1010
1011	#[test]
1012	fn transaction_debug() {
1013		assert_eq!(
1014			format!(
1015				"{:?}",
1016				Transaction {
1017					data: vec![4u8].into(),
1018					hash: 4,
1019					priority: 1_000u64,
1020					requires: vec![vec![3], vec![2]],
1021					provides: vec![vec![4]],
1022					..default_tx().clone()
1023				}
1024			),
1025			"Transaction { \
1026hash: 4, priority: 1000, valid_till: 64, bytes: 1, propagate: true, \
1027source: TransactionSource::External, requires: [03, 02], provides: [04], data: [4]}"
1028				.to_owned()
1029		);
1030	}
1031
1032	#[test]
1033	fn transaction_propagation() {
1034		assert_eq!(
1035			Transaction {
1036				data: vec![4u8].into(),
1037				hash: 4,
1038				priority: 1_000u64,
1039				requires: vec![vec![3], vec![2]],
1040				provides: vec![vec![4]],
1041				..default_tx().clone()
1042			}
1043			.is_propagable(),
1044			true
1045		);
1046
1047		assert_eq!(
1048			Transaction {
1049				data: vec![4u8].into(),
1050				hash: 4,
1051				priority: 1_000u64,
1052				requires: vec![vec![3], vec![2]],
1053				provides: vec![vec![4]],
1054				propagate: false,
1055				..default_tx().clone()
1056			}
1057			.is_propagable(),
1058			false
1059		);
1060	}
1061
1062	#[test]
1063	fn should_reject_future_transactions() {
1064		// given
1065		let mut pool = pool();
1066
1067		// when
1068		pool.reject_future_transactions = true;
1069
1070		// then
1071		let err = pool.import(Transaction {
1072			data: vec![5u8].into(),
1073			hash: 5,
1074			requires: vec![vec![0]],
1075			..default_tx().clone()
1076		});
1077
1078		if let Err(error::Error::RejectedFutureTransaction) = err {
1079		} else {
1080			assert!(false, "Invalid error kind: {:?}", err);
1081		}
1082	}
1083
1084	#[test]
1085	fn should_clear_future_queue() {
1086		// given
1087		let mut pool = pool();
1088
1089		// when
1090		pool.import(Transaction {
1091			data: vec![5u8].into(),
1092			hash: 5,
1093			requires: vec![vec![0]],
1094			..default_tx().clone()
1095		})
1096		.unwrap();
1097
1098		// then
1099		assert_eq!(pool.future.len(), 1);
1100
1101		// and then when
1102		assert_eq!(pool.clear_future().len(), 1);
1103
1104		// then
1105		assert_eq!(pool.future.len(), 0);
1106	}
1107
1108	#[test]
1109	fn should_accept_future_transactions_when_explicitly_asked_to() {
1110		// given
1111		let mut pool = pool();
1112		pool.reject_future_transactions = true;
1113
1114		// when
1115		let flag_value = pool.with_futures_enabled(|pool, flag| {
1116			pool.import(Transaction {
1117				data: vec![5u8].into(),
1118				hash: 5,
1119				requires: vec![vec![0]],
1120				..default_tx().clone()
1121			})
1122			.unwrap();
1123
1124			flag
1125		});
1126
1127		// then
1128		assert_eq!(flag_value, true);
1129		assert_eq!(pool.reject_future_transactions, true);
1130		assert_eq!(pool.future.len(), 1);
1131	}
1132}