sc_transaction_pool/fork_aware_txpool/
fork_aware_txpool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate fork-aware transaction pool implementation.
20
21use super::{
22	dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
23	import_notification_sink::MultiViewImportNotificationSink,
24	metrics::MetricsLink as PrometheusMetrics,
25	multi_view_listener::MultiViewListener,
26	tx_mem_pool::{TxInMemPool, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
27	view::View,
28	view_store::ViewStore,
29};
30use crate::{
31	api::FullChainApi,
32	common::log_xt::log_xt_trace,
33	enactment_state::{EnactmentAction, EnactmentState},
34	fork_aware_txpool::revalidation_worker,
35	graph::{self, base_pool::Transaction, ExtrinsicFor, ExtrinsicHash, IsValidator, Options},
36	PolledIterator, ReadyIteratorFor, LOG_TARGET,
37};
38use async_trait::async_trait;
39use futures::{
40	channel::oneshot,
41	future::{self},
42	prelude::*,
43	FutureExt,
44};
45use parking_lot::Mutex;
46use prometheus_endpoint::Registry as PrometheusRegistry;
47use sc_transaction_pool_api::{
48	error::{Error, IntoPoolError},
49	ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus,
50	TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
51};
52use sp_blockchain::{HashAndNumber, TreeRoute};
53use sp_core::traits::SpawnEssentialNamed;
54use sp_runtime::{
55	generic::BlockId,
56	traits::{Block as BlockT, NumberFor},
57};
58use std::{
59	collections::{HashMap, HashSet},
60	pin::Pin,
61	sync::Arc,
62	time::Instant,
63};
64use tokio::select;
65
66/// Fork aware transaction pool task, that needs to be polled.
67pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;
68
69/// A structure that maintains a collection of pollers associated with specific block hashes
70/// (views).
71struct ReadyPoll<T, Block>
72where
73	Block: BlockT,
74{
75	pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
76}
77
78impl<T, Block> ReadyPoll<T, Block>
79where
80	Block: BlockT,
81{
82	/// Creates a new `ReadyPoll` instance with an empty collection of pollers.
83	fn new() -> Self {
84		Self { pollers: Default::default() }
85	}
86
87	/// Adds a new poller for a specific block hash and returns the `Receiver` end of the created
88	/// oneshot channel which will be used to deliver polled result.
89	fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
90		let (s, r) = oneshot::channel();
91		self.pollers.entry(at).or_default().push(s);
92		r
93	}
94
95	/// Triggers all pollers associated with a specific block by sending the polled result through
96	/// each oneshot channel.
97	///
98	/// `ready_iterator` is a closure that generates the result data to be sent to the pollers.
99	fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
100		log::trace!(target: LOG_TARGET, "fatp::trigger {at:?} pending keys: {:?}", self.pollers.keys());
101		let Some(pollers) = self.pollers.remove(&at) else { return };
102		pollers.into_iter().for_each(|p| {
103			log::debug!(target: LOG_TARGET, "trigger ready signal at block {}", at);
104			let _ = p.send(ready_iterator());
105		});
106	}
107
108	/// Removes pollers that have their oneshot channels cancelled.
109	fn remove_cancelled(&mut self) {
110		self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
111	}
112}
113
114/// The fork-aware transaction pool.
115///
116/// It keeps track of every fork and provides the set of transactions that is valid for every fork.
117pub struct ForkAwareTxPool<ChainApi, Block>
118where
119	Block: BlockT,
120	ChainApi: graph::ChainApi<Block = Block> + 'static,
121{
122	/// The reference to the `ChainApi` provided by client/backend.
123	api: Arc<ChainApi>,
124
125	/// Intermediate buffer for the incoming transaction.
126	mempool: Arc<TxMemPool<ChainApi, Block>>,
127
128	/// The store for all the views.
129	view_store: Arc<ViewStore<ChainApi, Block>>,
130
131	/// Utility for managing pollers of `ready_at` future.
132	ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,
133
134	/// Prometheus's metrics endpoint.
135	metrics: PrometheusMetrics,
136
137	/// Util tracking best and finalized block.
138	enactment_state: Arc<Mutex<EnactmentState<Block>>>,
139
140	/// The channel allowing to send revalidation jobs to the background thread.
141	revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,
142
143	/// Util providing an aggregated stream of transactions that were imported to ready queue in
144	/// any view.
145	import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
146
147	/// Externally provided pool options.
148	options: Options,
149
150	/// Is node the validator.
151	is_validator: IsValidator,
152}
153
154impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
155where
156	Block: BlockT,
157	ChainApi: graph::ChainApi<Block = Block> + 'static,
158	<Block as BlockT>::Hash: Unpin,
159{
160	/// Create new fork aware transaction pool with provided shared instance of `ChainApi` intended
161	/// for tests.
162	pub fn new_test(
163		pool_api: Arc<ChainApi>,
164		best_block_hash: Block::Hash,
165		finalized_hash: Block::Hash,
166	) -> (Self, ForkAwareTxPoolTask) {
167		Self::new_test_with_limits(
168			pool_api,
169			best_block_hash,
170			finalized_hash,
171			Options::default().ready,
172			Options::default().future,
173			usize::MAX,
174		)
175	}
176
177	/// Create new fork aware transaction pool with given limits and with provided shared instance
178	/// of `ChainApi` intended for tests.
179	pub fn new_test_with_limits(
180		pool_api: Arc<ChainApi>,
181		best_block_hash: Block::Hash,
182		finalized_hash: Block::Hash,
183		ready_limits: crate::PoolLimit,
184		future_limits: crate::PoolLimit,
185		mempool_max_transactions_count: usize,
186	) -> (Self, ForkAwareTxPoolTask) {
187		let listener = Arc::from(MultiViewListener::new());
188		let (import_notification_sink, import_notification_sink_task) =
189			MultiViewImportNotificationSink::new_with_worker();
190
191		let mempool = Arc::from(TxMemPool::new(
192			pool_api.clone(),
193			listener.clone(),
194			Default::default(),
195			mempool_max_transactions_count,
196		));
197
198		let (dropped_stream_controller, dropped_stream) =
199			MultiViewDroppedWatcherController::<ChainApi>::new();
200		let dropped_monitor_task = Self::dropped_monitor_task(
201			dropped_stream,
202			mempool.clone(),
203			import_notification_sink.clone(),
204		);
205
206		let combined_tasks = async move {
207			tokio::select! {
208				_ = import_notification_sink_task => {},
209				_ = dropped_monitor_task => {}
210			}
211		}
212		.boxed();
213
214		let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };
215
216		(
217			Self {
218				mempool,
219				api: pool_api.clone(),
220				view_store: Arc::new(ViewStore::new(pool_api, listener, dropped_stream_controller)),
221				ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
222				enactment_state: Arc::new(Mutex::new(EnactmentState::new(
223					best_block_hash,
224					finalized_hash,
225				))),
226				revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
227				import_notification_sink,
228				options,
229				is_validator: false.into(),
230				metrics: Default::default(),
231			},
232			combined_tasks,
233		)
234	}
235
236	/// Monitors the stream of dropped transactions and removes them from the mempool.
237	///
238	/// This asynchronous task continuously listens for dropped transaction notifications provided
239	/// within `dropped_stream` and ensures that these transactions are removed from the `mempool`
240	/// and `import_notification_sink` instances.
241	async fn dropped_monitor_task(
242		mut dropped_stream: StreamOfDropped<ChainApi>,
243		mempool: Arc<TxMemPool<ChainApi, Block>>,
244		import_notification_sink: MultiViewImportNotificationSink<
245			Block::Hash,
246			ExtrinsicHash<ChainApi>,
247		>,
248	) {
249		loop {
250			let Some(dropped) = dropped_stream.next().await else {
251				log::debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
252				break;
253			};
254			log::trace!(target: LOG_TARGET, "[{:?}] fatp::dropped notification, removing", dropped);
255			mempool.remove_dropped_transactions(&[dropped]).await;
256			import_notification_sink.clean_notified_items(&[dropped]);
257		}
258	}
259
260	/// Creates new fork aware transaction pool with the background revalidation worker.
261	///
262	/// The txpool essential tasks (including a revalidation worker) are spawned using provided
263	/// spawner.
264	pub fn new_with_background_worker(
265		options: Options,
266		is_validator: IsValidator,
267		pool_api: Arc<ChainApi>,
268		prometheus: Option<&PrometheusRegistry>,
269		spawner: impl SpawnEssentialNamed,
270		best_block_hash: Block::Hash,
271		finalized_hash: Block::Hash,
272	) -> Self {
273		let metrics = PrometheusMetrics::new(prometheus);
274		let listener = Arc::from(MultiViewListener::new());
275		let (revalidation_queue, revalidation_task) =
276			revalidation_worker::RevalidationQueue::new_with_worker();
277
278		let (import_notification_sink, import_notification_sink_task) =
279			MultiViewImportNotificationSink::new_with_worker();
280
281		let mempool = Arc::from(TxMemPool::new(
282			pool_api.clone(),
283			listener.clone(),
284			metrics.clone(),
285			TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * (options.ready.count + options.future.count),
286		));
287
288		let (dropped_stream_controller, dropped_stream) =
289			MultiViewDroppedWatcherController::<ChainApi>::new();
290		let dropped_monitor_task = Self::dropped_monitor_task(
291			dropped_stream,
292			mempool.clone(),
293			import_notification_sink.clone(),
294		);
295
296		let combined_tasks = async move {
297			tokio::select! {
298				_ = revalidation_task => {},
299				_ = import_notification_sink_task => {},
300				_ = dropped_monitor_task => {}
301			}
302		}
303		.boxed();
304		spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
305
306		Self {
307			mempool,
308			api: pool_api.clone(),
309			view_store: Arc::new(ViewStore::new(pool_api, listener, dropped_stream_controller)),
310			ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
311			enactment_state: Arc::new(Mutex::new(EnactmentState::new(
312				best_block_hash,
313				finalized_hash,
314			))),
315			revalidation_queue: Arc::from(revalidation_queue),
316			import_notification_sink,
317			options,
318			metrics,
319			is_validator,
320		}
321	}
322
323	/// Get access to the underlying api
324	pub fn api(&self) -> &ChainApi {
325		&self.api
326	}
327
328	/// Provides a status for all views at the tips of the forks.
329	pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
330		self.view_store.status()
331	}
332
333	/// Provides a number of views at the tips of the forks.
334	pub fn active_views_count(&self) -> usize {
335		self.view_store.active_views.read().len()
336	}
337
338	/// Provides a number of views at the tips of the forks.
339	pub fn inactive_views_count(&self) -> usize {
340		self.view_store.inactive_views.read().len()
341	}
342
343	/// Provides internal views statistics.
344	///
345	/// Provides block number, count of ready, count of future transactions for every view. It is
346	/// suitable for printing log information.
347	fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
348		self.view_store
349			.active_views
350			.read()
351			.iter()
352			.map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
353			.collect()
354	}
355
356	/// Checks if there is a view at the tip of the fork with given hash.
357	pub fn has_view(&self, hash: &Block::Hash) -> bool {
358		self.view_store.active_views.read().contains_key(hash)
359	}
360
361	/// Returns a number of unwatched and watched transactions in internal mempool.
362	///
363	/// Intended for use in unit tests.
364	pub fn mempool_len(&self) -> (usize, usize) {
365		self.mempool.unwatched_and_watched_count()
366	}
367
368	/// Returns a best-effort set of ready transactions for a given block, without executing full
369	/// maintain process.
370	///
371	/// The method attempts to build a temporary view and create an iterator of ready transactions
372	/// for a specific `at` hash. If a valid view is found, it collects and prunes
373	/// transactions already included in the blocks and returns the valid set.
374	///
375	/// Pruning is just rebuilding the underlying transactions graph, no validations are executed,
376	/// so this process shall be fast.
377	pub fn ready_at_light(&self, at: Block::Hash) -> PolledIterator<ChainApi> {
378		let start = Instant::now();
379		let api = self.api.clone();
380		log::trace!(target: LOG_TARGET, "fatp::ready_at_light {:?}", at);
381
382		let Ok(block_number) = self.api.resolve_block_number(at) else {
383			let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
384			return Box::pin(async { empty })
385		};
386
387		let best_result = {
388			api.tree_route(self.enactment_state.lock().recent_finalized_block(), at).map(
389				|tree_route| {
390					if let Some((index, view)) =
391						tree_route.enacted().iter().enumerate().rev().skip(1).find_map(|(i, b)| {
392							self.view_store.get_view_at(b.hash, true).map(|(view, _)| (i, view))
393						}) {
394						let e = tree_route.enacted()[index..].to_vec();
395						(TreeRoute::new(e, 0).ok(), Some(view))
396					} else {
397						(None, None)
398					}
399				},
400			)
401		};
402
403		Box::pin(async move {
404			if let Ok((Some(best_tree_route), Some(best_view))) = best_result {
405				let tmp_view: View<ChainApi> = View::new_from_other(
406					&best_view,
407					&HashAndNumber { hash: at, number: block_number },
408				);
409
410				let mut all_extrinsics = vec![];
411
412				for h in best_tree_route.enacted() {
413					let extrinsics = api
414						.block_body(h.hash)
415						.await
416						.unwrap_or_else(|e| {
417							log::warn!(target: LOG_TARGET, "Compute ready light transactions: error request: {}", e);
418							None
419						})
420						.unwrap_or_default()
421						.into_iter()
422						.map(|t| api.hash_and_length(&t).0);
423					all_extrinsics.extend(extrinsics);
424				}
425
426				let before_count = tmp_view.pool.validated_pool().status().ready;
427				let tags = tmp_view
428					.pool
429					.validated_pool()
430					.extrinsics_tags(&all_extrinsics)
431					.into_iter()
432					.flatten()
433					.flatten()
434					.collect::<Vec<_>>();
435				let _ = tmp_view.pool.validated_pool().prune_tags(tags);
436
437				let after_count = tmp_view.pool.validated_pool().status().ready;
438				log::debug!(target: LOG_TARGET,
439					"fatp::ready_at_light {} from {} before: {} to be removed: {} after: {} took:{:?}",
440					at,
441					best_view.at.hash,
442					before_count,
443					all_extrinsics.len(),
444					after_count,
445					start.elapsed()
446				);
447				Box::new(tmp_view.pool.validated_pool().ready())
448			} else {
449				let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
450				log::debug!(target: LOG_TARGET, "fatp::ready_at_light {} -> empty, took:{:?}", at, start.elapsed());
451				empty
452			}
453		})
454	}
455
456	/// Waits for the set of ready transactions for a given block up to a specified timeout.
457	///
458	/// This method combines two futures:
459	/// - The `ready_at` future, which waits for the ready transactions resulting from the full
460	/// maintenance process to be available.
461	/// - The `ready_at_light` future, used as a fallback if the timeout expires before `ready_at`
462	/// completes. This provides a best-effort, ready set of transactions as a result light
463	/// maintain.
464	///
465	/// Returns a future resolving to a ready iterator of transactions.
466	fn ready_at_with_timeout_internal(
467		&self,
468		at: Block::Hash,
469		timeout: std::time::Duration,
470	) -> PolledIterator<ChainApi> {
471		log::debug!(target: LOG_TARGET, "fatp::ready_at_with_timeout at {:?} allowed delay: {:?}", at, timeout);
472
473		let timeout = futures_timer::Delay::new(timeout);
474		let (view_already_exists, ready_at) = self.ready_at_internal(at);
475
476		if view_already_exists {
477			return ready_at;
478		}
479
480		let maybe_ready = async move {
481			select! {
482				ready = ready_at => Some(ready),
483				_ = timeout => {
484					log::warn!(target: LOG_TARGET,
485						"Timeout fired waiting for transaction pool at block: ({:?}). \
486						Proceeding with production.",
487						at,
488					);
489					None
490				}
491			}
492		};
493
494		let fall_back_ready = self.ready_at_light(at);
495		Box::pin(async {
496			let (maybe_ready, fall_back_ready) =
497				futures::future::join(maybe_ready.boxed(), fall_back_ready.boxed()).await;
498			maybe_ready.unwrap_or(fall_back_ready)
499		})
500	}
501
502	fn ready_at_internal(&self, at: Block::Hash) -> (bool, PolledIterator<ChainApi>) {
503		let mut ready_poll = self.ready_poll.lock();
504
505		if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
506			log::debug!(target: LOG_TARGET, "fatp::ready_at {at:?} (inactive:{inactive:?})");
507			let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
508			return (true, async move { iterator }.boxed());
509		}
510
511		let pending = ready_poll
512			.add(at)
513			.map(|received| {
514				received.unwrap_or_else(|e| {
515					log::warn!(target: LOG_TARGET, "Error receiving ready-set iterator: {:?}", e);
516					Box::new(std::iter::empty())
517				})
518			})
519			.boxed();
520		log::debug!(target: LOG_TARGET,
521			"fatp::ready_at {at:?} pending keys: {:?}",
522			ready_poll.pollers.keys()
523		);
524		(false, pending)
525	}
526}
527
528/// Converts the input view-to-statuses map into the output vector of statuses.
529///
530/// The result of importing a bunch of transactions into a single view is the vector of statuses.
531/// Every item represents a status for single transaction. The input is the map that associates
532/// hash-views with vectors indicating the statuses of transactions imports.
533///
534/// Import to multiple views result in two-dimensional array of statuses, which is provided as
535/// input map.
536///
537/// This function converts the map into the vec of results, according to the following rules:
538/// - for given transaction if at least one status is success, then output vector contains success,
539/// - if given transaction status is error for every view, then output vector contains error.
540///
541/// The results for transactions are in the same order for every view. An output vector preserves
542/// this order.
543///
544/// ```skip
545/// in:
546/// view  |   xt0 status | xt1 status | xt2 status
547/// h1   -> [ Ok(xth0),    Ok(xth1),    Err       ]
548/// h2   -> [ Ok(xth0),    Err,         Err       ]
549/// h3   -> [ Ok(xth0),    Ok(xth1),    Err       ]
550///
551/// out:
552/// [ Ok(xth0), Ok(xth1), Err ]
553/// ```
554fn reduce_multiview_result<H, E>(input: HashMap<H, Vec<Result<H, E>>>) -> Vec<Result<H, E>> {
555	let mut values = input.values();
556	let Some(first) = values.next() else {
557		return Default::default();
558	};
559	let length = first.len();
560	debug_assert!(values.all(|x| length == x.len()));
561
562	input
563		.into_values()
564		.reduce(|mut agg_results, results| {
565			agg_results.iter_mut().zip(results.into_iter()).for_each(|(agg_r, r)| {
566				if agg_r.is_err() {
567					*agg_r = r;
568				}
569			});
570			agg_results
571		})
572		.unwrap_or_default()
573}
574
575impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
576where
577	Block: BlockT,
578	ChainApi: 'static + graph::ChainApi<Block = Block>,
579	<Block as BlockT>::Hash: Unpin,
580{
581	type Block = ChainApi::Block;
582	type Hash = ExtrinsicHash<ChainApi>;
583	type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
584	type Error = ChainApi::Error;
585
586	/// Submits multiple transactions and returns a future resolving to the submission results.
587	///
588	/// Actual transactions submission process is delegated to the `ViewStore` internal instance.
589	///
590	/// The internal limits of the pool are checked. The results of submissions to individual views
591	/// are reduced to single result. Refer to `reduce_multiview_result` for more details.
592	fn submit_at(
593		&self,
594		_: <Self::Block as BlockT>::Hash,
595		source: TransactionSource,
596		xts: Vec<TransactionFor<Self>>,
597	) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
598		let view_store = self.view_store.clone();
599		log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count());
600		log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at");
601		let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
602		let mempool_result = self.mempool.extend_unwatched(source, &xts);
603
604		if view_store.is_empty() {
605			return future::ready(Ok(mempool_result)).boxed()
606		}
607
608		let (hashes, to_be_submitted): (Vec<TxHash<Self>>, Vec<ExtrinsicFor<ChainApi>>) =
609			mempool_result
610				.iter()
611				.zip(xts)
612				.filter_map(|(result, xt)| result.as_ref().ok().map(|xt_hash| (xt_hash, xt)))
613				.unzip();
614
615		self.metrics
616			.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));
617
618		let mempool = self.mempool.clone();
619		async move {
620			let results_map = view_store.submit(source, to_be_submitted.into_iter(), hashes).await;
621			let mut submission_results = reduce_multiview_result(results_map).into_iter();
622
623			Ok(mempool_result
624				.into_iter()
625				.map(|result| {
626					result.and_then(|xt_hash| {
627						let result = submission_results
628							.next()
629							.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.");
630						result.or_else(|error| {
631							let error = error.into_pool_error();
632							match error {
633								Ok(
634									// The transaction is still in mempool it may get included into the view for the next block.
635									Error::ImmediatelyDropped
636								) => Ok(xt_hash),
637								Ok(e) => {
638									mempool.remove(xt_hash);
639									Err(e.into())
640								},
641								Err(e) => Err(e),
642							}
643						})
644					})
645				})
646				.collect::<Vec<_>>())
647		}
648		.boxed()
649	}
650
651	/// Submits a single transaction and returns a future resolving to the submission results.
652	///
653	/// Actual transaction submission process is delegated to the `submit_at` function.
654	fn submit_one(
655		&self,
656		_at: <Self::Block as BlockT>::Hash,
657		source: TransactionSource,
658		xt: TransactionFor<Self>,
659	) -> PoolFuture<TxHash<Self>, Self::Error> {
660		log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_one views:{}", self.tx_hash(&xt), self.active_views_count());
661		let result_future = self.submit_at(_at, source, vec![xt]);
662		async move {
663			let result = result_future.await;
664			match result {
665				Ok(mut v) =>
666					v.pop().expect("There is exactly one element in result of submit_at. qed."),
667				Err(e) => Err(e),
668			}
669		}
670		.boxed()
671	}
672
673	/// Submits a transaction and starts to watch its progress in the pool, returning a stream of
674	/// status updates.
675	///
676	/// Actual transaction submission process is delegated to the `ViewStore` internal instance.
677	fn submit_and_watch(
678		&self,
679		at: <Self::Block as BlockT>::Hash,
680		source: TransactionSource,
681		xt: TransactionFor<Self>,
682	) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
683		log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count());
684		let xt = Arc::from(xt);
685		let xt_hash = match self.mempool.push_watched(source, xt.clone()) {
686			Ok(xt_hash) => xt_hash,
687			Err(e) => return future::ready(Err(e)).boxed(),
688		};
689
690		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
691
692		let view_store = self.view_store.clone();
693		let mempool = self.mempool.clone();
694		async move {
695			let result = view_store.submit_and_watch(at, source, xt).await;
696			let result = result.or_else(|(e, maybe_watcher)| {
697				let error = e.into_pool_error();
698				match (error, maybe_watcher) {
699					(
700						Ok(
701							// The transaction is still in mempool it may get included into the
702							// view for the next block.
703							Error::ImmediatelyDropped,
704						),
705						Some(watcher),
706					) => Ok(watcher),
707					(Ok(e), _) => {
708						mempool.remove(xt_hash);
709						Err(e.into())
710					},
711					(Err(e), _) => Err(e),
712				}
713			});
714			result
715		}
716		.boxed()
717	}
718
719	/// Intended to remove transactions identified by the given hashes, and any dependent
720	/// transactions, from the pool. In current implementation this function only outputs the error.
721	/// Seems that API change is needed here to make this call reasonable.
722	// todo [#5491]: api change? we need block hash here (assuming we need it at all - could be
723	// useful for verification for debugging purposes).
724	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
725		if !hashes.is_empty() {
726			log::debug!(target: LOG_TARGET, "fatp::remove_invalid {}", hashes.len());
727			log_xt_trace!(target:LOG_TARGET, hashes, "[{:?}] fatp::remove_invalid");
728			self.metrics
729				.report(|metrics| metrics.removed_invalid_txs.inc_by(hashes.len() as _));
730		}
731		Default::default()
732	}
733
734	// todo [#5491]: api change?
735	// status(Hash) -> Option<PoolStatus>
736	/// Returns the pool status which includes information like the number of ready and future
737	/// transactions.
738	///
739	/// Currently the status for the most recently notified best block is returned (for which
740	/// maintain process was accomplished).
741	fn status(&self) -> PoolStatus {
742		self.view_store
743			.most_recent_view
744			.read()
745			.map(|hash| self.view_store.status()[&hash].clone())
746			.unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
747	}
748
749	/// Return an event stream of notifications when transactions are imported to the pool.
750	///
751	/// Consumers of this stream should use the `ready` method to actually get the
752	/// pending transactions in the right order.
753	fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
754		self.import_notification_sink.event_stream()
755	}
756
757	/// Returns the hash of a given transaction.
758	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
759		self.api().hash_and_length(xt).0
760	}
761
762	/// Notifies the pool about the broadcasting status of transactions.
763	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
764		self.view_store.listener.transactions_broadcasted(propagations);
765	}
766
767	/// Return specific ready transaction by hash, if there is one.
768	///
769	/// Currently the ready transaction is returned if it exists for the most recently notified best
770	/// block (for which maintain process was accomplished).
771	// todo [#5491]: api change: we probably should have at here?
772	fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
773		let most_recent_view = self.view_store.most_recent_view.read();
774		let result = most_recent_view
775			.map(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash))
776			.flatten();
777		log::trace!(
778			target: LOG_TARGET,
779			"[{tx_hash:?}] ready_transaction: {} {:?}",
780			result.is_some(),
781			most_recent_view
782		);
783		result
784	}
785
786	/// Returns an iterator for ready transactions at a specific block, ordered by priority.
787	fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> PolledIterator<ChainApi> {
788		let (_, result) = self.ready_at_internal(at);
789		result
790	}
791
792	/// Returns an iterator for ready transactions, ordered by priority.
793	///
794	/// Currently the set of ready transactions is returned if it exists for the most recently
795	/// notified best block (for which maintain process was accomplished).
796	fn ready(&self) -> ReadyIteratorFor<ChainApi> {
797		self.view_store.ready()
798	}
799
800	/// Returns a list of future transactions in the pool.
801	///
802	/// Currently the set of future transactions is returned if it exists for the most recently
803	/// notified best block (for which maintain process was accomplished).
804	fn futures(&self) -> Vec<Self::InPoolTransaction> {
805		self.view_store.futures()
806	}
807
808	/// Returns a set of ready transactions at a given block within the specified timeout.
809	///
810	/// If the timeout expires before the maintain process is accomplished, a best-effort
811	/// set of transactions is returned (refer to `ready_at_light`).
812	fn ready_at_with_timeout(
813		&self,
814		at: <Self::Block as BlockT>::Hash,
815		timeout: std::time::Duration,
816	) -> PolledIterator<ChainApi> {
817		self.ready_at_with_timeout_internal(at, timeout)
818	}
819}
820
821impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
822	for ForkAwareTxPool<FullChainApi<Client, Block>, Block>
823where
824	Block: BlockT,
825	<Block as BlockT>::Hash: Unpin,
826	Client: sp_api::ProvideRuntimeApi<Block>
827		+ sc_client_api::BlockBackend<Block>
828		+ sc_client_api::blockchain::HeaderBackend<Block>
829		+ sp_runtime::traits::BlockIdTo<Block>
830		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
831	Client: Send + Sync + 'static,
832	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
833{
834	type Block = Block;
835	type Hash = ExtrinsicHash<FullChainApi<Client, Block>>;
836	type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
837
838	fn submit_local(
839		&self,
840		_at: Block::Hash,
841		xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
842	) -> Result<Self::Hash, Self::Error> {
843		log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count());
844		let xt = Arc::from(xt);
845		let result = self
846			.mempool
847			.extend_unwatched(TransactionSource::Local, &[xt.clone()])
848			.remove(0)?;
849
850		self.view_store.submit_local(xt).or_else(|_| Ok(result))
851	}
852}
853
854impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
855where
856	Block: BlockT,
857	ChainApi: graph::ChainApi<Block = Block> + 'static,
858	<Block as BlockT>::Hash: Unpin,
859{
860	/// Handles a new block notification.
861	///
862	/// It is responsible for handling a newly notified block. It executes some sanity checks, find
863	/// the best view to clone from and executes the new view build procedure for the notified
864	/// block.
865	///
866	/// If the view is correctly created, `ready_at` pollers for this block will be triggered.
867	async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
868		let hash_and_number = match tree_route.last() {
869			Some(hash_and_number) => hash_and_number,
870			None => {
871				log::warn!(
872					target: LOG_TARGET,
873					"Skipping ChainEvent - no last block in tree route {:?}",
874					tree_route,
875				);
876				return
877			},
878		};
879
880		if self.has_view(&hash_and_number.hash) {
881			log::trace!(
882				target: LOG_TARGET,
883				"view already exists for block: {:?}",
884				hash_and_number,
885			);
886			return
887		}
888
889		let best_view = self.view_store.find_best_view(tree_route);
890		let new_view = self.build_new_view(best_view, hash_and_number, tree_route).await;
891
892		if let Some(view) = new_view {
893			{
894				let view = view.clone();
895				self.ready_poll.lock().trigger(hash_and_number.hash, move || {
896					Box::from(view.pool.validated_pool().ready())
897				});
898			}
899
900			View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
901		}
902	}
903
904	/// Builds a new view.
905	///
906	/// If `origin_view` is provided, the new view will be cloned from it. Otherwise an empty view
907	/// will be created.
908	///
909	/// The new view will be updated with transactions from the tree_route and the mempool, all
910	/// required events will be triggered, it will be inserted to the view store.
911	///
912	/// This method will also update multi-view listeners with newly created view.
913	async fn build_new_view(
914		&self,
915		origin_view: Option<Arc<View<ChainApi>>>,
916		at: &HashAndNumber<Block>,
917		tree_route: &TreeRoute<Block>,
918	) -> Option<Arc<View<ChainApi>>> {
919		log::debug!(
920			target: LOG_TARGET,
921			"build_new_view: for: {:?} from: {:?} tree_route: {:?}",
922			at,
923			origin_view.as_ref().map(|v| v.at.clone()),
924			tree_route
925		);
926		let mut view = if let Some(origin_view) = origin_view {
927			let mut view = View::new_from_other(&origin_view, at);
928			if !tree_route.retracted().is_empty() {
929				view.pool.clear_recently_pruned();
930			}
931			view
932		} else {
933			log::debug!(target: LOG_TARGET, "creating non-cloned view: for: {at:?}");
934			View::new(
935				self.api.clone(),
936				at.clone(),
937				self.options.clone(),
938				self.metrics.clone(),
939				self.is_validator.clone(),
940			)
941		};
942
943		// 1. Capture all import notification from the very beginning, so first register all
944		//the listeners.
945		self.import_notification_sink.add_view(
946			view.at.hash,
947			view.pool.validated_pool().import_notification_stream().boxed(),
948		);
949
950		self.view_store.dropped_stream_controller.add_view(
951			view.at.hash,
952			view.pool.validated_pool().create_dropped_by_limits_stream().boxed(),
953		);
954
955		let start = Instant::now();
956		let watched_xts = self.register_listeners(&mut view).await;
957		let duration = start.elapsed();
958		log::debug!(target: LOG_TARGET, "register_listeners: at {at:?} took {duration:?}");
959
960		// 2. Handle transactions from the tree route. Pruning transactions from the view first
961		// will make some space for mempool transactions in case we are at the view's limits.
962		let start = Instant::now();
963		self.update_view_with_fork(&view, tree_route, at.clone()).await;
964		let duration = start.elapsed();
965		log::debug!(target: LOG_TARGET, "update_view_with_fork: at {at:?} took {duration:?}");
966
967		// 3. Finally, submit transactions from the mempool.
968		let start = Instant::now();
969		self.update_view_with_mempool(&mut view, watched_xts).await;
970		let duration = start.elapsed();
971		log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {at:?} took {duration:?}");
972
973		let view = Arc::from(view);
974		self.view_store.insert_new_view(view.clone(), tree_route).await;
975		Some(view)
976	}
977
978	/// Returns the list of xts included in all block ancestors, including the block itself.
979	///
980	/// Example: for the following chain `F<-B1<-B2<-B3` xts from `F,B1,B2,B3` will be returned.
981	async fn extrinsics_included_since_finalized(&self, at: Block::Hash) -> HashSet<TxHash<Self>> {
982		let start = Instant::now();
983		let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();
984
985		let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at) else {
986			return Default::default()
987		};
988
989		let api = self.api.clone();
990		let mut all_extrinsics = HashSet::new();
991
992		for h in tree_route.enacted().iter().rev() {
993			api.block_body(h.hash)
994				.await
995				.unwrap_or_else(|e| {
996					log::warn!(target: LOG_TARGET, "Compute ready light transactions: error request: {}", e);
997					None
998				})
999				.unwrap_or_default()
1000				.into_iter()
1001				.map(|t| self.hash_of(&t))
1002				.for_each(|tx_hash| {
1003					all_extrinsics.insert(tx_hash);
1004				});
1005		}
1006
1007		log::debug!(target: LOG_TARGET,
1008			"fatp::extrinsics_included_since_finalized {} from {} count: {} took:{:?}",
1009			at,
1010			recent_finalized_block,
1011			all_extrinsics.len(),
1012			start.elapsed()
1013		);
1014		all_extrinsics
1015	}
1016
1017	/// For every watched transaction in the mempool registers a transaction listener in the view.
1018	///
1019	/// The transaction listener for a given view is also added to multi-view listener. This allows
1020	/// to track aggreagated progress of the transaction within the transaction pool.
1021	///
1022	/// Function returns a list of currently watched transactions in the mempool.
1023	async fn register_listeners(
1024		&self,
1025		view: &View<ChainApi>,
1026	) -> Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)> {
1027		log::debug!(
1028			target: LOG_TARGET,
1029			"register_listeners: {:?} xts:{:?} v:{}",
1030			view.at,
1031			self.mempool.unwatched_and_watched_count(),
1032			self.active_views_count()
1033		);
1034
1035		//todo [#5495]: maybe we don't need to register listener in view? We could use
1036		// multi_view_listener.transaction_in_block
1037		let results = self
1038			.mempool
1039			.clone_watched()
1040			.into_iter()
1041			.map(|(tx_hash, tx)| {
1042				let watcher = view.create_watcher(tx_hash);
1043				let at = view.at.clone();
1044				async move {
1045					log::trace!(target: LOG_TARGET, "[{:?}] adding watcher {:?}", tx_hash, at.hash);
1046					self.view_store.listener.add_view_watcher_for_tx(
1047						tx_hash,
1048						at.hash,
1049						watcher.into_stream().boxed(),
1050					);
1051					(tx_hash, tx)
1052				}
1053			})
1054			.collect::<Vec<_>>();
1055
1056		future::join_all(results).await
1057	}
1058
1059	/// Updates the given view with the transaction from the internal mempol.
1060	///
1061	/// All transactions from the mempool (excluding those which are either already imported or
1062	/// already included in blocks since recently finalized block) are submitted to the
1063	/// view.
1064	///
1065	/// If there are no views, and mempool transaction is reported as invalid for the given view,
1066	/// the transaction is reported as invalid and removed from the mempool. This does not apply to
1067	/// stale and temporarily banned transactions.
1068	///
1069	/// As the listeners for watched transactions were registered at the very beginning of maintain
1070	/// procedure (`register_listeners`), this function accepts the list of watched transactions
1071	/// from the mempool for which listener was actually registered to avoid submit/maintain races.
1072	async fn update_view_with_mempool(
1073		&self,
1074		view: &View<ChainApi>,
1075		watched_xts: Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)>,
1076	) {
1077		log::debug!(
1078			target: LOG_TARGET,
1079			"update_view_with_mempool: {:?} xts:{:?} v:{}",
1080			view.at,
1081			self.mempool.unwatched_and_watched_count(),
1082			self.active_views_count()
1083		);
1084		let included_xts = self.extrinsics_included_since_finalized(view.at.hash).await;
1085		let xts = self.mempool.clone_unwatched();
1086
1087		let mut all_submitted_count = 0;
1088		if !xts.is_empty() {
1089			let unwatched_count = xts.len();
1090			let mut buckets = HashMap::<TransactionSource, Vec<ExtrinsicFor<ChainApi>>>::default();
1091			xts.into_iter()
1092				.filter(|(hash, _)| !view.pool.validated_pool().pool.read().is_imported(hash))
1093				.filter(|(hash, _)| !included_xts.contains(&hash))
1094				.map(|(_, tx)| (tx.source(), tx.tx()))
1095				.for_each(|(source, tx)| buckets.entry(source).or_default().push(tx));
1096
1097			for (source, xts) in buckets {
1098				all_submitted_count += xts.len();
1099				let _ = view.submit_many(source, xts).await;
1100			}
1101			log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {:?} unwatched {}/{}", view.at.hash, all_submitted_count, unwatched_count);
1102		}
1103
1104		let watched_submitted_count = watched_xts.len();
1105
1106		let mut buckets = HashMap::<
1107			TransactionSource,
1108			Vec<(ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>)>,
1109		>::default();
1110		watched_xts
1111			.into_iter()
1112			.filter(|(hash, _)| !included_xts.contains(&hash))
1113			.map(|(tx_hash, tx)| (tx.source(), tx_hash, tx.tx()))
1114			.for_each(|(source, tx_hash, tx)| {
1115				buckets.entry(source).or_default().push((tx_hash, tx))
1116			});
1117
1118		let mut watched_results = Vec::default();
1119		for (source, watched_xts) in buckets {
1120			let hashes = watched_xts.iter().map(|i| i.0).collect::<Vec<_>>();
1121			let results = view
1122				.submit_many(source, watched_xts.into_iter().map(|i| i.1))
1123				.await
1124				.into_iter()
1125				.zip(hashes)
1126				.map(|(result, tx_hash)| result.or_else(|_| Err(tx_hash)))
1127				.collect::<Vec<_>>();
1128			watched_results.extend(results);
1129		}
1130
1131		log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {:?} watched {}/{}", view.at.hash, watched_submitted_count, self.mempool_len().1);
1132
1133		all_submitted_count += watched_submitted_count;
1134		let _ = all_submitted_count
1135			.try_into()
1136			.map(|v| self.metrics.report(|metrics| metrics.submitted_from_mempool_txs.inc_by(v)));
1137
1138		// if there are no views yet, and a single newly created view is reporting error, just send
1139		// out the invalid event, and remove transaction.
1140		if self.view_store.is_empty() {
1141			for result in watched_results {
1142				match result {
1143					Err(tx_hash) => {
1144						self.view_store.listener.invalidate_transactions(&[tx_hash]);
1145						self.mempool.remove(tx_hash);
1146					},
1147					Ok(_) => {},
1148				}
1149			}
1150		}
1151	}
1152
1153	/// Updates the view with the transactions from the given tree route.
1154	///
1155	/// Transactions from the retracted blocks are resubmitted to the given view. Tags for
1156	/// transactions included in blocks on enacted fork are pruned from the provided view.
1157	async fn update_view_with_fork(
1158		&self,
1159		view: &View<ChainApi>,
1160		tree_route: &TreeRoute<Block>,
1161		hash_and_number: HashAndNumber<Block>,
1162	) {
1163		log::debug!(target: LOG_TARGET, "update_view_with_fork tree_route: {:?} {tree_route:?}", view.at);
1164		let api = self.api.clone();
1165
1166		// We keep track of everything we prune so that later we won't add
1167		// transactions with those hashes from the retracted blocks.
1168		let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
1169
1170		future::join_all(
1171			tree_route
1172				.enacted()
1173				.iter()
1174				.map(|h| crate::prune_known_txs_for_block(h, &*api, &view.pool)),
1175		)
1176		.await
1177		.into_iter()
1178		.for_each(|enacted_log| {
1179			pruned_log.extend(enacted_log);
1180		});
1181
1182		//resubmit
1183		{
1184			let mut resubmit_transactions = Vec::new();
1185
1186			for retracted in tree_route.retracted() {
1187				let hash = retracted.hash;
1188
1189				let block_transactions = api
1190					.block_body(hash)
1191					.await
1192					.unwrap_or_else(|e| {
1193						log::warn!(target: LOG_TARGET, "Failed to fetch block body: {}", e);
1194						None
1195					})
1196					.unwrap_or_default()
1197					.into_iter();
1198
1199				let mut resubmitted_to_report = 0;
1200
1201				resubmit_transactions.extend(
1202					block_transactions
1203						.into_iter()
1204						.map(|tx| (self.hash_of(&tx), tx))
1205						.filter(|(tx_hash, _)| {
1206							let contains = pruned_log.contains(&tx_hash);
1207
1208							// need to count all transactions, not just filtered, here
1209							resubmitted_to_report += 1;
1210
1211							if !contains {
1212								log::trace!(
1213									target: LOG_TARGET,
1214									"[{:?}]: Resubmitting from retracted block {:?}",
1215									tx_hash,
1216									hash,
1217								);
1218							}
1219							!contains
1220						})
1221						.map(|(tx_hash, tx)| {
1222							//find arc if tx is known
1223							self.mempool.get_by_hash(tx_hash).unwrap_or_else(|| Arc::from(tx))
1224						}),
1225				);
1226
1227				self.metrics.report(|metrics| {
1228					metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
1229				});
1230			}
1231
1232			let _ = view
1233				.pool
1234				.resubmit_at(
1235					&hash_and_number,
1236					// These transactions are coming from retracted blocks, we should
1237					// simply consider them external.
1238					TransactionSource::External,
1239					resubmit_transactions,
1240				)
1241				.await;
1242		}
1243	}
1244
1245	/// Executes the maintainance for the finalized event.
1246	///
1247	/// Performs a house-keeping required for finalized event. This includes:
1248	/// - executing the on finalized procedure for the view store,
1249	/// - purging finalized transactions from the mempool and triggering mempool revalidation,
1250	async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
1251		let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
1252		log::debug!(target: LOG_TARGET, "handle_finalized {finalized_number:?} tree_route: {tree_route:?} views_count:{}", self.active_views_count());
1253
1254		let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;
1255
1256		self.mempool.purge_finalized_transactions(&finalized_xts).await;
1257		self.import_notification_sink.clean_notified_items(&finalized_xts);
1258
1259		self.metrics
1260			.report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));
1261
1262		if let Ok(Some(finalized_number)) = finalized_number {
1263			self.revalidation_queue
1264				.revalidate_mempool(
1265					self.mempool.clone(),
1266					HashAndNumber { hash: finalized_hash, number: finalized_number },
1267				)
1268				.await;
1269		} else {
1270			log::trace!(target: LOG_TARGET, "purge_transactions_later skipped, cannot find block number {finalized_number:?}");
1271		}
1272
1273		self.ready_poll.lock().remove_cancelled();
1274		log::trace!(target: LOG_TARGET, "handle_finalized after views_count:{:?}", self.active_views_count());
1275	}
1276
1277	/// Computes a hash of the provided transaction
1278	fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1279		self.api.hash_and_length(xt).0
1280	}
1281}
1282
1283#[async_trait]
1284impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
1285where
1286	Block: BlockT,
1287	ChainApi: 'static + graph::ChainApi<Block = Block>,
1288	<Block as BlockT>::Hash: Unpin,
1289{
1290	/// Executes the maintainance for the given chain event.
1291	async fn maintain(&self, event: ChainEvent<Self::Block>) {
1292		let start = Instant::now();
1293		log::debug!(target: LOG_TARGET, "processing event: {event:?}");
1294
1295		self.view_store.finish_background_revalidations().await;
1296
1297		let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
1298
1299		let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
1300			match self.api.tree_route(from, to) {
1301				Ok(tree_route) => Ok(tree_route),
1302				Err(e) =>
1303					return Err(format!(
1304						"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
1305					)),
1306			}
1307		};
1308		let block_id_to_number =
1309			|hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
1310
1311		let result =
1312			self.enactment_state
1313				.lock()
1314				.update(&event, &compute_tree_route, &block_id_to_number);
1315
1316		match result {
1317			Err(msg) => {
1318				log::trace!(target: LOG_TARGET, "enactment_state::update error: {msg}");
1319				self.enactment_state.lock().force_update(&event);
1320			},
1321			Ok(EnactmentAction::Skip) => return,
1322			Ok(EnactmentAction::HandleFinalization) => {
1323				// todo [#5492]: in some cases handle_new_block is actually needed (new_num >
1324				// tips_of_forks) let hash = event.hash();
1325				// if !self.has_view(hash) {
1326				// 	if let Ok(tree_route) = compute_tree_route(prev_finalized_block, hash) {
1327				// 		self.handle_new_block(&tree_route).await;
1328				// 	}
1329				// }
1330			},
1331			Ok(EnactmentAction::HandleEnactment(tree_route)) => {
1332				if matches!(event, ChainEvent::Finalized { .. }) {
1333					self.view_store.handle_pre_finalized(event.hash()).await;
1334				};
1335				self.handle_new_block(&tree_route).await;
1336			},
1337		};
1338
1339		match event {
1340			ChainEvent::NewBestBlock { .. } => {},
1341			ChainEvent::Finalized { hash, ref tree_route } => {
1342				self.handle_finalized(hash, tree_route).await;
1343
1344				log::trace!(
1345					target: LOG_TARGET,
1346					"on-finalized enacted: {tree_route:?}, previously finalized: \
1347					{prev_finalized_block:?}",
1348				);
1349			},
1350		}
1351
1352		let maintain_duration = start.elapsed();
1353
1354		log::info!(
1355			target: LOG_TARGET,
1356			"maintain: txs:{:?} views:[{};{:?}] event:{event:?}  took:{:?}",
1357			self.mempool_len(),
1358			self.active_views_count(),
1359			self.views_stats(),
1360			maintain_duration
1361		);
1362
1363		self.metrics.report(|metrics| {
1364			let (unwatched, watched) = self.mempool_len();
1365			let _ = (
1366				self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
1367				self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
1368				watched.try_into().map(|v| metrics.watched_txs.set(v)),
1369				unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
1370			);
1371			metrics.maintain_duration.observe(maintain_duration.as_secs_f64());
1372		});
1373	}
1374}
1375
1376impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
1377where
1378	Block: BlockT,
1379	Client: sp_api::ProvideRuntimeApi<Block>
1380		+ sc_client_api::BlockBackend<Block>
1381		+ sc_client_api::blockchain::HeaderBackend<Block>
1382		+ sp_runtime::traits::BlockIdTo<Block>
1383		+ sc_client_api::ExecutorProvider<Block>
1384		+ sc_client_api::UsageProvider<Block>
1385		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
1386		+ Send
1387		+ Sync
1388		+ 'static,
1389	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
1390	<Block as BlockT>::Hash: std::marker::Unpin,
1391{
1392	/// Create new fork aware transaction pool for a full node with the provided api.
1393	pub fn new_full(
1394		options: Options,
1395		is_validator: IsValidator,
1396		prometheus: Option<&PrometheusRegistry>,
1397		spawner: impl SpawnEssentialNamed,
1398		client: Arc<Client>,
1399	) -> Self {
1400		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
1401		let pool = Self::new_with_background_worker(
1402			options,
1403			is_validator,
1404			pool_api,
1405			prometheus,
1406			spawner,
1407			client.usage_info().chain.best_hash,
1408			client.usage_info().chain.finalized_hash,
1409		);
1410
1411		pool
1412	}
1413}
1414
1415#[cfg(test)]
1416mod reduce_multiview_result_tests {
1417	use super::*;
1418	use sp_core::H256;
1419	#[derive(Debug, PartialEq, Clone)]
1420	enum Error {
1421		Custom(u8),
1422	}
1423
1424	#[test]
1425	fn empty() {
1426		sp_tracing::try_init_simple();
1427		let input = HashMap::default();
1428		let r = reduce_multiview_result::<H256, Error>(input);
1429		assert!(r.is_empty());
1430	}
1431
1432	#[test]
1433	fn errors_only() {
1434		sp_tracing::try_init_simple();
1435		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
1436			(
1437				H256::repeat_byte(0x13),
1438				vec![
1439					Err(Error::Custom(10)),
1440					Err(Error::Custom(11)),
1441					Err(Error::Custom(12)),
1442					Err(Error::Custom(13)),
1443				],
1444			),
1445			(
1446				H256::repeat_byte(0x14),
1447				vec![
1448					Err(Error::Custom(20)),
1449					Err(Error::Custom(21)),
1450					Err(Error::Custom(22)),
1451					Err(Error::Custom(23)),
1452				],
1453			),
1454			(
1455				H256::repeat_byte(0x15),
1456				vec![
1457					Err(Error::Custom(30)),
1458					Err(Error::Custom(31)),
1459					Err(Error::Custom(32)),
1460					Err(Error::Custom(33)),
1461				],
1462			),
1463		];
1464		let input = HashMap::from_iter(v.clone());
1465		let r = reduce_multiview_result(input);
1466
1467		//order in HashMap is random, the result shall be one of:
1468		assert!(r == v[0].1 || r == v[1].1 || r == v[2].1);
1469	}
1470
1471	#[test]
1472	#[should_panic]
1473	#[cfg(debug_assertions)]
1474	fn invalid_lengths() {
1475		sp_tracing::try_init_simple();
1476		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
1477			(H256::repeat_byte(0x13), vec![Err(Error::Custom(12)), Err(Error::Custom(13))]),
1478			(H256::repeat_byte(0x14), vec![Err(Error::Custom(23))]),
1479		];
1480		let input = HashMap::from_iter(v);
1481		let _ = reduce_multiview_result(input);
1482	}
1483
1484	#[test]
1485	fn only_hashes() {
1486		sp_tracing::try_init_simple();
1487
1488		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
1489			(
1490				H256::repeat_byte(0x13),
1491				vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
1492			),
1493			(
1494				H256::repeat_byte(0x14),
1495				vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
1496			),
1497		];
1498		let input = HashMap::from_iter(v);
1499		let r = reduce_multiview_result(input);
1500
1501		assert_eq!(r, vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))]);
1502	}
1503
1504	#[test]
1505	fn one_view() {
1506		sp_tracing::try_init_simple();
1507		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![(
1508			H256::repeat_byte(0x13),
1509			vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))],
1510		)];
1511		let input = HashMap::from_iter(v);
1512		let r = reduce_multiview_result(input);
1513
1514		assert_eq!(r, vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))]);
1515	}
1516
1517	#[test]
1518	fn mix() {
1519		sp_tracing::try_init_simple();
1520		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
1521			(
1522				H256::repeat_byte(0x13),
1523				vec![
1524					Ok(H256::repeat_byte(0x10)),
1525					Err(Error::Custom(11)),
1526					Err(Error::Custom(12)),
1527					Err(Error::Custom(33)),
1528				],
1529			),
1530			(
1531				H256::repeat_byte(0x14),
1532				vec![
1533					Err(Error::Custom(20)),
1534					Ok(H256::repeat_byte(0x21)),
1535					Err(Error::Custom(22)),
1536					Err(Error::Custom(33)),
1537				],
1538			),
1539			(
1540				H256::repeat_byte(0x15),
1541				vec![
1542					Err(Error::Custom(30)),
1543					Err(Error::Custom(31)),
1544					Ok(H256::repeat_byte(0x32)),
1545					Err(Error::Custom(33)),
1546				],
1547			),
1548		];
1549		let input = HashMap::from_iter(v);
1550		let r = reduce_multiview_result(input);
1551
1552		assert_eq!(
1553			r,
1554			vec![
1555				Ok(H256::repeat_byte(0x10)),
1556				Ok(H256::repeat_byte(0x21)),
1557				Ok(H256::repeat_byte(0x32)),
1558				Err(Error::Custom(33))
1559			]
1560		);
1561	}
1562}