1use super::{metrics::MetricsLink as PrometheusMetrics, revalidation};
22pub use crate::{
23 api::FullChainApi,
24 graph::{ChainApi, ValidatedTransaction},
25};
26use crate::{
27 common::{
28 enactment_state::{EnactmentAction, EnactmentState},
29 error,
30 log_xt::log_xt_trace,
31 },
32 graph,
33 graph::{ExtrinsicHash, IsValidator},
34 PolledIterator, ReadyIteratorFor, LOG_TARGET,
35};
36use async_trait::async_trait;
37use futures::{channel::oneshot, future, prelude::*, Future, FutureExt};
38use parking_lot::Mutex;
39use prometheus_endpoint::Registry as PrometheusRegistry;
40use sc_transaction_pool_api::{
41 error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
42 PoolFuture, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
43 TransactionStatusStreamFor, TxHash,
44};
45use sp_blockchain::{HashAndNumber, TreeRoute};
46use sp_core::traits::SpawnEssentialNamed;
47use sp_runtime::{
48 generic::BlockId,
49 traits::{AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, Zero},
50};
51use std::{
52 collections::{HashMap, HashSet},
53 pin::Pin,
54 sync::Arc,
55 time::Instant,
56};
57use tokio::select;
58
/// Transaction pool built on top of [`graph::Pool`], generic over the chain access layer
/// `PoolApi`.
pub struct BasicPool<PoolApi, Block>
where
    Block: BlockT,
    PoolApi: graph::ChainApi<Block = Block>,
{
    /// The underlying transaction graph pool that stores and orders transactions.
    pool: Arc<graph::Pool<PoolApi>>,
    /// Chain access used to resolve block numbers, fetch block bodies/headers and compute
    /// tree routes.
    api: Arc<PoolApi>,
    /// Decides, per enacted block, whether to revalidate and/or resubmit transactions.
    revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
    /// Queue that receives the hashes of ready transactions to be revalidated later.
    revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
    /// Waiters registered through `ready_at`, resolved once the awaited block was processed.
    ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
    /// Link to the Prometheus metrics of the pool.
    metrics: PrometheusMetrics,
    /// Tracks the recent best/finalized blocks and decides how a chain event must be handled.
    enactment_state: Arc<Mutex<EnactmentState<Block>>>,
}
73
/// Bookkeeping for callers that wait for a value of type `T` to become available at a given
/// block number.
struct ReadyPoll<T, Block: BlockT> {
    /// Block number passed to the most recent `trigger` call (or the initial value).
    updated_at: NumberFor<Block>,
    /// Pending waiters: the block number each one waits for, paired with the channel used to
    /// deliver the value.
    pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
}
78
79impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
80 fn default() -> Self {
81 Self { updated_at: NumberFor::<Block>::zero(), pollers: Default::default() }
82 }
83}
84
85impl<T, Block: BlockT> ReadyPoll<T, Block> {
86 fn new(best_block_number: NumberFor<Block>) -> Self {
87 Self { updated_at: best_block_number, pollers: Default::default() }
88 }
89
90 fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
91 self.updated_at = number;
92
93 let mut idx = 0;
94 while idx < self.pollers.len() {
95 if self.pollers[idx].0 <= number {
96 let poller_sender = self.pollers.swap_remove(idx);
97 log::trace!(target: LOG_TARGET, "Sending ready signal at block {}", number);
98 let _ = poller_sender.1.send(iterator_factory());
99 } else {
100 idx += 1;
101 }
102 }
103 }
104
105 fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
106 let (sender, receiver) = oneshot::channel();
107 self.pollers.push((number, sender));
108 receiver
109 }
110
111 fn updated_at(&self) -> NumberFor<Block> {
112 self.updated_at
113 }
114}
115
/// Selects how a [`BasicPool`] created through `with_revalidation_type` revalidates its
/// transactions.
pub enum RevalidationType {
    /// Uses a revalidation queue without a background task together with the schedule-driven
    /// `Light` strategy; that strategy never requests resubmission of transactions from
    /// retracted blocks.
    Light,

