sc_transaction_pool/single_state_txpool/
single_state_txpool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate transaction pool implementation.
20
21use super::{metrics::MetricsLink as PrometheusMetrics, revalidation};
22pub use crate::{
23	api::FullChainApi,
24	graph::{ChainApi, ValidatedTransaction},
25};
26use crate::{
27	common::{
28		enactment_state::{EnactmentAction, EnactmentState},
29		error,
30		log_xt::log_xt_trace,
31	},
32	graph,
33	graph::{ExtrinsicHash, IsValidator},
34	PolledIterator, ReadyIteratorFor, LOG_TARGET,
35};
36use async_trait::async_trait;
37use futures::{channel::oneshot, future, prelude::*, Future, FutureExt};
38use parking_lot::Mutex;
39use prometheus_endpoint::Registry as PrometheusRegistry;
40use sc_transaction_pool_api::{
41	error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
42	PoolFuture, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
43	TransactionStatusStreamFor, TxHash,
44};
45use sp_blockchain::{HashAndNumber, TreeRoute};
46use sp_core::traits::SpawnEssentialNamed;
47use sp_runtime::{
48	generic::BlockId,
49	traits::{AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, Zero},
50};
51use std::{
52	collections::{HashMap, HashSet},
53	pin::Pin,
54	sync::Arc,
55	time::Instant,
56};
57use tokio::select;
58
59/// Basic implementation of transaction pool that can be customized by providing PoolApi.
60pub struct BasicPool<PoolApi, Block>
61where
62	Block: BlockT,
63	PoolApi: graph::ChainApi<Block = Block>,
64{
65	pool: Arc<graph::Pool<PoolApi>>,
66	api: Arc<PoolApi>,
67	revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
68	revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
69	ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
70	metrics: PrometheusMetrics,
71	enactment_state: Arc<Mutex<EnactmentState<Block>>>,
72}
73
74struct ReadyPoll<T, Block: BlockT> {
75	updated_at: NumberFor<Block>,
76	pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
77}
78
79impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
80	fn default() -> Self {
81		Self { updated_at: NumberFor::<Block>::zero(), pollers: Default::default() }
82	}
83}
84
85impl<T, Block: BlockT> ReadyPoll<T, Block> {
86	fn new(best_block_number: NumberFor<Block>) -> Self {
87		Self { updated_at: best_block_number, pollers: Default::default() }
88	}
89
90	fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
91		self.updated_at = number;
92
93		let mut idx = 0;
94		while idx < self.pollers.len() {
95			if self.pollers[idx].0 <= number {
96				let poller_sender = self.pollers.swap_remove(idx);
97				log::trace!(target: LOG_TARGET, "Sending ready signal at block {}", number);
98				let _ = poller_sender.1.send(iterator_factory());
99			} else {
100				idx += 1;
101			}
102		}
103	}
104
105	fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
106		let (sender, receiver) = oneshot::channel();
107		self.pollers.push((number, sender));
108		receiver
109	}
110
111	fn updated_at(&self) -> NumberFor<Block> {
112		self.updated_at
113	}
114}
115
116/// Type of revalidation.
117pub enum RevalidationType {
118	/// Light revalidation type.
119	///
120	/// During maintenance, transaction pool makes periodic revalidation
121	/// of all transactions depending on number of blocks or time passed.
122	/// Also this kind of revalidation does not resubmit transactions from
123	/// retracted blocks, since it is too expensive.
124	Light,
125
126	/// Full revalidation type.
127	///
128	/// During maintenance, transaction pool revalidates some fixed amount of
129	/// transactions from the pool of valid transactions.
130	Full,
131}
132
133impl<PoolApi, Block> BasicPool<PoolApi, Block>
134where
135	Block: BlockT,
136	PoolApi: graph::ChainApi<Block = Block> + 'static,
137{
138	/// Create new basic transaction pool with provided api, for tests.
139	pub fn new_test(
140		pool_api: Arc<PoolApi>,
141		best_block_hash: Block::Hash,
142		finalized_hash: Block::Hash,
143		options: graph::Options,
144	) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
145		let pool = Arc::new(graph::Pool::new(options, true.into(), pool_api.clone()));
146		let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
147			pool_api.clone(),
148			pool.clone(),
149			finalized_hash,
150		);
151		(
152			Self {
153				api: pool_api,
154				pool,
155				revalidation_queue: Arc::new(revalidation_queue),
156				revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
157				ready_poll: Default::default(),
158				metrics: Default::default(),
159				enactment_state: Arc::new(Mutex::new(EnactmentState::new(
160					best_block_hash,
161					finalized_hash,
162				))),
163			},
164			background_task,
165		)
166	}
167
168	/// Create new basic transaction pool with provided api and custom
169	/// revalidation type.
170	pub fn with_revalidation_type(
171		options: graph::Options,
172		is_validator: IsValidator,
173		pool_api: Arc<PoolApi>,
174		prometheus: Option<&PrometheusRegistry>,
175		revalidation_type: RevalidationType,
176		spawner: impl SpawnEssentialNamed,
177		best_block_number: NumberFor<Block>,
178		best_block_hash: Block::Hash,
179		finalized_hash: Block::Hash,
180	) -> Self {
181		let pool = Arc::new(graph::Pool::new(options, is_validator, pool_api.clone()));
182		let (revalidation_queue, background_task) = match revalidation_type {
183			RevalidationType::Light =>
184				(revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None),
185			RevalidationType::Full => {
186				let (queue, background) = revalidation::RevalidationQueue::new_background(
187					pool_api.clone(),
188					pool.clone(),
189					finalized_hash,
190				);
191				(queue, Some(background))
192			},
193		};
194
195		if let Some(background_task) = background_task {
196			spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task);
197		}
198
199		Self {
200			api: pool_api,
201			pool,
202			revalidation_queue: Arc::new(revalidation_queue),
203			revalidation_strategy: Arc::new(Mutex::new(match revalidation_type {
204				RevalidationType::Light =>
205					RevalidationStrategy::Light(RevalidationStatus::NotScheduled),
206				RevalidationType::Full => RevalidationStrategy::Always,
207			})),
208			ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
209			metrics: PrometheusMetrics::new(prometheus),
210			enactment_state: Arc::new(Mutex::new(EnactmentState::new(
211				best_block_hash,
212				finalized_hash,
213			))),
214		}
215	}
216
217	/// Gets shared reference to the underlying pool.
218	pub fn pool(&self) -> &Arc<graph::Pool<PoolApi>> {
219		&self.pool
220	}
221
222	/// Get access to the underlying api
223	pub fn api(&self) -> &PoolApi {
224		&self.api
225	}
226
227	fn ready_at_with_timeout_internal(
228		&self,
229		at: Block::Hash,
230		timeout: std::time::Duration,
231	) -> PolledIterator<PoolApi> {
232		let timeout = futures_timer::Delay::new(timeout);
233		let ready_maintained = self.ready_at(at);
234		let ready_current = self.ready();
235
236		let ready = async {
237			select! {
238				ready = ready_maintained => ready,
239				_ = timeout => ready_current
240			}
241		};
242
243		Box::pin(ready)
244	}
245}
246
247impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
248where
249	Block: BlockT,
250	PoolApi: 'static + graph::ChainApi<Block = Block>,
251{
252	type Block = PoolApi::Block;
253	type Hash = graph::ExtrinsicHash<PoolApi>;
254	type InPoolTransaction =
255		graph::base_pool::Transaction<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;
256	type Error = PoolApi::Error;
257
258	fn submit_at(
259		&self,
260		at: <Self::Block as BlockT>::Hash,
261		source: TransactionSource,
262		xts: Vec<TransactionFor<Self>>,
263	) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
264		let pool = self.pool.clone();
265		let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
266
267		self.metrics
268			.report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64));
269
270		let number = self.api.resolve_block_number(at);
271		async move {
272			let at = HashAndNumber { hash: at, number: number? };
273			Ok(pool.submit_at(&at, source, xts).await)
274		}
275		.boxed()
276	}
277
278	fn submit_one(
279		&self,
280		at: <Self::Block as BlockT>::Hash,
281		source: TransactionSource,
282		xt: TransactionFor<Self>,
283	) -> PoolFuture<TxHash<Self>, Self::Error> {
284		let pool = self.pool.clone();
285		let xt = Arc::from(xt);
286
287		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
288
289		let number = self.api.resolve_block_number(at);
290		async move {
291			let at = HashAndNumber { hash: at, number: number? };
292			pool.submit_one(&at, source, xt).await
293		}
294		.boxed()
295	}
296
297	fn submit_and_watch(
298		&self,
299		at: <Self::Block as BlockT>::Hash,
300		source: TransactionSource,
301		xt: TransactionFor<Self>,
302	) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
303		let pool = self.pool.clone();
304		let xt = Arc::from(xt);
305
306		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
307
308		let number = self.api.resolve_block_number(at);
309
310		async move {
311			let at = HashAndNumber { hash: at, number: number? };
312			let watcher = pool.submit_and_watch(&at, source, xt).await?;
313
314			Ok(watcher.into_stream().boxed())
315		}
316		.boxed()
317	}
318
319	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
320		let removed = self.pool.validated_pool().remove_invalid(hashes);
321		self.metrics
322			.report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
323		removed
324	}
325
326	fn status(&self) -> PoolStatus {
327		self.pool.validated_pool().status()
328	}
329
330	fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
331		self.pool.validated_pool().import_notification_stream()
332	}
333
334	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
335		self.pool.hash_of(xt)
336	}
337
338	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
339		self.pool.validated_pool().on_broadcasted(propagations)
340	}
341
342	fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
343		self.pool.validated_pool().ready_by_hash(hash)
344	}
345
346	fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> PolledIterator<PoolApi> {
347		let Ok(at) = self.api.resolve_block_number(at) else {
348			return async { Box::new(std::iter::empty()) as Box<_> }.boxed()
349		};
350
351		let status = self.status();
352		// If there are no transactions in the pool, it is fine to return early.
353		//
354		// There could be transaction being added because of some re-org happening at the relevant
355		// block, but this is relative unlikely.
356		if status.ready == 0 && status.future == 0 {
357			return async { Box::new(std::iter::empty()) as Box<_> }.boxed()
358		}
359
360		if self.ready_poll.lock().updated_at() >= at {
361			log::trace!(target: LOG_TARGET, "Transaction pool already processed block  #{}", at);
362			let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
363			return async move { iterator }.boxed()
364		}
365
366		self.ready_poll
367			.lock()
368			.add(at)
369			.map(|received| {
370				received.unwrap_or_else(|e| {
371					log::warn!(target: LOG_TARGET, "Error receiving pending set: {:?}", e);
372					Box::new(std::iter::empty())
373				})
374			})
375			.boxed()
376	}
377
378	fn ready(&self) -> ReadyIteratorFor<PoolApi> {
379		Box::new(self.pool.validated_pool().ready())
380	}
381
382	fn futures(&self) -> Vec<Self::InPoolTransaction> {
383		let pool = self.pool.validated_pool().pool.read();
384		pool.futures().cloned().collect::<Vec<_>>()
385	}
386
387	fn ready_at_with_timeout(
388		&self,
389		at: <Self::Block as BlockT>::Hash,
390		timeout: std::time::Duration,
391	) -> PolledIterator<PoolApi> {
392		self.ready_at_with_timeout_internal(at, timeout)
393	}
394}
395
396impl<Block, Client> BasicPool<FullChainApi<Client, Block>, Block>
397where
398	Block: BlockT,
399	Client: sp_api::ProvideRuntimeApi<Block>
400		+ sc_client_api::BlockBackend<Block>
401		+ sc_client_api::blockchain::HeaderBackend<Block>
402		+ sp_runtime::traits::BlockIdTo<Block>
403		+ sc_client_api::ExecutorProvider<Block>
404		+ sc_client_api::UsageProvider<Block>
405		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
406		+ Send
407		+ Sync
408		+ 'static,
409	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
410{
411	/// Create new basic transaction pool for a full node with the provided api.
412	pub fn new_full(
413		options: graph::Options,
414		is_validator: IsValidator,
415		prometheus: Option<&PrometheusRegistry>,
416		spawner: impl SpawnEssentialNamed,
417		client: Arc<Client>,
418	) -> Self {
419		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
420		let pool = Self::with_revalidation_type(
421			options,
422			is_validator,
423			pool_api,
424			prometheus,
425			RevalidationType::Full,
426			spawner,
427			client.usage_info().chain.best_number,
428			client.usage_info().chain.best_hash,
429			client.usage_info().chain.finalized_hash,
430		);
431
432		pool
433	}
434}
435
436impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
437	for BasicPool<FullChainApi<Client, Block>, Block>
438where
439	Block: BlockT,
440	Client: sp_api::ProvideRuntimeApi<Block>
441		+ sc_client_api::BlockBackend<Block>
442		+ sc_client_api::blockchain::HeaderBackend<Block>
443		+ sp_runtime::traits::BlockIdTo<Block>
444		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
445	Client: Send + Sync + 'static,
446	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
447{
448	type Block = Block;
449	type Hash = graph::ExtrinsicHash<FullChainApi<Client, Block>>;
450	type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
451
452	fn submit_local(
453		&self,
454		at: Block::Hash,
455		xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
456	) -> Result<Self::Hash, Self::Error> {
457		use sp_runtime::{
458			traits::SaturatedConversion, transaction_validity::TransactionValidityError,
459		};
460
461		let validity = self
462			.api
463			.validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))?
464			.map_err(|e| {
465				Self::Error::Pool(match e {
466					TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
467					TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
468				})
469			})?;
470
471		let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
472		let block_number = self
473			.api
474			.block_id_to_number(&BlockId::hash(at))?
475			.ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;
476
477		let validated = ValidatedTransaction::valid_at(
478			block_number.saturated_into::<u64>(),
479			hash,
480			TransactionSource::Local,
481			Arc::from(xt),
482			bytes,
483			validity,
484		);
485
486		self.pool.validated_pool().submit(vec![validated]).remove(0)
487	}
488}
489
490#[cfg_attr(test, derive(Debug))]
491enum RevalidationStatus<N> {
492	/// The revalidation has never been completed.
493	NotScheduled,
494	/// The revalidation is scheduled.
495	Scheduled(Option<Instant>, Option<N>),
496	/// The revalidation is in progress.
497	InProgress,
498}
499
500enum RevalidationStrategy<N> {
501	Always,
502	Light(RevalidationStatus<N>),
503}
504
505struct RevalidationAction {
506	revalidate: bool,
507	resubmit: bool,
508}
509
510impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
511	pub fn clear(&mut self) {
512		if let Self::Light(status) = self {
513			status.clear()
514		}
515	}
516
517	pub fn next(
518		&mut self,
519		block: N,
520		revalidate_time_period: Option<std::time::Duration>,
521		revalidate_block_period: Option<N>,
522	) -> RevalidationAction {
523		match self {
524			Self::Light(status) => RevalidationAction {
525				revalidate: status.next_required(
526					block,
527					revalidate_time_period,
528					revalidate_block_period,
529				),
530				resubmit: false,
531			},
532			Self::Always => RevalidationAction { revalidate: true, resubmit: true },
533		}
534	}
535}
536
537impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
538	/// Called when revalidation is completed.
539	pub fn clear(&mut self) {
540		*self = Self::NotScheduled;
541	}
542
543	/// Returns true if revalidation is required.
544	pub fn next_required(
545		&mut self,
546		block: N,
547		revalidate_time_period: Option<std::time::Duration>,
548		revalidate_block_period: Option<N>,
549	) -> bool {
550		match *self {
551			Self::NotScheduled => {
552				*self = Self::Scheduled(
553					revalidate_time_period.map(|period| Instant::now() + period),
554					revalidate_block_period.map(|period| block + period),
555				);
556				false
557			},
558			Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
559				let is_required =
560					revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) ||
561						revalidate_at_block.map(|at| block >= at).unwrap_or(false);
562				if is_required {
563					*self = Self::InProgress;
564				}
565				is_required
566			},
567			Self::InProgress => false,
568		}
569	}
570}
571
572/// Prune the known txs for the given block.
573pub async fn prune_known_txs_for_block<Block: BlockT, Api: graph::ChainApi<Block = Block>>(
574	at: &HashAndNumber<Block>,
575	api: &Api,
576	pool: &graph::Pool<Api>,
577) -> Vec<ExtrinsicHash<Api>> {
578	let extrinsics = api
579		.block_body(at.hash)
580		.await
581		.unwrap_or_else(|e| {
582			log::warn!(target: LOG_TARGET, "Prune known transactions: error request: {}", e);
583			None
584		})
585		.unwrap_or_default();
586
587	let hashes = extrinsics.iter().map(|tx| pool.hash_of(tx)).collect::<Vec<_>>();
588
589	let header = match api.block_header(at.hash) {
590		Ok(Some(h)) => h,
591		Ok(None) => {
592			log::trace!(target: LOG_TARGET, "Could not find header for {:?}.", at.hash);
593			return hashes
594		},
595		Err(e) => {
596			log::trace!(target: LOG_TARGET, "Error retrieving header for {:?}: {}", at.hash, e);
597			return hashes
598		},
599	};
600
601	log_xt_trace!(target: LOG_TARGET, &hashes, "[{:?}] Pruning transaction.");
602
603	pool.prune(at, *header.parent_hash(), &extrinsics).await;
604	hashes
605}
606
607impl<PoolApi, Block> BasicPool<PoolApi, Block>
608where
609	Block: BlockT,
610	PoolApi: 'static + graph::ChainApi<Block = Block>,
611{
612	/// Handles enactment and retraction of blocks, prunes stale transactions
613	/// (that have already been enacted) and resubmits transactions that were
614	/// retracted.
615	async fn handle_enactment(&self, tree_route: TreeRoute<Block>) {
616		log::trace!(target: LOG_TARGET, "handle_enactment tree_route: {tree_route:?}");
617		let pool = self.pool.clone();
618		let api = self.api.clone();
619
620		let hash_and_number = match tree_route.last() {
621			Some(hash_and_number) => hash_and_number,
622			None => {
623				log::warn!(
624					target: LOG_TARGET,
625					"Skipping ChainEvent - no last block in tree route {:?}",
626					tree_route,
627				);
628				return
629			},
630		};
631
632		let next_action = self.revalidation_strategy.lock().next(
633			hash_and_number.number,
634			Some(std::time::Duration::from_secs(60)),
635			Some(20u32.into()),
636		);
637
638		// We keep track of everything we prune so that later we won't add
639		// transactions with those hashes from the retracted blocks.
640		let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
641
642		// If there is a tree route, we use this to prune known tx based on the enacted
643		// blocks. Before pruning enacted transactions, we inform the listeners about
644		// retracted blocks and their transactions. This order is important, because
645		// if we enact and retract the same transaction at the same time, we want to
646		// send first the retract and then the prune event.
647		for retracted in tree_route.retracted() {
648			// notify txs awaiting finality that it has been retracted
649			pool.validated_pool().on_block_retracted(retracted.hash);
650		}
651
652		future::join_all(
653			tree_route.enacted().iter().map(|h| prune_known_txs_for_block(h, &*api, &*pool)),
654		)
655		.await
656		.into_iter()
657		.for_each(|enacted_log| {
658			pruned_log.extend(enacted_log);
659		});
660
661		self.metrics
662			.report(|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64));
663
664		if next_action.resubmit {
665			let mut resubmit_transactions = Vec::new();
666
667			for retracted in tree_route.retracted() {
668				let hash = retracted.hash;
669
670				let block_transactions = api
671					.block_body(hash)
672					.await
673					.unwrap_or_else(|e| {
674						log::warn!(target: LOG_TARGET, "Failed to fetch block body: {}", e);
675						None
676					})
677					.unwrap_or_default()
678					.into_iter();
679
680				let mut resubmitted_to_report = 0;
681
682				resubmit_transactions.extend(
683					//todo: arctx - we need to get ref from somewhere
684					block_transactions.into_iter().map(Arc::from).filter(|tx| {
685						let tx_hash = pool.hash_of(tx);
686						let contains = pruned_log.contains(&tx_hash);
687
688						// need to count all transactions, not just filtered, here
689						resubmitted_to_report += 1;
690
691						if !contains {
692							log::trace!(
693								target: LOG_TARGET,
694								"[{:?}]: Resubmitting from retracted block {:?}",
695								tx_hash,
696								hash,
697							);
698						}
699						!contains
700					}),
701				);
702
703				self.metrics.report(|metrics| {
704					metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
705				});
706			}
707
708			pool.resubmit_at(
709				&hash_and_number,
710				// These transactions are coming from retracted blocks, we should
711				// simply consider them external.
712				TransactionSource::External,
713				resubmit_transactions,
714			)
715			.await;
716		}
717
718		let extra_pool = pool.clone();
719		// After #5200 lands, this arguably might be moved to the
720		// handler of "all blocks notification".
721		self.ready_poll
722			.lock()
723			.trigger(hash_and_number.number, move || Box::new(extra_pool.validated_pool().ready()));
724
725		if next_action.revalidate {
726			let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
727			self.revalidation_queue.revalidate_later(hash_and_number.hash, hashes).await;
728
729			self.revalidation_strategy.lock().clear();
730		}
731	}
732}
733
734#[async_trait]
735impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
736where
737	Block: BlockT,
738	PoolApi: 'static + graph::ChainApi<Block = Block>,
739{
740	async fn maintain(&self, event: ChainEvent<Self::Block>) {
741		let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
742		let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
743			match self.api.tree_route(from, to) {
744				Ok(tree_route) => Ok(tree_route),
745				Err(e) =>
746					return Err(format!(
747						"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
748					)),
749			}
750		};
751		let block_id_to_number =
752			|hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
753
754		let result =
755			self.enactment_state
756				.lock()
757				.update(&event, &compute_tree_route, &block_id_to_number);
758
759		match result {
760			Err(msg) => {
761				log::trace!(target: LOG_TARGET, "{msg}");
762				self.enactment_state.lock().force_update(&event);
763			},
764			Ok(EnactmentAction::Skip) => return,
765			Ok(EnactmentAction::HandleFinalization) => {},
766			Ok(EnactmentAction::HandleEnactment(tree_route)) => {
767				self.handle_enactment(tree_route).await;
768			},
769		};
770
771		if let ChainEvent::Finalized { hash, tree_route } = event {
772			log::trace!(
773				target: LOG_TARGET,
774				"on-finalized enacted: {tree_route:?}, previously finalized: \
775				{prev_finalized_block:?}",
776			);
777
778			for hash in tree_route.iter().chain(std::iter::once(&hash)) {
779				if let Err(e) = self.pool.validated_pool().on_block_finalized(*hash).await {
780					log::warn!(
781						target: LOG_TARGET,
782						"Error occurred while attempting to notify watchers about finalization {}: {}",
783						hash, e
784					)
785				}
786			}
787		}
788	}
789}