sc_transaction_pool/common/
api.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Chain api required for the transaction pool.
20
21use crate::LOG_TARGET;
22use codec::Encode;
23use futures::{
24	channel::{mpsc, oneshot},
25	future::{ready, Future, FutureExt, Ready},
26	lock::Mutex,
27	SinkExt, StreamExt,
28};
29use std::{marker::PhantomData, pin::Pin, sync::Arc};
30
31use prometheus_endpoint::Registry as PrometheusRegistry;
32use sc_client_api::{blockchain::HeaderBackend, BlockBackend};
33use sp_api::{ApiExt, ProvideRuntimeApi};
34use sp_blockchain::{HeaderMetadata, TreeRoute};
35use sp_core::traits::SpawnEssentialNamed;
36use sp_runtime::{
37	generic::BlockId,
38	traits::{self, Block as BlockT, BlockIdTo},
39	transaction_validity::{TransactionSource, TransactionValidity},
40};
41use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
42
43use super::{
44	error::{self, Error},
45	metrics::{ApiMetrics, ApiMetricsExt},
46};
47use crate::graph;
48
49/// The transaction pool logic for full client.
50pub struct FullChainApi<Client, Block> {
51	client: Arc<Client>,
52	_marker: PhantomData<Block>,
53	metrics: Option<Arc<ApiMetrics>>,
54	validation_pool: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
55}
56
57/// Spawn a validation task that will be used by the transaction pool to validate transactions.
58fn spawn_validation_pool_task(
59	name: &'static str,
60	receiver: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
61	spawner: &impl SpawnEssentialNamed,
62) {
63	spawner.spawn_essential_blocking(
64		name,
65		Some("transaction-pool"),
66		async move {
67			loop {
68				let task = receiver.lock().await.next().await;
69				match task {
70					None => return,
71					Some(task) => task.await,
72				}
73			}
74		}
75		.boxed(),
76	);
77}
78
79impl<Client, Block> FullChainApi<Client, Block> {
80	/// Create new transaction pool logic.
81	pub fn new(
82		client: Arc<Client>,
83		prometheus: Option<&PrometheusRegistry>,
84		spawner: &impl SpawnEssentialNamed,
85	) -> Self {
86		let metrics = prometheus.map(ApiMetrics::register).and_then(|r| match r {
87			Err(err) => {
88				log::warn!(
89					target: LOG_TARGET,
90					"Failed to register transaction pool api prometheus metrics: {:?}",
91					err,
92				);
93				None
94			},
95			Ok(api) => Some(Arc::new(api)),
96		});
97
98		let (sender, receiver) = mpsc::channel(0);
99
100		let receiver = Arc::new(Mutex::new(receiver));
101		spawn_validation_pool_task("transaction-pool-task-0", receiver.clone(), spawner);
102		spawn_validation_pool_task("transaction-pool-task-1", receiver, spawner);
103
104		FullChainApi { client, validation_pool: sender, _marker: Default::default(), metrics }
105	}
106}
107
108impl<Client, Block> graph::ChainApi for FullChainApi<Client, Block>
109where
110	Block: BlockT,
111	Client: ProvideRuntimeApi<Block>
112		+ BlockBackend<Block>
113		+ BlockIdTo<Block>
114		+ HeaderBackend<Block>
115		+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
116	Client: Send + Sync + 'static,
117	Client::Api: TaggedTransactionQueue<Block>,
118{
119	type Block = Block;
120	type Error = error::Error;
121	type ValidationFuture =
122		Pin<Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>>;
123	type BodyFuture = Ready<error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>;
124
125	fn block_body(&self, hash: Block::Hash) -> Self::BodyFuture {
126		ready(self.client.block_body(hash).map_err(error::Error::from))
127	}
128
129	fn validate_transaction(
130		&self,
131		at: <Self::Block as BlockT>::Hash,
132		source: TransactionSource,
133		uxt: graph::ExtrinsicFor<Self>,
134	) -> Self::ValidationFuture {
135		let (tx, rx) = oneshot::channel();
136		let client = self.client.clone();
137		let mut validation_pool = self.validation_pool.clone();
138		let metrics = self.metrics.clone();
139
140		async move {
141			metrics.report(|m| m.validations_scheduled.inc());
142
143			{
144				validation_pool
145					.send(
146						async move {
147							let res = validate_transaction_blocking(&*client, at, source, uxt);
148							let _ = tx.send(res);
149							metrics.report(|m| m.validations_finished.inc());
150						}
151						.boxed(),
152					)
153					.await
154					.map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
155			}
156
157			match rx.await {
158				Ok(r) => r,
159				Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
160			}
161		}
162		.boxed()
163	}
164
165	/// Validates a transaction by calling into the runtime.
166	///
167	/// Same as `validate_transaction` but blocks the current thread when performing validation.
168	fn validate_transaction_blocking(
169		&self,
170		at: Block::Hash,
171		source: TransactionSource,
172		uxt: graph::ExtrinsicFor<Self>,
173	) -> error::Result<TransactionValidity> {
174		validate_transaction_blocking(&*self.client, at, source, uxt)
175	}
176
177	fn block_id_to_number(
178		&self,
179		at: &BlockId<Self::Block>,
180	) -> error::Result<Option<graph::NumberFor<Self>>> {
181		self.client.to_number(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
182	}
183
184	fn block_id_to_hash(
185		&self,
186		at: &BlockId<Self::Block>,
187	) -> error::Result<Option<graph::BlockHash<Self>>> {
188		self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
189	}
190
191	fn hash_and_length(
192		&self,
193		ex: &graph::RawExtrinsicFor<Self>,
194	) -> (graph::ExtrinsicHash<Self>, usize) {
195		ex.using_encoded(|x| (<traits::HashingFor<Block> as traits::Hash>::hash(x), x.len()))
196	}
197
198	fn block_header(
199		&self,
200		hash: <Self::Block as BlockT>::Hash,
201	) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
202		self.client.header(hash).map_err(Into::into)
203	}
204
205	fn tree_route(
206		&self,
207		from: <Self::Block as BlockT>::Hash,
208		to: <Self::Block as BlockT>::Hash,
209	) -> Result<TreeRoute<Self::Block>, Self::Error> {
210		sp_blockchain::tree_route::<Block, Client>(&*self.client, from, to).map_err(Into::into)
211	}
212}
213
214/// Helper function to validate a transaction using a full chain API.
215/// This method will call into the runtime to perform the validation.
216fn validate_transaction_blocking<Client, Block>(
217	client: &Client,
218	at: Block::Hash,
219	source: TransactionSource,
220	uxt: graph::ExtrinsicFor<FullChainApi<Client, Block>>,
221) -> error::Result<TransactionValidity>
222where
223	Block: BlockT,
224	Client: ProvideRuntimeApi<Block>
225		+ BlockBackend<Block>
226		+ BlockIdTo<Block>
227		+ HeaderBackend<Block>
228		+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
229	Client: Send + Sync + 'static,
230	Client::Api: TaggedTransactionQueue<Block>,
231{
232	let s = std::time::Instant::now();
233	let h = uxt.using_encoded(|x| <traits::HashingFor<Block> as traits::Hash>::hash(x));
234
235	let result = sp_tracing::within_span!(sp_tracing::Level::TRACE, "validate_transaction";
236	{
237		let runtime_api = client.runtime_api();
238		let api_version = sp_tracing::within_span! { sp_tracing::Level::TRACE, "check_version";
239			runtime_api
240				.api_version::<dyn TaggedTransactionQueue<Block>>(at)
241				.map_err(|e| Error::RuntimeApi(e.to_string()))?
242				.ok_or_else(|| Error::RuntimeApi(
243					format!("Could not find `TaggedTransactionQueue` api for block `{:?}`.", at)
244				))
245		}?;
246
247		use sp_api::Core;
248
249		sp_tracing::within_span!(
250			sp_tracing::Level::TRACE, "runtime::validate_transaction";
251		{
252			if api_version >= 3 {
253				runtime_api.validate_transaction(at, source, (*uxt).clone(), at)
254					.map_err(|e| Error::RuntimeApi(e.to_string()))
255			} else {
256				let block_number = client.to_number(&BlockId::Hash(at))
257					.map_err(|e| Error::RuntimeApi(e.to_string()))?
258					.ok_or_else(||
259						Error::RuntimeApi(format!("Could not get number for block `{:?}`.", at))
260					)?;
261
262				// The old versions require us to call `initialize_block` before.
263				runtime_api.initialize_block(at, &sp_runtime::traits::Header::new(
264					block_number + sp_runtime::traits::One::one(),
265					Default::default(),
266					Default::default(),
267					at,
268					Default::default()),
269				).map_err(|e| Error::RuntimeApi(e.to_string()))?;
270
271				if api_version == 2 {
272					#[allow(deprecated)] // old validate_transaction
273					runtime_api.validate_transaction_before_version_3(at, source, (*uxt).clone())
274						.map_err(|e| Error::RuntimeApi(e.to_string()))
275				} else {
276					#[allow(deprecated)] // old validate_transaction
277					runtime_api.validate_transaction_before_version_2(at, (*uxt).clone())
278						.map_err(|e| Error::RuntimeApi(e.to_string()))
279				}
280			}
281		})
282	});
283	log::trace!(target: LOG_TARGET, "[{h:?}] validate_transaction_blocking: at:{at:?} took:{:?}", s.elapsed());
284
285	result
286}