    /// Uses a revalidation queue with a spawned background task together with the `Always`
    /// strategy, which requests revalidation and resubmission on every enactment.
    Full,
}
132
133impl<PoolApi, Block> BasicPool<PoolApi, Block>
134where
135 Block: BlockT,
136 PoolApi: graph::ChainApi<Block = Block> + 'static,
137{
138 pub fn new_test(
140 pool_api: Arc<PoolApi>,
141 best_block_hash: Block::Hash,
142 finalized_hash: Block::Hash,
143 options: graph::Options,
144 ) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
145 let pool = Arc::new(graph::Pool::new(options, true.into(), pool_api.clone()));
146 let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
147 pool_api.clone(),
148 pool.clone(),
149 finalized_hash,
150 );
151 (
152 Self {
153 api: pool_api,
154 pool,
155 revalidation_queue: Arc::new(revalidation_queue),
156 revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
157 ready_poll: Default::default(),
158 metrics: Default::default(),
159 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
160 best_block_hash,
161 finalized_hash,
162 ))),
163 },
164 background_task,
165 )
166 }
167
168 pub fn with_revalidation_type(
171 options: graph::Options,
172 is_validator: IsValidator,
173 pool_api: Arc<PoolApi>,
174 prometheus: Option<&PrometheusRegistry>,
175 revalidation_type: RevalidationType,
176 spawner: impl SpawnEssentialNamed,
177 best_block_number: NumberFor<Block>,
178 best_block_hash: Block::Hash,
179 finalized_hash: Block::Hash,
180 ) -> Self {
181 let pool = Arc::new(graph::Pool::new(options, is_validator, pool_api.clone()));
182 let (revalidation_queue, background_task) = match revalidation_type {
183 RevalidationType::Light =>
184 (revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None),
185 RevalidationType::Full => {
186 let (queue, background) = revalidation::RevalidationQueue::new_background(
187 pool_api.clone(),
188 pool.clone(),
189 finalized_hash,
190 );
191 (queue, Some(background))
192 },
193 };
194
195 if let Some(background_task) = background_task {
196 spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task);
197 }
198
199 Self {
200 api: pool_api,
201 pool,
202 revalidation_queue: Arc::new(revalidation_queue),
203 revalidation_strategy: Arc::new(Mutex::new(match revalidation_type {
204 RevalidationType::Light =>
205 RevalidationStrategy::Light(RevalidationStatus::NotScheduled),
206 RevalidationType::Full => RevalidationStrategy::Always,
207 })),
208 ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
209 metrics: PrometheusMetrics::new(prometheus),
210 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
211 best_block_hash,
212 finalized_hash,
213 ))),
214 }
215 }
216
217 pub fn pool(&self) -> &Arc<graph::Pool<PoolApi>> {
219 &self.pool
220 }
221
222 pub fn api(&self) -> &PoolApi {
224 &self.api
225 }
226
227 fn ready_at_with_timeout_internal(
228 &self,
229 at: Block::Hash,
230 timeout: std::time::Duration,
231 ) -> PolledIterator<PoolApi> {
232 let timeout = futures_timer::Delay::new(timeout);
233 let ready_maintained = self.ready_at(at);
234 let ready_current = self.ready();
235
236 let ready = async {
237 select! {
238 ready = ready_maintained => ready,
239 _ = timeout => ready_current
240 }
241 };
242
243 Box::pin(ready)
244 }
245}
246
247impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
248where
249 Block: BlockT,
250 PoolApi: 'static + graph::ChainApi<Block = Block>,
251{
252 type Block = PoolApi::Block;
253 type Hash = graph::ExtrinsicHash<PoolApi>;
254 type InPoolTransaction =
255 graph::base_pool::Transaction<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;
256 type Error = PoolApi::Error;
257
258 fn submit_at(
259 &self,
260 at: <Self::Block as BlockT>::Hash,
261 source: TransactionSource,
262 xts: Vec<TransactionFor<Self>>,
263 ) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
264 let pool = self.pool.clone();
265 let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
266
267 self.metrics
268 .report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64));
269
270 let number = self.api.resolve_block_number(at);
271 async move {
272 let at = HashAndNumber { hash: at, number: number? };
273 Ok(pool.submit_at(&at, source, xts).await)
274 }
275 .boxed()
276 }
277
278 fn submit_one(
279 &self,
280 at: <Self::Block as BlockT>::Hash,
281 source: TransactionSource,
282 xt: TransactionFor<Self>,
283 ) -> PoolFuture<TxHash<Self>, Self::Error> {
284 let pool = self.pool.clone();
285 let xt = Arc::from(xt);
286
287 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
288
289 let number = self.api.resolve_block_number(at);
290 async move {
291 let at = HashAndNumber { hash: at, number: number? };
292 pool.submit_one(&at, source, xt).await
293 }
294 .boxed()
295 }
296
297 fn submit_and_watch(
298 &self,
299 at: <Self::Block as BlockT>::Hash,
300 source: TransactionSource,
301 xt: TransactionFor<Self>,
302 ) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
303 let pool = self.pool.clone();
304 let xt = Arc::from(xt);
305
306 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
307
308 let number = self.api.resolve_block_number(at);
309
310 async move {
311 let at = HashAndNumber { hash: at, number: number? };
312 let watcher = pool.submit_and_watch(&at, source, xt).await?;
313
314 Ok(watcher.into_stream().boxed())
315 }
316 .boxed()
317 }
318
319 fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
320 let removed = self.pool.validated_pool().remove_invalid(hashes);
321 self.metrics
322 .report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
323 removed
324 }
325
326 fn status(&self) -> PoolStatus {
327 self.pool.validated_pool().status()
328 }
329
330 fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
331 self.pool.validated_pool().import_notification_stream()
332 }
333
334 fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
335 self.pool.hash_of(xt)
336 }
337
338 fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
339 self.pool.validated_pool().on_broadcasted(propagations)
340 }
341
342 fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
343 self.pool.validated_pool().ready_by_hash(hash)
344 }
345
346 fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> PolledIterator<PoolApi> {
347 let Ok(at) = self.api.resolve_block_number(at) else {
348 return async { Box::new(std::iter::empty()) as Box<_> }.boxed()
349 };
350
351 let status = self.status();
352 if status.ready == 0 && status.future == 0 {
357 return async { Box::new(std::iter::empty()) as Box<_> }.boxed()
358 }
359
360 if self.ready_poll.lock().updated_at() >= at {
361 log::trace!(target: LOG_TARGET, "Transaction pool already processed block #{}", at);
362 let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
363 return async move { iterator }.boxed()
364 }
365
366 self.ready_poll
367 .lock()
368 .add(at)
369 .map(|received| {
370 received.unwrap_or_else(|e| {
371 log::warn!(target: LOG_TARGET, "Error receiving pending set: {:?}", e);
372 Box::new(std::iter::empty())
373 })
374 })
375 .boxed()
376 }
377
378 fn ready(&self) -> ReadyIteratorFor<PoolApi> {
379 Box::new(self.pool.validated_pool().ready())
380 }
381
382 fn futures(&self) -> Vec<Self::InPoolTransaction> {
383 let pool = self.pool.validated_pool().pool.read();
384 pool.futures().cloned().collect::<Vec<_>>()
385 }
386
387 fn ready_at_with_timeout(
388 &self,
389 at: <Self::Block as BlockT>::Hash,
390 timeout: std::time::Duration,
391 ) -> PolledIterator<PoolApi> {
392 self.ready_at_with_timeout_internal(at, timeout)
393 }
394}
395
396impl<Block, Client> BasicPool<FullChainApi<Client, Block>, Block>
397where
398 Block: BlockT,
399 Client: sp_api::ProvideRuntimeApi<Block>
400 + sc_client_api::BlockBackend<Block>
401 + sc_client_api::blockchain::HeaderBackend<Block>
402 + sp_runtime::traits::BlockIdTo<Block>
403 + sc_client_api::ExecutorProvider<Block>
404 + sc_client_api::UsageProvider<Block>
405 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
406 + Send
407 + Sync
408 + 'static,
409 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
410{
411 pub fn new_full(
413 options: graph::Options,
414 is_validator: IsValidator,
415 prometheus: Option<&PrometheusRegistry>,
416 spawner: impl SpawnEssentialNamed,
417 client: Arc<Client>,
418 ) -> Self {
419 let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
420 let pool = Self::with_revalidation_type(
421 options,
422 is_validator,
423 pool_api,
424 prometheus,
425 RevalidationType::Full,
426 spawner,
427 client.usage_info().chain.best_number,
428 client.usage_info().chain.best_hash,
429 client.usage_info().chain.finalized_hash,
430 );
431
432 pool
433 }
434}
435
436impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
437 for BasicPool<FullChainApi<Client, Block>, Block>
438where
439 Block: BlockT,
440 Client: sp_api::ProvideRuntimeApi<Block>
441 + sc_client_api::BlockBackend<Block>
442 + sc_client_api::blockchain::HeaderBackend<Block>
443 + sp_runtime::traits::BlockIdTo<Block>
444 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
445 Client: Send + Sync + 'static,
446 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
447{
448 type Block = Block;
449 type Hash = graph::ExtrinsicHash<FullChainApi<Client, Block>>;
450 type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
451
452 fn submit_local(
453 &self,
454 at: Block::Hash,
455 xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
456 ) -> Result<Self::Hash, Self::Error> {
457 use sp_runtime::{
458 traits::SaturatedConversion, transaction_validity::TransactionValidityError,
459 };
460
461 let validity = self
462 .api
463 .validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))?
464 .map_err(|e| {
465 Self::Error::Pool(match e {
466 TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
467 TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
468 })
469 })?;
470
471 let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
472 let block_number = self
473 .api
474 .block_id_to_number(&BlockId::hash(at))?
475 .ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;
476
477 let validated = ValidatedTransaction::valid_at(
478 block_number.saturated_into::<u64>(),
479 hash,
480 TransactionSource::Local,
481 Arc::from(xt),
482 bytes,
483 validity,
484 );
485
486 self.pool.validated_pool().submit(vec![validated]).remove(0)
487 }
488}
489
/// State of the schedule-driven revalidation used by `RevalidationStrategy::Light`.
#[cfg_attr(test, derive(Debug))]
enum RevalidationStatus<N> {
    /// No revalidation is scheduled; the next `next_required` call schedules one.
    NotScheduled,
    /// Revalidation becomes due once the stored instant has passed or the stored block number
    /// has been reached; an absent (`None`) condition never triggers.
    Scheduled(Option<Instant>, Option<N>),
    /// Revalidation was reported as required and the status has not been cleared since.
    InProgress,
}
499
/// Policy deciding whether transactions are revalidated and resubmitted for a given block.
enum RevalidationStrategy<N> {
    /// Always request both revalidation and resubmission.
    Always,
    /// Request revalidation according to the contained schedule; never request resubmission.
    Light(RevalidationStatus<N>),
}
504
/// Outcome of `RevalidationStrategy::next`: what has to be done for the current block.
struct RevalidationAction {
    /// Whether the ready transactions should be queued for revalidation.
    revalidate: bool,
    /// Whether transactions from retracted blocks should be resubmitted to the pool.
    resubmit: bool,
}
509
510impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
511 pub fn clear(&mut self) {
512 if let Self::Light(status) = self {
513 status.clear()
514 }
515 }
516
517 pub fn next(
518 &mut self,
519 block: N,
520 revalidate_time_period: Option<std::time::Duration>,
521 revalidate_block_period: Option<N>,
522 ) -> RevalidationAction {
523 match self {
524 Self::Light(status) => RevalidationAction {
525 revalidate: status.next_required(
526 block,
527 revalidate_time_period,
528 revalidate_block_period,
529 ),
530 resubmit: false,
531 },
532 Self::Always => RevalidationAction { revalidate: true, resubmit: true },
533 }
534 }
535}
536
537impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
538 pub fn clear(&mut self) {
540 *self = Self::NotScheduled;
541 }
542
543 pub fn next_required(
545 &mut self,
546 block: N,
547 revalidate_time_period: Option<std::time::Duration>,
548 revalidate_block_period: Option<N>,
549 ) -> bool {
550 match *self {
551 Self::NotScheduled => {
552 *self = Self::Scheduled(
553 revalidate_time_period.map(|period| Instant::now() + period),
554 revalidate_block_period.map(|period| block + period),
555 );
556 false
557 },
558 Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
559 let is_required =
560 revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) ||
561 revalidate_at_block.map(|at| block >= at).unwrap_or(false);
562 if is_required {
563 *self = Self::InProgress;
564 }
565 is_required
566 },
567 Self::InProgress => false,
568 }
569 }
570}
571
572pub async fn prune_known_txs_for_block<Block: BlockT, Api: graph::ChainApi<Block = Block>>(
574 at: &HashAndNumber<Block>,
575 api: &Api,
576 pool: &graph::Pool<Api>,
577) -> Vec<ExtrinsicHash<Api>> {
578 let extrinsics = api
579 .block_body(at.hash)
580 .await
581 .unwrap_or_else(|e| {
582 log::warn!(target: LOG_TARGET, "Prune known transactions: error request: {}", e);
583 None
584 })
585 .unwrap_or_default();
586
587 let hashes = extrinsics.iter().map(|tx| pool.hash_of(tx)).collect::<Vec<_>>();
588
589 let header = match api.block_header(at.hash) {
590 Ok(Some(h)) => h,
591 Ok(None) => {
592 log::trace!(target: LOG_TARGET, "Could not find header for {:?}.", at.hash);
593 return hashes
594 },
595 Err(e) => {
596 log::trace!(target: LOG_TARGET, "Error retrieving header for {:?}: {}", at.hash, e);
597 return hashes
598 },
599 };
600
601 log_xt_trace!(target: LOG_TARGET, &hashes, "[{:?}] Pruning transaction.");
602
603 pool.prune(at, *header.parent_hash(), &extrinsics).await;
604 hashes
605}
606
607impl<PoolApi, Block> BasicPool<PoolApi, Block>
608where
609 Block: BlockT,
610 PoolApi: 'static + graph::ChainApi<Block = Block>,
611{
612 async fn handle_enactment(&self, tree_route: TreeRoute<Block>) {
616 log::trace!(target: LOG_TARGET, "handle_enactment tree_route: {tree_route:?}");
617 let pool = self.pool.clone();
618 let api = self.api.clone();
619
620 let hash_and_number = match tree_route.last() {
621 Some(hash_and_number) => hash_and_number,
622 None => {
623 log::warn!(
624 target: LOG_TARGET,
625 "Skipping ChainEvent - no last block in tree route {:?}",
626 tree_route,
627 );
628 return
629 },
630 };
631
632 let next_action = self.revalidation_strategy.lock().next(
633 hash_and_number.number,
634 Some(std::time::Duration::from_secs(60)),
635 Some(20u32.into()),
636 );
637
638 let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
641
642 for retracted in tree_route.retracted() {
648 pool.validated_pool().on_block_retracted(retracted.hash);
650 }
651
652 future::join_all(
653 tree_route.enacted().iter().map(|h| prune_known_txs_for_block(h, &*api, &*pool)),
654 )
655 .await
656 .into_iter()
657 .for_each(|enacted_log| {
658 pruned_log.extend(enacted_log);
659 });
660
661 self.metrics
662 .report(|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64));
663
664 if next_action.resubmit {
665 let mut resubmit_transactions = Vec::new();
666
667 for retracted in tree_route.retracted() {
668 let hash = retracted.hash;
669
670 let block_transactions = api
671 .block_body(hash)
672 .await
673 .unwrap_or_else(|e| {
674 log::warn!(target: LOG_TARGET, "Failed to fetch block body: {}", e);
675 None
676 })
677 .unwrap_or_default()
678 .into_iter();
679
680 let mut resubmitted_to_report = 0;
681
682 resubmit_transactions.extend(
683 block_transactions.into_iter().map(Arc::from).filter(|tx| {
685 let tx_hash = pool.hash_of(tx);
686 let contains = pruned_log.contains(&tx_hash);
687
688 resubmitted_to_report += 1;
690
691 if !contains {
692 log::trace!(
693 target: LOG_TARGET,
694 "[{:?}]: Resubmitting from retracted block {:?}",
695 tx_hash,
696 hash,
697 );
698 }
699 !contains
700 }),
701 );
702
703 self.metrics.report(|metrics| {
704 metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
705 });
706 }
707
708 pool.resubmit_at(
709 &hash_and_number,
710 TransactionSource::External,
713 resubmit_transactions,
714 )
715 .await;
716 }
717
718 let extra_pool = pool.clone();
719 self.ready_poll
722 .lock()
723 .trigger(hash_and_number.number, move || Box::new(extra_pool.validated_pool().ready()));
724
725 if next_action.revalidate {
726 let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
727 self.revalidation_queue.revalidate_later(hash_and_number.hash, hashes).await;
728
729 self.revalidation_strategy.lock().clear();
730 }
731 }
732}
733
734#[async_trait]
735impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
736where
737 Block: BlockT,
738 PoolApi: 'static + graph::ChainApi<Block = Block>,
739{
740 async fn maintain(&self, event: ChainEvent<Self::Block>) {
741 let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
742 let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
743 match self.api.tree_route(from, to) {
744 Ok(tree_route) => Ok(tree_route),
745 Err(e) =>
746 return Err(format!(
747 "Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
748 )),
749 }
750 };
751 let block_id_to_number =
752 |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
753
754 let result =
755 self.enactment_state
756 .lock()
757 .update(&event, &compute_tree_route, &block_id_to_number);
758
759 match result {
760 Err(msg) => {
761 log::trace!(target: LOG_TARGET, "{msg}");
762 self.enactment_state.lock().force_update(&event);
763 },
764 Ok(EnactmentAction::Skip) => return,
765 Ok(EnactmentAction::HandleFinalization) => {},
766 Ok(EnactmentAction::HandleEnactment(tree_route)) => {
767 self.handle_enactment(tree_route).await;
768 },
769 };
770
771 if let ChainEvent::Finalized { hash, tree_route } = event {
772 log::trace!(
773 target: LOG_TARGET,
774 "on-finalized enacted: {tree_route:?}, previously finalized: \
775 {prev_finalized_block:?}",
776 );
777
778 for hash in tree_route.iter().chain(std::iter::once(&hash)) {
779 if let Err(e) = self.pool.validated_pool().on_block_finalized(*hash).await {
780 log::warn!(
781 target: LOG_TARGET,
782 "Error occurred while attempting to notify watchers about finalization {}: {}",
783 hash, e
784 )
785 }
786 }
787 }
788 }
789}