1use super::{
22 dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
23 import_notification_sink::MultiViewImportNotificationSink,
24 metrics::MetricsLink as PrometheusMetrics,
25 multi_view_listener::MultiViewListener,
26 tx_mem_pool::{TxInMemPool, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
27 view::View,
28 view_store::ViewStore,
29};
30use crate::{
31 api::FullChainApi,
32 common::log_xt::log_xt_trace,
33 enactment_state::{EnactmentAction, EnactmentState},
34 fork_aware_txpool::revalidation_worker,
35 graph::{self, base_pool::Transaction, ExtrinsicFor, ExtrinsicHash, IsValidator, Options},
36 PolledIterator, ReadyIteratorFor, LOG_TARGET,
37};
38use async_trait::async_trait;
39use futures::{
40 channel::oneshot,
41 future::{self},
42 prelude::*,
43 FutureExt,
44};
45use parking_lot::Mutex;
46use prometheus_endpoint::Registry as PrometheusRegistry;
47use sc_transaction_pool_api::{
48 error::{Error, IntoPoolError},
49 ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus,
50 TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
51};
52use sp_blockchain::{HashAndNumber, TreeRoute};
53use sp_core::traits::SpawnEssentialNamed;
54use sp_runtime::{
55 generic::BlockId,
56 traits::{Block as BlockT, NumberFor},
57};
58use std::{
59 collections::{HashMap, HashSet},
60 pin::Pin,
61 sync::Arc,
62 time::Instant,
63};
64use tokio::select;
65
66pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;
68
69struct ReadyPoll<T, Block>
72where
73 Block: BlockT,
74{
75 pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
76}
77
78impl<T, Block> ReadyPoll<T, Block>
79where
80 Block: BlockT,
81{
82 fn new() -> Self {
84 Self { pollers: Default::default() }
85 }
86
87 fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
90 let (s, r) = oneshot::channel();
91 self.pollers.entry(at).or_default().push(s);
92 r
93 }
94
95 fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
100 log::trace!(target: LOG_TARGET, "fatp::trigger {at:?} pending keys: {:?}", self.pollers.keys());
101 let Some(pollers) = self.pollers.remove(&at) else { return };
102 pollers.into_iter().for_each(|p| {
103 log::debug!(target: LOG_TARGET, "trigger ready signal at block {}", at);
104 let _ = p.send(ready_iterator());
105 });
106 }
107
108 fn remove_cancelled(&mut self) {
110 self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
111 }
112}
113
114pub struct ForkAwareTxPool<ChainApi, Block>
118where
119 Block: BlockT,
120 ChainApi: graph::ChainApi<Block = Block> + 'static,
121{
122 api: Arc<ChainApi>,
124
125 mempool: Arc<TxMemPool<ChainApi, Block>>,
127
128 view_store: Arc<ViewStore<ChainApi, Block>>,
130
131 ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,
133
134 metrics: PrometheusMetrics,
136
137 enactment_state: Arc<Mutex<EnactmentState<Block>>>,
139
140 revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,
142
143 import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
146
147 options: Options,
149
150 is_validator: IsValidator,
152}
153
154impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
155where
156 Block: BlockT,
157 ChainApi: graph::ChainApi<Block = Block> + 'static,
158 <Block as BlockT>::Hash: Unpin,
159{
160 pub fn new_test(
163 pool_api: Arc<ChainApi>,
164 best_block_hash: Block::Hash,
165 finalized_hash: Block::Hash,
166 ) -> (Self, ForkAwareTxPoolTask) {
167 Self::new_test_with_limits(
168 pool_api,
169 best_block_hash,
170 finalized_hash,
171 Options::default().ready,
172 Options::default().future,
173 usize::MAX,
174 )
175 }
176
177 pub fn new_test_with_limits(
180 pool_api: Arc<ChainApi>,
181 best_block_hash: Block::Hash,
182 finalized_hash: Block::Hash,
183 ready_limits: crate::PoolLimit,
184 future_limits: crate::PoolLimit,
185 mempool_max_transactions_count: usize,
186 ) -> (Self, ForkAwareTxPoolTask) {
187 let listener = Arc::from(MultiViewListener::new());
188 let (import_notification_sink, import_notification_sink_task) =
189 MultiViewImportNotificationSink::new_with_worker();
190
191 let mempool = Arc::from(TxMemPool::new(
192 pool_api.clone(),
193 listener.clone(),
194 Default::default(),
195 mempool_max_transactions_count,
196 ));
197
198 let (dropped_stream_controller, dropped_stream) =
199 MultiViewDroppedWatcherController::<ChainApi>::new();
200 let dropped_monitor_task = Self::dropped_monitor_task(
201 dropped_stream,
202 mempool.clone(),
203 import_notification_sink.clone(),
204 );
205
206 let combined_tasks = async move {
207 tokio::select! {
208 _ = import_notification_sink_task => {},
209 _ = dropped_monitor_task => {}
210 }
211 }
212 .boxed();
213
214 let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };
215
216 (
217 Self {
218 mempool,
219 api: pool_api.clone(),
220 view_store: Arc::new(ViewStore::new(pool_api, listener, dropped_stream_controller)),
221 ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
222 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
223 best_block_hash,
224 finalized_hash,
225 ))),
226 revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
227 import_notification_sink,
228 options,
229 is_validator: false.into(),
230 metrics: Default::default(),
231 },
232 combined_tasks,
233 )
234 }
235
236 async fn dropped_monitor_task(
242 mut dropped_stream: StreamOfDropped<ChainApi>,
243 mempool: Arc<TxMemPool<ChainApi, Block>>,
244 import_notification_sink: MultiViewImportNotificationSink<
245 Block::Hash,
246 ExtrinsicHash<ChainApi>,
247 >,
248 ) {
249 loop {
250 let Some(dropped) = dropped_stream.next().await else {
251 log::debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
252 break;
253 };
254 log::trace!(target: LOG_TARGET, "[{:?}] fatp::dropped notification, removing", dropped);
255 mempool.remove_dropped_transactions(&[dropped]).await;
256 import_notification_sink.clean_notified_items(&[dropped]);
257 }
258 }
259
260 pub fn new_with_background_worker(
265 options: Options,
266 is_validator: IsValidator,
267 pool_api: Arc<ChainApi>,
268 prometheus: Option<&PrometheusRegistry>,
269 spawner: impl SpawnEssentialNamed,
270 best_block_hash: Block::Hash,
271 finalized_hash: Block::Hash,
272 ) -> Self {
273 let metrics = PrometheusMetrics::new(prometheus);
274 let listener = Arc::from(MultiViewListener::new());
275 let (revalidation_queue, revalidation_task) =
276 revalidation_worker::RevalidationQueue::new_with_worker();
277
278 let (import_notification_sink, import_notification_sink_task) =
279 MultiViewImportNotificationSink::new_with_worker();
280
281 let mempool = Arc::from(TxMemPool::new(
282 pool_api.clone(),
283 listener.clone(),
284 metrics.clone(),
285 TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * (options.ready.count + options.future.count),
286 ));
287
288 let (dropped_stream_controller, dropped_stream) =
289 MultiViewDroppedWatcherController::<ChainApi>::new();
290 let dropped_monitor_task = Self::dropped_monitor_task(
291 dropped_stream,
292 mempool.clone(),
293 import_notification_sink.clone(),
294 );
295
296 let combined_tasks = async move {
297 tokio::select! {
298 _ = revalidation_task => {},
299 _ = import_notification_sink_task => {},
300 _ = dropped_monitor_task => {}
301 }
302 }
303 .boxed();
304 spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
305
306 Self {
307 mempool,
308 api: pool_api.clone(),
309 view_store: Arc::new(ViewStore::new(pool_api, listener, dropped_stream_controller)),
310 ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
311 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
312 best_block_hash,
313 finalized_hash,
314 ))),
315 revalidation_queue: Arc::from(revalidation_queue),
316 import_notification_sink,
317 options,
318 metrics,
319 is_validator,
320 }
321 }
322
323 pub fn api(&self) -> &ChainApi {
325 &self.api
326 }
327
328 pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
330 self.view_store.status()
331 }
332
333 pub fn active_views_count(&self) -> usize {
335 self.view_store.active_views.read().len()
336 }
337
338 pub fn inactive_views_count(&self) -> usize {
340 self.view_store.inactive_views.read().len()
341 }
342
343 fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
348 self.view_store
349 .active_views
350 .read()
351 .iter()
352 .map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
353 .collect()
354 }
355
356 pub fn has_view(&self, hash: &Block::Hash) -> bool {
358 self.view_store.active_views.read().contains_key(hash)
359 }
360
361 pub fn mempool_len(&self) -> (usize, usize) {
365 self.mempool.unwatched_and_watched_count()
366 }
367
368 pub fn ready_at_light(&self, at: Block::Hash) -> PolledIterator<ChainApi> {
378 let start = Instant::now();
379 let api = self.api.clone();
380 log::trace!(target: LOG_TARGET, "fatp::ready_at_light {:?}", at);
381
382 let Ok(block_number) = self.api.resolve_block_number(at) else {
383 let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
384 return Box::pin(async { empty })
385 };
386
387 let best_result = {
388 api.tree_route(self.enactment_state.lock().recent_finalized_block(), at).map(
389 |tree_route| {
390 if let Some((index, view)) =
391 tree_route.enacted().iter().enumerate().rev().skip(1).find_map(|(i, b)| {
392 self.view_store.get_view_at(b.hash, true).map(|(view, _)| (i, view))
393 }) {
394 let e = tree_route.enacted()[index..].to_vec();
395 (TreeRoute::new(e, 0).ok(), Some(view))
396 } else {
397 (None, None)
398 }
399 },
400 )
401 };
402
403 Box::pin(async move {
404 if let Ok((Some(best_tree_route), Some(best_view))) = best_result {
405 let tmp_view: View<ChainApi> = View::new_from_other(
406 &best_view,
407 &HashAndNumber { hash: at, number: block_number },
408 );
409
410 let mut all_extrinsics = vec![];
411
412 for h in best_tree_route.enacted() {
413 let extrinsics = api
414 .block_body(h.hash)
415 .await
416 .unwrap_or_else(|e| {
417 log::warn!(target: LOG_TARGET, "Compute ready light transactions: error request: {}", e);
418 None
419 })
420 .unwrap_or_default()
421 .into_iter()
422 .map(|t| api.hash_and_length(&t).0);
423 all_extrinsics.extend(extrinsics);
424 }
425
426 let before_count = tmp_view.pool.validated_pool().status().ready;
427 let tags = tmp_view
428 .pool
429 .validated_pool()
430 .extrinsics_tags(&all_extrinsics)
431 .into_iter()
432 .flatten()
433 .flatten()
434 .collect::<Vec<_>>();
435 let _ = tmp_view.pool.validated_pool().prune_tags(tags);
436
437 let after_count = tmp_view.pool.validated_pool().status().ready;
438 log::debug!(target: LOG_TARGET,
439 "fatp::ready_at_light {} from {} before: {} to be removed: {} after: {} took:{:?}",
440 at,
441 best_view.at.hash,
442 before_count,
443 all_extrinsics.len(),
444 after_count,
445 start.elapsed()
446 );
447 Box::new(tmp_view.pool.validated_pool().ready())
448 } else {
449 let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
450 log::debug!(target: LOG_TARGET, "fatp::ready_at_light {} -> empty, took:{:?}", at, start.elapsed());
451 empty
452 }
453 })
454 }
455
456 fn ready_at_with_timeout_internal(
467 &self,
468 at: Block::Hash,
469 timeout: std::time::Duration,
470 ) -> PolledIterator<ChainApi> {
471 log::debug!(target: LOG_TARGET, "fatp::ready_at_with_timeout at {:?} allowed delay: {:?}", at, timeout);
472
473 let timeout = futures_timer::Delay::new(timeout);
474 let (view_already_exists, ready_at) = self.ready_at_internal(at);
475
476 if view_already_exists {
477 return ready_at;
478 }
479
480 let maybe_ready = async move {
481 select! {
482 ready = ready_at => Some(ready),
483 _ = timeout => {
484 log::warn!(target: LOG_TARGET,
485 "Timeout fired waiting for transaction pool at block: ({:?}). \
486 Proceeding with production.",
487 at,
488 );
489 None
490 }
491 }
492 };
493
494 let fall_back_ready = self.ready_at_light(at);
495 Box::pin(async {
496 let (maybe_ready, fall_back_ready) =
497 futures::future::join(maybe_ready.boxed(), fall_back_ready.boxed()).await;
498 maybe_ready.unwrap_or(fall_back_ready)
499 })
500 }
501
502 fn ready_at_internal(&self, at: Block::Hash) -> (bool, PolledIterator<ChainApi>) {
503 let mut ready_poll = self.ready_poll.lock();
504
505 if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
506 log::debug!(target: LOG_TARGET, "fatp::ready_at {at:?} (inactive:{inactive:?})");
507 let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
508 return (true, async move { iterator }.boxed());
509 }
510
511 let pending = ready_poll
512 .add(at)
513 .map(|received| {
514 received.unwrap_or_else(|e| {
515 log::warn!(target: LOG_TARGET, "Error receiving ready-set iterator: {:?}", e);
516 Box::new(std::iter::empty())
517 })
518 })
519 .boxed();
520 log::debug!(target: LOG_TARGET,
521 "fatp::ready_at {at:?} pending keys: {:?}",
522 ready_poll.pollers.keys()
523 );
524 (false, pending)
525 }
526}
527
528fn reduce_multiview_result<H, E>(input: HashMap<H, Vec<Result<H, E>>>) -> Vec<Result<H, E>> {
555 let mut values = input.values();
556 let Some(first) = values.next() else {
557 return Default::default();
558 };
559 let length = first.len();
560 debug_assert!(values.all(|x| length == x.len()));
561
562 input
563 .into_values()
564 .reduce(|mut agg_results, results| {
565 agg_results.iter_mut().zip(results.into_iter()).for_each(|(agg_r, r)| {
566 if agg_r.is_err() {
567 *agg_r = r;
568 }
569 });
570 agg_results
571 })
572 .unwrap_or_default()
573}
574
575impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
576where
577 Block: BlockT,
578 ChainApi: 'static + graph::ChainApi<Block = Block>,
579 <Block as BlockT>::Hash: Unpin,
580{
581 type Block = ChainApi::Block;
582 type Hash = ExtrinsicHash<ChainApi>;
583 type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
584 type Error = ChainApi::Error;
585
586 fn submit_at(
593 &self,
594 _: <Self::Block as BlockT>::Hash,
595 source: TransactionSource,
596 xts: Vec<TransactionFor<Self>>,
597 ) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
598 let view_store = self.view_store.clone();
599 log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count());
600 log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at");
601 let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
602 let mempool_result = self.mempool.extend_unwatched(source, &xts);
603
604 if view_store.is_empty() {
605 return future::ready(Ok(mempool_result)).boxed()
606 }
607
608 let (hashes, to_be_submitted): (Vec<TxHash<Self>>, Vec<ExtrinsicFor<ChainApi>>) =
609 mempool_result
610 .iter()
611 .zip(xts)
612 .filter_map(|(result, xt)| result.as_ref().ok().map(|xt_hash| (xt_hash, xt)))
613 .unzip();
614
615 self.metrics
616 .report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));
617
618 let mempool = self.mempool.clone();
619 async move {
620 let results_map = view_store.submit(source, to_be_submitted.into_iter(), hashes).await;
621 let mut submission_results = reduce_multiview_result(results_map).into_iter();
622
623 Ok(mempool_result
624 .into_iter()
625 .map(|result| {
626 result.and_then(|xt_hash| {
627 let result = submission_results
628 .next()
629 .expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.");
630 result.or_else(|error| {
631 let error = error.into_pool_error();
632 match error {
633 Ok(
634 Error::ImmediatelyDropped
636 ) => Ok(xt_hash),
637 Ok(e) => {
638 mempool.remove(xt_hash);
639 Err(e.into())
640 },
641 Err(e) => Err(e),
642 }
643 })
644 })
645 })
646 .collect::<Vec<_>>())
647 }
648 .boxed()
649 }
650
651 fn submit_one(
655 &self,
656 _at: <Self::Block as BlockT>::Hash,
657 source: TransactionSource,
658 xt: TransactionFor<Self>,
659 ) -> PoolFuture<TxHash<Self>, Self::Error> {
660 log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_one views:{}", self.tx_hash(&xt), self.active_views_count());
661 let result_future = self.submit_at(_at, source, vec![xt]);
662 async move {
663 let result = result_future.await;
664 match result {
665 Ok(mut v) =>
666 v.pop().expect("There is exactly one element in result of submit_at. qed."),
667 Err(e) => Err(e),
668 }
669 }
670 .boxed()
671 }
672
673 fn submit_and_watch(
678 &self,
679 at: <Self::Block as BlockT>::Hash,
680 source: TransactionSource,
681 xt: TransactionFor<Self>,
682 ) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
683 log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count());
684 let xt = Arc::from(xt);
685 let xt_hash = match self.mempool.push_watched(source, xt.clone()) {
686 Ok(xt_hash) => xt_hash,
687 Err(e) => return future::ready(Err(e)).boxed(),
688 };
689
690 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
691
692 let view_store = self.view_store.clone();
693 let mempool = self.mempool.clone();
694 async move {
695 let result = view_store.submit_and_watch(at, source, xt).await;
696 let result = result.or_else(|(e, maybe_watcher)| {
697 let error = e.into_pool_error();
698 match (error, maybe_watcher) {
699 (
700 Ok(
701 Error::ImmediatelyDropped,
704 ),
705 Some(watcher),
706 ) => Ok(watcher),
707 (Ok(e), _) => {
708 mempool.remove(xt_hash);
709 Err(e.into())
710 },
711 (Err(e), _) => Err(e),
712 }
713 });
714 result
715 }
716 .boxed()
717 }
718
719 fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
725 if !hashes.is_empty() {
726 log::debug!(target: LOG_TARGET, "fatp::remove_invalid {}", hashes.len());
727 log_xt_trace!(target:LOG_TARGET, hashes, "[{:?}] fatp::remove_invalid");
728 self.metrics
729 .report(|metrics| metrics.removed_invalid_txs.inc_by(hashes.len() as _));
730 }
731 Default::default()
732 }
733
734 fn status(&self) -> PoolStatus {
742 self.view_store
743 .most_recent_view
744 .read()
745 .map(|hash| self.view_store.status()[&hash].clone())
746 .unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
747 }
748
749 fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
754 self.import_notification_sink.event_stream()
755 }
756
757 fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
759 self.api().hash_and_length(xt).0
760 }
761
762 fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
764 self.view_store.listener.transactions_broadcasted(propagations);
765 }
766
767 fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
773 let most_recent_view = self.view_store.most_recent_view.read();
774 let result = most_recent_view
775 .map(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash))
776 .flatten();
777 log::trace!(
778 target: LOG_TARGET,
779 "[{tx_hash:?}] ready_transaction: {} {:?}",
780 result.is_some(),
781 most_recent_view
782 );
783 result
784 }
785
786 fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> PolledIterator<ChainApi> {
788 let (_, result) = self.ready_at_internal(at);
789 result
790 }
791
792 fn ready(&self) -> ReadyIteratorFor<ChainApi> {
797 self.view_store.ready()
798 }
799
800 fn futures(&self) -> Vec<Self::InPoolTransaction> {
805 self.view_store.futures()
806 }
807
808 fn ready_at_with_timeout(
813 &self,
814 at: <Self::Block as BlockT>::Hash,
815 timeout: std::time::Duration,
816 ) -> PolledIterator<ChainApi> {
817 self.ready_at_with_timeout_internal(at, timeout)
818 }
819}
820
821impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
822 for ForkAwareTxPool<FullChainApi<Client, Block>, Block>
823where
824 Block: BlockT,
825 <Block as BlockT>::Hash: Unpin,
826 Client: sp_api::ProvideRuntimeApi<Block>
827 + sc_client_api::BlockBackend<Block>
828 + sc_client_api::blockchain::HeaderBackend<Block>
829 + sp_runtime::traits::BlockIdTo<Block>
830 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
831 Client: Send + Sync + 'static,
832 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
833{
834 type Block = Block;
835 type Hash = ExtrinsicHash<FullChainApi<Client, Block>>;
836 type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
837
838 fn submit_local(
839 &self,
840 _at: Block::Hash,
841 xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
842 ) -> Result<Self::Hash, Self::Error> {
843 log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count());
844 let xt = Arc::from(xt);
845 let result = self
846 .mempool
847 .extend_unwatched(TransactionSource::Local, &[xt.clone()])
848 .remove(0)?;
849
850 self.view_store.submit_local(xt).or_else(|_| Ok(result))
851 }
852}
853
854impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
855where
856 Block: BlockT,
857 ChainApi: graph::ChainApi<Block = Block> + 'static,
858 <Block as BlockT>::Hash: Unpin,
859{
860 async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
868 let hash_and_number = match tree_route.last() {
869 Some(hash_and_number) => hash_and_number,
870 None => {
871 log::warn!(
872 target: LOG_TARGET,
873 "Skipping ChainEvent - no last block in tree route {:?}",
874 tree_route,
875 );
876 return
877 },
878 };
879
880 if self.has_view(&hash_and_number.hash) {
881 log::trace!(
882 target: LOG_TARGET,
883 "view already exists for block: {:?}",
884 hash_and_number,
885 );
886 return
887 }
888
889 let best_view = self.view_store.find_best_view(tree_route);
890 let new_view = self.build_new_view(best_view, hash_and_number, tree_route).await;
891
892 if let Some(view) = new_view {
893 {
894 let view = view.clone();
895 self.ready_poll.lock().trigger(hash_and_number.hash, move || {
896 Box::from(view.pool.validated_pool().ready())
897 });
898 }
899
900 View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
901 }
902 }
903
904 async fn build_new_view(
914 &self,
915 origin_view: Option<Arc<View<ChainApi>>>,
916 at: &HashAndNumber<Block>,
917 tree_route: &TreeRoute<Block>,
918 ) -> Option<Arc<View<ChainApi>>> {
919 log::debug!(
920 target: LOG_TARGET,
921 "build_new_view: for: {:?} from: {:?} tree_route: {:?}",
922 at,
923 origin_view.as_ref().map(|v| v.at.clone()),
924 tree_route
925 );
926 let mut view = if let Some(origin_view) = origin_view {
927 let mut view = View::new_from_other(&origin_view, at);
928 if !tree_route.retracted().is_empty() {
929 view.pool.clear_recently_pruned();
930 }
931 view
932 } else {
933 log::debug!(target: LOG_TARGET, "creating non-cloned view: for: {at:?}");
934 View::new(
935 self.api.clone(),
936 at.clone(),
937 self.options.clone(),
938 self.metrics.clone(),
939 self.is_validator.clone(),
940 )
941 };
942
943 self.import_notification_sink.add_view(
946 view.at.hash,
947 view.pool.validated_pool().import_notification_stream().boxed(),
948 );
949
950 self.view_store.dropped_stream_controller.add_view(
951 view.at.hash,
952 view.pool.validated_pool().create_dropped_by_limits_stream().boxed(),
953 );
954
955 let start = Instant::now();
956 let watched_xts = self.register_listeners(&mut view).await;
957 let duration = start.elapsed();
958 log::debug!(target: LOG_TARGET, "register_listeners: at {at:?} took {duration:?}");
959
960 let start = Instant::now();
963 self.update_view_with_fork(&view, tree_route, at.clone()).await;
964 let duration = start.elapsed();
965 log::debug!(target: LOG_TARGET, "update_view_with_fork: at {at:?} took {duration:?}");
966
967 let start = Instant::now();
969 self.update_view_with_mempool(&mut view, watched_xts).await;
970 let duration = start.elapsed();
971 log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {at:?} took {duration:?}");
972
973 let view = Arc::from(view);
974 self.view_store.insert_new_view(view.clone(), tree_route).await;
975 Some(view)
976 }
977
978 async fn extrinsics_included_since_finalized(&self, at: Block::Hash) -> HashSet<TxHash<Self>> {
982 let start = Instant::now();
983 let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();
984
985 let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at) else {
986 return Default::default()
987 };
988
989 let api = self.api.clone();
990 let mut all_extrinsics = HashSet::new();
991
992 for h in tree_route.enacted().iter().rev() {
993 api.block_body(h.hash)
994 .await
995 .unwrap_or_else(|e| {
996 log::warn!(target: LOG_TARGET, "Compute ready light transactions: error request: {}", e);
997 None
998 })
999 .unwrap_or_default()
1000 .into_iter()
1001 .map(|t| self.hash_of(&t))
1002 .for_each(|tx_hash| {
1003 all_extrinsics.insert(tx_hash);
1004 });
1005 }
1006
1007 log::debug!(target: LOG_TARGET,
1008 "fatp::extrinsics_included_since_finalized {} from {} count: {} took:{:?}",
1009 at,
1010 recent_finalized_block,
1011 all_extrinsics.len(),
1012 start.elapsed()
1013 );
1014 all_extrinsics
1015 }
1016
1017 async fn register_listeners(
1024 &self,
1025 view: &View<ChainApi>,
1026 ) -> Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)> {
1027 log::debug!(
1028 target: LOG_TARGET,
1029 "register_listeners: {:?} xts:{:?} v:{}",
1030 view.at,
1031 self.mempool.unwatched_and_watched_count(),
1032 self.active_views_count()
1033 );
1034
1035 let results = self
1038 .mempool
1039 .clone_watched()
1040 .into_iter()
1041 .map(|(tx_hash, tx)| {
1042 let watcher = view.create_watcher(tx_hash);
1043 let at = view.at.clone();
1044 async move {
1045 log::trace!(target: LOG_TARGET, "[{:?}] adding watcher {:?}", tx_hash, at.hash);
1046 self.view_store.listener.add_view_watcher_for_tx(
1047 tx_hash,
1048 at.hash,
1049 watcher.into_stream().boxed(),
1050 );
1051 (tx_hash, tx)
1052 }
1053 })
1054 .collect::<Vec<_>>();
1055
1056 future::join_all(results).await
1057 }
1058
1059 async fn update_view_with_mempool(
1073 &self,
1074 view: &View<ChainApi>,
1075 watched_xts: Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)>,
1076 ) {
1077 log::debug!(
1078 target: LOG_TARGET,
1079 "update_view_with_mempool: {:?} xts:{:?} v:{}",
1080 view.at,
1081 self.mempool.unwatched_and_watched_count(),
1082 self.active_views_count()
1083 );
1084 let included_xts = self.extrinsics_included_since_finalized(view.at.hash).await;
1085 let xts = self.mempool.clone_unwatched();
1086
1087 let mut all_submitted_count = 0;
1088 if !xts.is_empty() {
1089 let unwatched_count = xts.len();
1090 let mut buckets = HashMap::<TransactionSource, Vec<ExtrinsicFor<ChainApi>>>::default();
1091 xts.into_iter()
1092 .filter(|(hash, _)| !view.pool.validated_pool().pool.read().is_imported(hash))
1093 .filter(|(hash, _)| !included_xts.contains(&hash))
1094 .map(|(_, tx)| (tx.source(), tx.tx()))
1095 .for_each(|(source, tx)| buckets.entry(source).or_default().push(tx));
1096
1097 for (source, xts) in buckets {
1098 all_submitted_count += xts.len();
1099 let _ = view.submit_many(source, xts).await;
1100 }
1101 log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {:?} unwatched {}/{}", view.at.hash, all_submitted_count, unwatched_count);
1102 }
1103
1104 let watched_submitted_count = watched_xts.len();
1105
1106 let mut buckets = HashMap::<
1107 TransactionSource,
1108 Vec<(ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>)>,
1109 >::default();
1110 watched_xts
1111 .into_iter()
1112 .filter(|(hash, _)| !included_xts.contains(&hash))
1113 .map(|(tx_hash, tx)| (tx.source(), tx_hash, tx.tx()))
1114 .for_each(|(source, tx_hash, tx)| {
1115 buckets.entry(source).or_default().push((tx_hash, tx))
1116 });
1117
1118 let mut watched_results = Vec::default();
1119 for (source, watched_xts) in buckets {
1120 let hashes = watched_xts.iter().map(|i| i.0).collect::<Vec<_>>();
1121 let results = view
1122 .submit_many(source, watched_xts.into_iter().map(|i| i.1))
1123 .await
1124 .into_iter()
1125 .zip(hashes)
1126 .map(|(result, tx_hash)| result.or_else(|_| Err(tx_hash)))
1127 .collect::<Vec<_>>();
1128 watched_results.extend(results);
1129 }
1130
1131 log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {:?} watched {}/{}", view.at.hash, watched_submitted_count, self.mempool_len().1);
1132
1133 all_submitted_count += watched_submitted_count;
1134 let _ = all_submitted_count
1135 .try_into()
1136 .map(|v| self.metrics.report(|metrics| metrics.submitted_from_mempool_txs.inc_by(v)));
1137
1138 if self.view_store.is_empty() {
1141 for result in watched_results {
1142 match result {
1143 Err(tx_hash) => {
1144 self.view_store.listener.invalidate_transactions(&[tx_hash]);
1145 self.mempool.remove(tx_hash);
1146 },
1147 Ok(_) => {},
1148 }
1149 }
1150 }
1151 }
1152
1153 async fn update_view_with_fork(
1158 &self,
1159 view: &View<ChainApi>,
1160 tree_route: &TreeRoute<Block>,
1161 hash_and_number: HashAndNumber<Block>,
1162 ) {
1163 log::debug!(target: LOG_TARGET, "update_view_with_fork tree_route: {:?} {tree_route:?}", view.at);
1164 let api = self.api.clone();
1165
1166 let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
1169
1170 future::join_all(
1171 tree_route
1172 .enacted()
1173 .iter()
1174 .map(|h| crate::prune_known_txs_for_block(h, &*api, &view.pool)),
1175 )
1176 .await
1177 .into_iter()
1178 .for_each(|enacted_log| {
1179 pruned_log.extend(enacted_log);
1180 });
1181
1182 {
1184 let mut resubmit_transactions = Vec::new();
1185
1186 for retracted in tree_route.retracted() {
1187 let hash = retracted.hash;
1188
1189 let block_transactions = api
1190 .block_body(hash)
1191 .await
1192 .unwrap_or_else(|e| {
1193 log::warn!(target: LOG_TARGET, "Failed to fetch block body: {}", e);
1194 None
1195 })
1196 .unwrap_or_default()
1197 .into_iter();
1198
1199 let mut resubmitted_to_report = 0;
1200
1201 resubmit_transactions.extend(
1202 block_transactions
1203 .into_iter()
1204 .map(|tx| (self.hash_of(&tx), tx))
1205 .filter(|(tx_hash, _)| {
1206 let contains = pruned_log.contains(&tx_hash);
1207
1208 resubmitted_to_report += 1;
1210
1211 if !contains {
1212 log::trace!(
1213 target: LOG_TARGET,
1214 "[{:?}]: Resubmitting from retracted block {:?}",
1215 tx_hash,
1216 hash,
1217 );
1218 }
1219 !contains
1220 })
1221 .map(|(tx_hash, tx)| {
1222 self.mempool.get_by_hash(tx_hash).unwrap_or_else(|| Arc::from(tx))
1224 }),
1225 );
1226
1227 self.metrics.report(|metrics| {
1228 metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
1229 });
1230 }
1231
1232 let _ = view
1233 .pool
1234 .resubmit_at(
1235 &hash_and_number,
1236 TransactionSource::External,
1239 resubmit_transactions,
1240 )
1241 .await;
1242 }
1243 }
1244
1245 async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
1251 let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
1252 log::debug!(target: LOG_TARGET, "handle_finalized {finalized_number:?} tree_route: {tree_route:?} views_count:{}", self.active_views_count());
1253
1254 let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;
1255
1256 self.mempool.purge_finalized_transactions(&finalized_xts).await;
1257 self.import_notification_sink.clean_notified_items(&finalized_xts);
1258
1259 self.metrics
1260 .report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));
1261
1262 if let Ok(Some(finalized_number)) = finalized_number {
1263 self.revalidation_queue
1264 .revalidate_mempool(
1265 self.mempool.clone(),
1266 HashAndNumber { hash: finalized_hash, number: finalized_number },
1267 )
1268 .await;
1269 } else {
1270 log::trace!(target: LOG_TARGET, "purge_transactions_later skipped, cannot find block number {finalized_number:?}");
1271 }
1272
1273 self.ready_poll.lock().remove_cancelled();
1274 log::trace!(target: LOG_TARGET, "handle_finalized after views_count:{:?}", self.active_views_count());
1275 }
1276
1277 fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1279 self.api.hash_and_length(xt).0
1280 }
1281}
1282
1283#[async_trait]
1284impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
1285where
1286 Block: BlockT,
1287 ChainApi: 'static + graph::ChainApi<Block = Block>,
1288 <Block as BlockT>::Hash: Unpin,
1289{
1290 async fn maintain(&self, event: ChainEvent<Self::Block>) {
1292 let start = Instant::now();
1293 log::debug!(target: LOG_TARGET, "processing event: {event:?}");
1294
1295 self.view_store.finish_background_revalidations().await;
1296
1297 let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
1298
1299 let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
1300 match self.api.tree_route(from, to) {
1301 Ok(tree_route) => Ok(tree_route),
1302 Err(e) =>
1303 return Err(format!(
1304 "Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
1305 )),
1306 }
1307 };
1308 let block_id_to_number =
1309 |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
1310
1311 let result =
1312 self.enactment_state
1313 .lock()
1314 .update(&event, &compute_tree_route, &block_id_to_number);
1315
1316 match result {
1317 Err(msg) => {
1318 log::trace!(target: LOG_TARGET, "enactment_state::update error: {msg}");
1319 self.enactment_state.lock().force_update(&event);
1320 },
1321 Ok(EnactmentAction::Skip) => return,
1322 Ok(EnactmentAction::HandleFinalization) => {
1323 },
1331 Ok(EnactmentAction::HandleEnactment(tree_route)) => {
1332 if matches!(event, ChainEvent::Finalized { .. }) {
1333 self.view_store.handle_pre_finalized(event.hash()).await;
1334 };
1335 self.handle_new_block(&tree_route).await;
1336 },
1337 };
1338
1339 match event {
1340 ChainEvent::NewBestBlock { .. } => {},
1341 ChainEvent::Finalized { hash, ref tree_route } => {
1342 self.handle_finalized(hash, tree_route).await;
1343
1344 log::trace!(
1345 target: LOG_TARGET,
1346 "on-finalized enacted: {tree_route:?}, previously finalized: \
1347 {prev_finalized_block:?}",
1348 );
1349 },
1350 }
1351
1352 let maintain_duration = start.elapsed();
1353
1354 log::info!(
1355 target: LOG_TARGET,
1356 "maintain: txs:{:?} views:[{};{:?}] event:{event:?} took:{:?}",
1357 self.mempool_len(),
1358 self.active_views_count(),
1359 self.views_stats(),
1360 maintain_duration
1361 );
1362
1363 self.metrics.report(|metrics| {
1364 let (unwatched, watched) = self.mempool_len();
1365 let _ = (
1366 self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
1367 self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
1368 watched.try_into().map(|v| metrics.watched_txs.set(v)),
1369 unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
1370 );
1371 metrics.maintain_duration.observe(maintain_duration.as_secs_f64());
1372 });
1373 }
1374}
1375
1376impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
1377where
1378 Block: BlockT,
1379 Client: sp_api::ProvideRuntimeApi<Block>
1380 + sc_client_api::BlockBackend<Block>
1381 + sc_client_api::blockchain::HeaderBackend<Block>
1382 + sp_runtime::traits::BlockIdTo<Block>
1383 + sc_client_api::ExecutorProvider<Block>
1384 + sc_client_api::UsageProvider<Block>
1385 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
1386 + Send
1387 + Sync
1388 + 'static,
1389 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
1390 <Block as BlockT>::Hash: std::marker::Unpin,
1391{
1392 pub fn new_full(
1394 options: Options,
1395 is_validator: IsValidator,
1396 prometheus: Option<&PrometheusRegistry>,
1397 spawner: impl SpawnEssentialNamed,
1398 client: Arc<Client>,
1399 ) -> Self {
1400 let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
1401 let pool = Self::new_with_background_worker(
1402 options,
1403 is_validator,
1404 pool_api,
1405 prometheus,
1406 spawner,
1407 client.usage_info().chain.best_hash,
1408 client.usage_info().chain.finalized_hash,
1409 );
1410
1411 pool
1412 }
1413}
1414
1415#[cfg(test)]
1416mod reduce_multiview_result_tests {
1417 use super::*;
1418 use sp_core::H256;
1419 #[derive(Debug, PartialEq, Clone)]
1420 enum Error {
1421 Custom(u8),
1422 }
1423
1424 #[test]
1425 fn empty() {
1426 sp_tracing::try_init_simple();
1427 let input = HashMap::default();
1428 let r = reduce_multiview_result::<H256, Error>(input);
1429 assert!(r.is_empty());
1430 }
1431
1432 #[test]
1433 fn errors_only() {
1434 sp_tracing::try_init_simple();
1435 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
1436 (
1437 H256::repeat_byte(0x13),
1438 vec![
1439 Err(Error::Custom(10)),
1440 Err(Error::Custom(11)),
1441 Err(Error::Custom(12)),
1442 Err(Error::Custom(13)),
1443 ],
1444 ),
1445 (
1446 H256::repeat_byte(0x14),
1447 vec![
1448 Err(Error::Custom(20)),
1449 Err(Error::Custom(21)),
1450 Err(Error::Custom(22)),
1451 Err(Error::Custom(23)),
1452 ],
1453 ),
1454 (
1455 H256::repeat_byte(0x15),
1456 vec![
1457 Err(Error::Custom(30)),
1458 Err(Error::Custom(31)),
1459 Err(Error::Custom(32)),
1460 Err(Error::Custom(33)),
1461 ],
1462 ),
1463 ];
1464 let input = HashMap::from_iter(v.clone());
1465 let r = reduce_multiview_result(input);
1466
1467 assert!(r == v[0].1 || r == v[1].1 || r == v[2].1);
1469 }
1470
1471 #[test]
1472 #[should_panic]
1473 #[cfg(debug_assertions)]
1474 fn invalid_lengths() {
1475 sp_tracing::try_init_simple();
1476 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
1477 (H256::repeat_byte(0x13), vec![Err(Error::Custom(12)), Err(Error::Custom(13))]),
1478 (H256::repeat_byte(0x14), vec![Err(Error::Custom(23))]),
1479 ];
1480 let input = HashMap::from_iter(v);
1481 let _ = reduce_multiview_result(input);
1482 }
1483
1484 #[test]
1485 fn only_hashes() {
1486 sp_tracing::try_init_simple();
1487
1488 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
1489 (
1490 H256::repeat_byte(0x13),
1491 vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
1492 ),
1493 (
1494 H256::repeat_byte(0x14),
1495 vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
1496 ),
1497 ];
1498 let input = HashMap::from_iter(v);
1499 let r = reduce_multiview_result(input);
1500
1501 assert_eq!(r, vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))]);
1502 }
1503
1504 #[test]
1505 fn one_view() {
1506 sp_tracing::try_init_simple();
1507 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![(
1508 H256::repeat_byte(0x13),
1509 vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))],
1510 )];
1511 let input = HashMap::from_iter(v);
1512 let r = reduce_multiview_result(input);
1513
1514 assert_eq!(r, vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))]);
1515 }
1516
1517 #[test]
1518 fn mix() {
1519 sp_tracing::try_init_simple();
1520 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
1521 (
1522 H256::repeat_byte(0x13),
1523 vec![
1524 Ok(H256::repeat_byte(0x10)),
1525 Err(Error::Custom(11)),
1526 Err(Error::Custom(12)),
1527 Err(Error::Custom(33)),
1528 ],
1529 ),
1530 (
1531 H256::repeat_byte(0x14),
1532 vec![
1533 Err(Error::Custom(20)),
1534 Ok(H256::repeat_byte(0x21)),
1535 Err(Error::Custom(22)),
1536 Err(Error::Custom(33)),
1537 ],
1538 ),
1539 (
1540 H256::repeat_byte(0x15),
1541 vec![
1542 Err(Error::Custom(30)),
1543 Err(Error::Custom(31)),
1544 Ok(H256::repeat_byte(0x32)),
1545 Err(Error::Custom(33)),
1546 ],
1547 ),
1548 ];
1549 let input = HashMap::from_iter(v);
1550 let r = reduce_multiview_result(input);
1551
1552 assert_eq!(
1553 r,
1554 vec![
1555 Ok(H256::repeat_byte(0x10)),
1556 Ok(H256::repeat_byte(0x21)),
1557 Ok(H256::repeat_byte(0x32)),
1558 Err(Error::Custom(33))
1559 ]
1560 );
1561 }
1562}