1use crate::{common::log_xt::log_xt_trace, LOG_TARGET};
20use futures::{channel::mpsc::Receiver, Future};
21use indexmap::IndexMap;
22use sc_transaction_pool_api::error;
23use sp_blockchain::{HashAndNumber, TreeRoute};
24use sp_runtime::{
25 generic::BlockId,
26 traits::{self, Block as BlockT, SaturatedConversion},
27 transaction_validity::{
28 TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
29 },
30};
31use std::{
32 collections::HashMap,
33 sync::Arc,
34 time::{Duration, Instant},
35};
36
37use super::{
38 base_pool as base,
39 validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
40 watcher::Watcher,
41};
42
43pub type EventStream<H> = Receiver<H>;
45
46pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
48pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
50pub type ExtrinsicFor<A> = Arc<<<A as ChainApi>::Block as traits::Block>::Extrinsic>;
52pub type RawExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
54pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
56pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
58pub type ValidatedTransactionFor<A> =
60 ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
61
62pub trait ChainApi: Send + Sync {
64 type Block: BlockT;
66 type Error: From<error::Error> + error::IntoPoolError;
68 type ValidationFuture: Future<Output = Result<TransactionValidity, Self::Error>> + Send + Unpin;
70 type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>>
72 + Unpin
73 + Send
74 + 'static;
75
76 fn validate_transaction(
78 &self,
79 at: <Self::Block as BlockT>::Hash,
80 source: TransactionSource,
81 uxt: ExtrinsicFor<Self>,
82 ) -> Self::ValidationFuture;
83
84 fn validate_transaction_blocking(
89 &self,
90 at: <Self::Block as BlockT>::Hash,
91 source: TransactionSource,
92 uxt: ExtrinsicFor<Self>,
93 ) -> Result<TransactionValidity, Self::Error>;
94
95 fn block_id_to_number(
97 &self,
98 at: &BlockId<Self::Block>,
99 ) -> Result<Option<NumberFor<Self>>, Self::Error>;
100
101 fn block_id_to_hash(
103 &self,
104 at: &BlockId<Self::Block>,
105 ) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
106
107 fn hash_and_length(&self, uxt: &RawExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
109
110 fn block_body(&self, at: <Self::Block as BlockT>::Hash) -> Self::BodyFuture;
112
113 fn block_header(
115 &self,
116 at: <Self::Block as BlockT>::Hash,
117 ) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
118
119 fn tree_route(
121 &self,
122 from: <Self::Block as BlockT>::Hash,
123 to: <Self::Block as BlockT>::Hash,
124 ) -> Result<TreeRoute<Self::Block>, Self::Error>;
125
126 fn resolve_block_number(
128 &self,
129 at: <Self::Block as BlockT>::Hash,
130 ) -> Result<NumberFor<Self>, Self::Error> {
131 self.block_id_to_number(&BlockId::Hash(at)).and_then(|number| {
132 number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
133 })
134 }
135}
136
/// Configuration of the transaction pool.
///
/// Default values are supplied by the `Default` implementation of this type.
#[derive(Debug, Clone)]
pub struct Options {
    /// Count/size limit for the `ready` part of the pool.
    pub ready: base::Limit,
    /// Count/size limit for the `future` part of the pool.
    pub future: base::Limit,
    /// Presumably makes the pool refuse transactions that would land in the future queue
    /// (NOTE(review): inferred from the name; the flag is consumed outside this file).
    pub reject_future_transactions: bool,
    /// Presumably how long a banned transaction stays banned
    /// (NOTE(review): inferred from the name; the value is consumed outside this file).
    pub ban_time: Duration,
}
149
150impl Default for Options {
151 fn default() -> Self {
152 Self {
153 ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
154 future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
155 reject_future_transactions: false,
156 ban_time: Duration::from_secs(60 * 30),
157 }
158 }
159}
160
/// Selects whether the ban list is honoured by the "is known" check that `verify_one`
/// performs before validating a transaction.
#[derive(Copy, Clone)]
enum CheckBannedBeforeVerify {
    /// Honour bans: `verify_one` passes `ignore_banned = false` to the check.
    Yes,
    /// Ignore bans: `verify_one` passes `ignore_banned = true` to the check.
    No,
}
168
/// Transaction pool front-end: validates extrinsics through the chain API `B` and hands
/// the results over to the wrapped [`ValidatedPool`].
pub struct Pool<B: ChainApi> {
    // Shared handle to the pool of already-validated transactions.
    validated_pool: Arc<ValidatedPool<B>>,
}
173
174impl<B: ChainApi> Pool<B> {
175 pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
177 Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
178 }
179
180 pub async fn submit_at(
182 &self,
183 at: &HashAndNumber<B::Block>,
184 source: TransactionSource,
185 xts: impl IntoIterator<Item = ExtrinsicFor<B>>,
186 ) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
187 let xts = xts.into_iter().map(|xt| (source, xt));
188 let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await;
189 self.validated_pool.submit(validated_transactions.into_values())
190 }
191
192 pub async fn resubmit_at(
196 &self,
197 at: &HashAndNumber<B::Block>,
198 source: TransactionSource,
199 xts: impl IntoIterator<Item = ExtrinsicFor<B>>,
200 ) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
201 let xts = xts.into_iter().map(|xt| (source, xt));
202 let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await;
203 self.validated_pool.submit(validated_transactions.into_values())
204 }
205
206 pub async fn submit_one(
208 &self,
209 at: &HashAndNumber<B::Block>,
210 source: TransactionSource,
211 xt: ExtrinsicFor<B>,
212 ) -> Result<ExtrinsicHash<B>, B::Error> {
213 let res = self.submit_at(at, source, std::iter::once(xt)).await.pop();
214 res.expect("One extrinsic passed; one result returned; qed")
215 }
216
217 pub async fn submit_and_watch(
219 &self,
220 at: &HashAndNumber<B::Block>,
221 source: TransactionSource,
222 xt: ExtrinsicFor<B>,
223 ) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
224 let (_, tx) = self
225 .verify_one(at.hash, at.number, source, xt, CheckBannedBeforeVerify::Yes)
226 .await;
227 self.validated_pool.submit_and_watch(tx)
228 }
229
230 pub fn resubmit(
232 &self,
233 revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
234 ) {
235 let now = Instant::now();
236 self.validated_pool.resubmit(revalidated_transactions);
237 log::trace!(
238 target: LOG_TARGET,
239 "Resubmitted. Took {} ms. Status: {:?}",
240 now.elapsed().as_millis(),
241 self.validated_pool.status()
242 );
243 }
244
245 pub fn prune_known(&self, at: &HashAndNumber<B::Block>, hashes: &[ExtrinsicHash<B>]) {
251 let in_pool_tags =
253 self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
254
255 let prune_status = self.validated_pool.prune_tags(in_pool_tags);
257 let pruned_transactions =
258 hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
259 self.validated_pool.fire_pruned(at, pruned_transactions);
260 }
261
262 pub async fn prune(
269 &self,
270 at: &HashAndNumber<B::Block>,
271 parent: <B::Block as BlockT>::Hash,
272 extrinsics: &[RawExtrinsicFor<B>],
273 ) {
274 log::debug!(
275 target: LOG_TARGET,
276 "Starting pruning of block {:?} (extrinsics: {})",
277 at,
278 extrinsics.len()
279 );
280 let in_pool_hashes =
282 extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
283 let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
284
285 let all = extrinsics.iter().zip(in_pool_tags.into_iter());
288 let mut validated_counter: usize = 0;
289
290 let mut future_tags = Vec::new();
291 for (extrinsic, in_pool_tags) in all {
292 match in_pool_tags {
293 Some(tags) => future_tags.extend(tags),
295 None => {
298 if !self.validated_pool.status().is_empty() {
300 validated_counter = validated_counter + 1;
301 let validity = self
302 .validated_pool
303 .api()
304 .validate_transaction(
305 parent,
306 TransactionSource::InBlock,
307 Arc::from(extrinsic.clone()),
308 )
309 .await;
310
311 log::trace!(target: LOG_TARGET,"[{:?}] prune::revalidated {:?}", self.validated_pool.api().hash_and_length(&extrinsic.clone()).0, validity);
312
313 if let Ok(Ok(validity)) = validity {
314 future_tags.extend(validity.provides);
315 }
316 } else {
317 log::trace!(
318 target: LOG_TARGET,
319 "txpool is empty, skipping validation for block {at:?}",
320 );
321 }
322 },
323 }
324 }
325
326 log::trace!(target: LOG_TARGET,"prune: validated_counter:{validated_counter}");
327
328 self.prune_tags(at, future_tags, in_pool_hashes).await
329 }
330
331 pub async fn prune_tags(
353 &self,
354 at: &HashAndNumber<B::Block>,
355 tags: impl IntoIterator<Item = Tag>,
356 known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
357 ) {
358 log::trace!(target: LOG_TARGET, "Pruning at {:?}", at);
359 let prune_status = self.validated_pool.prune_tags(tags);
361
362 self.validated_pool
366 .ban(&Instant::now(), known_imported_hashes.clone().into_iter());
367
368 let pruned_transactions =
371 prune_status.pruned.into_iter().map(|tx| (tx.source, tx.data.clone()));
372
373 let reverified_transactions =
374 self.verify(at, pruned_transactions, CheckBannedBeforeVerify::Yes).await;
375
376 let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect();
377
378 log::trace!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions: {}", &at, reverified_transactions.len());
379 log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "[{:?}] Resubmitting transaction: {:?}");
380
381 self.validated_pool.resubmit_pruned(
383 &at,
384 known_imported_hashes,
385 pruned_hashes,
386 reverified_transactions.into_values().collect(),
387 )
388 }
389
390 pub fn hash_of(&self, xt: &RawExtrinsicFor<B>) -> ExtrinsicHash<B> {
392 self.validated_pool.api().hash_and_length(xt).0
393 }
394
395 async fn verify(
397 &self,
398 at: &HashAndNumber<B::Block>,
399 xts: impl IntoIterator<Item = (TransactionSource, ExtrinsicFor<B>)>,
400 check: CheckBannedBeforeVerify,
401 ) -> IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>> {
402 let HashAndNumber { number, hash } = *at;
403
404 let res = futures::future::join_all(
405 xts.into_iter()
406 .map(|(source, xt)| self.verify_one(hash, number, source, xt, check)),
407 )
408 .await
409 .into_iter()
410 .collect::<IndexMap<_, _>>();
411
412 res
413 }
414
415 async fn verify_one(
417 &self,
418 block_hash: <B::Block as BlockT>::Hash,
419 block_number: NumberFor<B>,
420 source: TransactionSource,
421 xt: ExtrinsicFor<B>,
422 check: CheckBannedBeforeVerify,
423 ) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
424 let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
425
426 let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
427 if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
428 return (hash, ValidatedTransaction::Invalid(hash, err))
429 }
430
431 let validation_result = self
432 .validated_pool
433 .api()
434 .validate_transaction(block_hash, source, xt.clone())
435 .await;
436
437 let status = match validation_result {
438 Ok(status) => status,
439 Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
440 };
441
442 let validity = match status {
443 Ok(validity) =>
444 if validity.provides.is_empty() {
445 ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
446 } else {
447 ValidatedTransaction::valid_at(
448 block_number.saturated_into::<u64>(),
449 hash,
450 source,
451 xt,
452 bytes,
453 validity,
454 )
455 },
456 Err(TransactionValidityError::Invalid(e)) =>
457 ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
458 Err(TransactionValidityError::Unknown(e)) =>
459 ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
460 };
461
462 (hash, validity)
463 }
464
465 pub fn validated_pool(&self) -> &ValidatedPool<B> {
467 &self.validated_pool
468 }
469
470 pub fn clear_recently_pruned(&mut self) {
472 self.validated_pool.pool.write().clear_recently_pruned();
473 }
474}
475
476impl<B: ChainApi> Pool<B> {
477 pub fn deep_clone(&self) -> Self {
481 let other: ValidatedPool<B> = (*self.validated_pool).clone();
482 Self { validated_pool: Arc::from(other) }
483 }
484}
485
486#[cfg(test)]
487mod tests {
488 use super::{super::base_pool::Limit, *};
489 use crate::common::tests::{pool, uxt, TestApi, INVALID_NONCE};
490 use assert_matches::assert_matches;
491 use codec::Encode;
492 use futures::executor::block_on;
493 use parking_lot::Mutex;
494 use sc_transaction_pool_api::TransactionStatus;
495 use sp_runtime::transaction_validity::TransactionSource;
496 use std::{collections::HashMap, time::Instant};
497 use substrate_test_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
498 use substrate_test_runtime_client::AccountKeyring::{Alice, Bob};
499
    // Transaction source used by every submission in these tests.
    const SOURCE: TransactionSource = TransactionSource::External;
501
502 #[test]
503 fn should_validate_and_import_transaction() {
504 let (pool, api) = pool();
506
507 let hash = block_on(
509 pool.submit_one(
510 &api.expect_hash_and_number(0),
511 SOURCE,
512 uxt(Transfer {
513 from: Alice.into(),
514 to: AccountId::from_h256(H256::from_low_u64_be(2)),
515 amount: 5,
516 nonce: 0,
517 })
518 .into(),
519 ),
520 )
521 .unwrap();
522
523 assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
525 }
526
527 #[test]
528 fn submit_at_preserves_order() {
529 sp_tracing::try_init_simple();
530 let (pool, api) = pool();
532
533 let txs = (0..10)
534 .map(|i| {
535 uxt(Transfer {
536 from: Alice.into(),
537 to: AccountId::from_h256(H256::from_low_u64_be(i)),
538 amount: 5,
539 nonce: i,
540 })
541 .into()
542 })
543 .collect::<Vec<_>>();
544
545 let initial_hashes = txs.iter().map(|t| api.hash_and_length(t).0).collect::<Vec<_>>();
546
547 let txs = txs.into_iter().map(|x| Arc::from(x)).collect::<Vec<_>>();
549 let hashes = block_on(pool.submit_at(&api.expect_hash_and_number(0), SOURCE, txs));
550 log::debug!("--> {hashes:#?}");
551
552 hashes.into_iter().zip(initial_hashes.into_iter()).for_each(
554 |(result_hash, initial_hash)| {
555 assert_eq!(result_hash.unwrap(), initial_hash);
556 },
557 );
558 }
559
560 #[test]
561 fn should_reject_if_temporarily_banned() {
562 let (pool, api) = pool();
564 let uxt = uxt(Transfer {
565 from: Alice.into(),
566 to: AccountId::from_h256(H256::from_low_u64_be(2)),
567 amount: 5,
568 nonce: 0,
569 });
570
571 pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
573 let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()));
574 assert_eq!(pool.validated_pool().status().ready, 0);
575 assert_eq!(pool.validated_pool().status().future, 0);
576
577 assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
579 }
580
581 #[test]
582 fn should_reject_unactionable_transactions() {
583 let api = Arc::new(TestApi::default());
585 let pool = Pool::new(
586 Default::default(),
587 false.into(),
589 api.clone(),
590 );
591
592 let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
594
595 let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()));
597
598 assert_matches!(res.unwrap_err(), error::Error::Unactionable);
600 }
601
602 #[test]
603 fn should_notify_about_pool_events() {
604 let (stream, hash0, hash1) = {
605 let (pool, api) = pool();
607 let han_of_block0 = api.expect_hash_and_number(0);
608 let stream = pool.validated_pool().import_notification_stream();
609
610 let hash0 = block_on(
612 pool.submit_one(
613 &han_of_block0,
614 SOURCE,
615 uxt(Transfer {
616 from: Alice.into(),
617 to: AccountId::from_h256(H256::from_low_u64_be(2)),
618 amount: 5,
619 nonce: 0,
620 })
621 .into(),
622 ),
623 )
624 .unwrap();
625 let hash1 = block_on(
626 pool.submit_one(
627 &han_of_block0,
628 SOURCE,
629 uxt(Transfer {
630 from: Alice.into(),
631 to: AccountId::from_h256(H256::from_low_u64_be(2)),
632 amount: 5,
633 nonce: 1,
634 })
635 .into(),
636 ),
637 )
638 .unwrap();
639 let _hash = block_on(
641 pool.submit_one(
642 &han_of_block0,
643 SOURCE,
644 uxt(Transfer {
645 from: Alice.into(),
646 to: AccountId::from_h256(H256::from_low_u64_be(2)),
647 amount: 5,
648 nonce: 3,
649 })
650 .into(),
651 ),
652 )
653 .unwrap();
654
655 assert_eq!(pool.validated_pool().status().ready, 2);
656 assert_eq!(pool.validated_pool().status().future, 1);
657
658 (stream, hash0, hash1)
659 };
660
661 let mut it = futures::executor::block_on_stream(stream);
663 assert_eq!(it.next(), Some(hash0));
664 assert_eq!(it.next(), Some(hash1));
665 assert_eq!(it.next(), None);
666 }
667
668 #[test]
669 fn should_clear_stale_transactions() {
670 let (pool, api) = pool();
672 let han_of_block0 = api.expect_hash_and_number(0);
673 let hash1 = block_on(
674 pool.submit_one(
675 &han_of_block0,
676 SOURCE,
677 uxt(Transfer {
678 from: Alice.into(),
679 to: AccountId::from_h256(H256::from_low_u64_be(2)),
680 amount: 5,
681 nonce: 0,
682 })
683 .into(),
684 ),
685 )
686 .unwrap();
687 let hash2 = block_on(
688 pool.submit_one(
689 &han_of_block0,
690 SOURCE,
691 uxt(Transfer {
692 from: Alice.into(),
693 to: AccountId::from_h256(H256::from_low_u64_be(2)),
694 amount: 5,
695 nonce: 1,
696 })
697 .into(),
698 ),
699 )
700 .unwrap();
701 let hash3 = block_on(
702 pool.submit_one(
703 &han_of_block0,
704 SOURCE,
705 uxt(Transfer {
706 from: Alice.into(),
707 to: AccountId::from_h256(H256::from_low_u64_be(2)),
708 amount: 5,
709 nonce: 3,
710 })
711 .into(),
712 ),
713 )
714 .unwrap();
715
716 pool.validated_pool.clear_stale(&api.expect_hash_and_number(5));
718
719 assert_eq!(pool.validated_pool().ready().count(), 0);
721 assert_eq!(pool.validated_pool().status().future, 0);
722 assert_eq!(pool.validated_pool().status().ready, 0);
723 assert!(pool.validated_pool.is_banned(&hash1));
725 assert!(pool.validated_pool.is_banned(&hash2));
726 assert!(pool.validated_pool.is_banned(&hash3));
727 }
728
729 #[test]
730 fn should_ban_mined_transactions() {
731 let (pool, api) = pool();
733 let hash1 = block_on(
734 pool.submit_one(
735 &api.expect_hash_and_number(0),
736 SOURCE,
737 uxt(Transfer {
738 from: Alice.into(),
739 to: AccountId::from_h256(H256::from_low_u64_be(2)),
740 amount: 5,
741 nonce: 0,
742 })
743 .into(),
744 ),
745 )
746 .unwrap();
747
748 block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1]));
750
751 assert!(pool.validated_pool.is_banned(&hash1));
753 }
754
755 #[test]
756 fn should_limit_futures() {
757 sp_tracing::try_init_simple();
758
759 let xt = uxt(Transfer {
760 from: Alice.into(),
761 to: AccountId::from_h256(H256::from_low_u64_be(2)),
762 amount: 5,
763 nonce: 1,
764 });
765
766 let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
768
769 let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
770
771 let api = Arc::new(TestApi::default());
772 let pool = Pool::new(options, true.into(), api.clone());
773
774 let hash1 =
775 block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into())).unwrap();
776 assert_eq!(pool.validated_pool().status().future, 1);
777
778 let hash2 = block_on(
780 pool.submit_one(
781 &api.expect_hash_and_number(0),
782 SOURCE,
783 uxt(Transfer {
784 from: Bob.into(),
785 to: AccountId::from_h256(H256::from_low_u64_be(2)),
786 amount: 5,
787 nonce: 10,
788 })
789 .into(),
790 ),
791 )
792 .unwrap();
793
794 assert_eq!(pool.validated_pool().status().future, 1);
796 assert!(pool.validated_pool.is_banned(&hash1));
797 assert!(!pool.validated_pool.is_banned(&hash2));
798 }
799
800 #[test]
801 fn should_error_if_reject_immediately() {
802 let limit = Limit { count: 100, total_bytes: 10 };
804
805 let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
806
807 let api = Arc::new(TestApi::default());
808 let pool = Pool::new(options, true.into(), api.clone());
809
810 block_on(
812 pool.submit_one(
813 &api.expect_hash_and_number(0),
814 SOURCE,
815 uxt(Transfer {
816 from: Alice.into(),
817 to: AccountId::from_h256(H256::from_low_u64_be(2)),
818 amount: 5,
819 nonce: 1,
820 })
821 .into(),
822 ),
823 )
824 .unwrap_err();
825
826 assert_eq!(pool.validated_pool().status().ready, 0);
828 assert_eq!(pool.validated_pool().status().future, 0);
829 }
830
831 #[test]
832 fn should_reject_transactions_with_no_provides() {
833 let (pool, api) = pool();
835
836 let err = block_on(
838 pool.submit_one(
839 &api.expect_hash_and_number(0),
840 SOURCE,
841 uxt(Transfer {
842 from: Alice.into(),
843 to: AccountId::from_h256(H256::from_low_u64_be(2)),
844 amount: 5,
845 nonce: INVALID_NONCE,
846 })
847 .into(),
848 ),
849 )
850 .unwrap_err();
851
852 assert_eq!(pool.validated_pool().status().ready, 0);
854 assert_eq!(pool.validated_pool().status().future, 0);
855 assert_matches!(err, error::Error::NoTagsProvided);
856 }
857
858 mod listener {
859 use super::*;
860
861 #[test]
862 fn should_trigger_ready_and_finalized() {
863 let (pool, api) = pool();
865 let watcher = block_on(
866 pool.submit_and_watch(
867 &api.expect_hash_and_number(0),
868 SOURCE,
869 uxt(Transfer {
870 from: Alice.into(),
871 to: AccountId::from_h256(H256::from_low_u64_be(2)),
872 amount: 5,
873 nonce: 0,
874 })
875 .into(),
876 ),
877 )
878 .unwrap();
879 assert_eq!(pool.validated_pool().status().ready, 1);
880 assert_eq!(pool.validated_pool().status().future, 0);
881
882 let han_of_block2 = api.expect_hash_and_number(2);
883
884 block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![]));
886 assert_eq!(pool.validated_pool().status().ready, 0);
887 assert_eq!(pool.validated_pool().status().future, 0);
888
889 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
891 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
892 assert_eq!(
893 stream.next(),
894 Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
895 );
896 }
897
898 #[test]
899 fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
900 let (pool, api) = pool();
902 let watcher = block_on(
903 pool.submit_and_watch(
904 &api.expect_hash_and_number(0),
905 SOURCE,
906 uxt(Transfer {
907 from: Alice.into(),
908 to: AccountId::from_h256(H256::from_low_u64_be(2)),
909 amount: 5,
910 nonce: 0,
911 })
912 .into(),
913 ),
914 )
915 .unwrap();
916 assert_eq!(pool.validated_pool().status().ready, 1);
917 assert_eq!(pool.validated_pool().status().future, 0);
918
919 let han_of_block2 = api.expect_hash_and_number(2);
920
921 block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![*watcher.hash()]));
923 assert_eq!(pool.validated_pool().status().ready, 0);
924 assert_eq!(pool.validated_pool().status().future, 0);
925
926 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
928 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
929 assert_eq!(
930 stream.next(),
931 Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
932 );
933 }
934
935 #[test]
936 fn should_trigger_future_and_ready_after_promoted() {
937 let (pool, api) = pool();
939 let han_of_block0 = api.expect_hash_and_number(0);
940
941 let watcher = block_on(
942 pool.submit_and_watch(
943 &han_of_block0,
944 SOURCE,
945 uxt(Transfer {
946 from: Alice.into(),
947 to: AccountId::from_h256(H256::from_low_u64_be(2)),
948 amount: 5,
949 nonce: 1,
950 })
951 .into(),
952 ),
953 )
954 .unwrap();
955 assert_eq!(pool.validated_pool().status().ready, 0);
956 assert_eq!(pool.validated_pool().status().future, 1);
957
958 block_on(
960 pool.submit_one(
961 &han_of_block0,
962 SOURCE,
963 uxt(Transfer {
964 from: Alice.into(),
965 to: AccountId::from_h256(H256::from_low_u64_be(2)),
966 amount: 5,
967 nonce: 0,
968 })
969 .into(),
970 ),
971 )
972 .unwrap();
973 assert_eq!(pool.validated_pool().status().ready, 2);
974
975 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
977 assert_eq!(stream.next(), Some(TransactionStatus::Future));
978 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
979 }
980
981 #[test]
982 fn should_trigger_invalid_and_ban() {
983 let (pool, api) = pool();
985 let uxt = uxt(Transfer {
986 from: Alice.into(),
987 to: AccountId::from_h256(H256::from_low_u64_be(2)),
988 amount: 5,
989 nonce: 0,
990 });
991 let watcher =
992 block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
993 .unwrap();
994 assert_eq!(pool.validated_pool().status().ready, 1);
995
996 pool.validated_pool.remove_invalid(&[*watcher.hash()]);
998
999 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1001 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1002 assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
1003 assert_eq!(stream.next(), None);
1004 }
1005
1006 #[test]
1007 fn should_trigger_broadcasted() {
1008 let (pool, api) = pool();
1010 let uxt = uxt(Transfer {
1011 from: Alice.into(),
1012 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1013 amount: 5,
1014 nonce: 0,
1015 });
1016 let watcher =
1017 block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1018 .unwrap();
1019 assert_eq!(pool.validated_pool().status().ready, 1);
1020
1021 let mut map = HashMap::new();
1023 let peers = vec!["a".into(), "b".into(), "c".into()];
1024 map.insert(*watcher.hash(), peers.clone());
1025 pool.validated_pool().on_broadcasted(map);
1026
1027 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1029 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1030 assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
1031 }
1032
1033 #[test]
1034 fn should_trigger_dropped_older() {
1035 let limit = Limit { count: 1, total_bytes: 1000 };
1037 let options =
1038 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1039
1040 let api = Arc::new(TestApi::default());
1041 let pool = Pool::new(options, true.into(), api.clone());
1042
1043 let xt = uxt(Transfer {
1044 from: Alice.into(),
1045 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1046 amount: 5,
1047 nonce: 0,
1048 });
1049 let watcher =
1050 block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1051 .unwrap();
1052 assert_eq!(pool.validated_pool().status().ready, 1);
1053
1054 let xt = uxt(Transfer {
1056 from: Bob.into(),
1057 to: AccountId::from_h256(H256::from_low_u64_be(1)),
1058 amount: 4,
1059 nonce: 1,
1060 });
1061 block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into())).unwrap();
1062 assert_eq!(pool.validated_pool().status().ready, 1);
1063
1064 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1066 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1067 assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1068 }
1069
1070 #[test]
1071 fn should_trigger_dropped_lower_priority() {
1072 {
1073 let limit = Limit { count: 1, total_bytes: 1000 };
1075 let options =
1076 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1077
1078 let api = Arc::new(TestApi::default());
1079 let pool = Pool::new(options, true.into(), api.clone());
1080
1081 let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1084 block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1085 .unwrap();
1086 assert_eq!(pool.validated_pool().status().ready, 1);
1087
1088 let xt = uxt(Transfer {
1092 from: Bob.into(),
1093 to: AccountId::from_h256(H256::from_low_u64_be(1)),
1094 amount: 4,
1095 nonce: 1,
1096 });
1097 let result =
1098 block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()));
1099 assert!(matches!(
1100 result,
1101 Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
1102 ));
1103 }
1104 {
1105 let limit = Limit { count: 2, total_bytes: 1000 };
1107 let options =
1108 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1109
1110 let api = Arc::new(TestApi::default());
1111 let pool = Pool::new(options, true.into(), api.clone());
1112
1113 let han_of_block0 = api.expect_hash_and_number(0);
1114
1115 let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1118 block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())).unwrap();
1119 assert_eq!(pool.validated_pool().status().ready, 1);
1120
1121 let xt = uxt(Transfer {
1124 from: Alice.into(),
1125 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1126 amount: 5,
1127 nonce: 0,
1128 });
1129 let watcher =
1130 block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())).unwrap();
1131 assert_eq!(pool.validated_pool().status().ready, 2);
1132
1133 let xt = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
1137 block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()))
1138 .unwrap();
1139 assert_eq!(pool.validated_pool().status().ready, 2);
1140
1141 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1143 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1144 assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1145 }
1146 }
1147
1148 #[test]
1149 fn should_handle_pruning_in_the_middle_of_import() {
1150 let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
1152 let (tx, rx) = std::sync::mpsc::sync_channel(1);
1153 let mut api = TestApi::default();
1154 api.delay = Arc::new(Mutex::new(rx.into()));
1155 let api = Arc::new(api);
1156 let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
1157
1158 let han_of_block0 = api.expect_hash_and_number(0);
1159
1160 let xt = uxt(Transfer {
1162 from: Alice.into(),
1163 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1164 amount: 5,
1165 nonce: 1,
1166 });
1167
1168 let pool2 = pool.clone();
1170 std::thread::spawn({
1171 let hash_of_block0 = han_of_block0.clone();
1172 move || {
1173 block_on(pool2.submit_one(&hash_of_block0, SOURCE, xt.into())).unwrap();
1174 ready.send(()).unwrap();
1175 }
1176 });
1177
1178 let xt = uxt(Transfer {
1181 from: Alice.into(),
1182 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1183 amount: 4,
1184 nonce: 0,
1185 });
1186 let provides = vec![0_u8];
1188 block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap();
1189 assert_eq!(pool.validated_pool().status().ready, 1);
1190
1191 block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![provides], vec![]));
1194 assert_eq!(pool.validated_pool().status().ready, 0);
1195
1196 tx.send(()).unwrap();
1200
1201 is_ready.recv().unwrap(); assert_eq!(pool.validated_pool().status().ready, 1);
1204 assert_eq!(pool.validated_pool().status().future, 0);
1205 }
1206 }
1207}