sc_transaction_pool/common/
api.rs1use crate::LOG_TARGET;
22use codec::Encode;
23use futures::{
24 channel::{mpsc, oneshot},
25 future::{ready, Future, FutureExt, Ready},
26 lock::Mutex,
27 SinkExt, StreamExt,
28};
29use std::{marker::PhantomData, pin::Pin, sync::Arc};
30
31use prometheus_endpoint::Registry as PrometheusRegistry;
32use sc_client_api::{blockchain::HeaderBackend, BlockBackend};
33use sp_api::{ApiExt, ProvideRuntimeApi};
34use sp_blockchain::{HeaderMetadata, TreeRoute};
35use sp_core::traits::SpawnEssentialNamed;
36use sp_runtime::{
37 generic::BlockId,
38 traits::{self, Block as BlockT, BlockIdTo},
39 transaction_validity::{TransactionSource, TransactionValidity},
40};
41use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
42
43use super::{
44 error::{self, Error},
45 metrics::{ApiMetrics, ApiMetricsExt},
46};
47use crate::graph;
48
49pub struct FullChainApi<Client, Block> {
51 client: Arc<Client>,
52 _marker: PhantomData<Block>,
53 metrics: Option<Arc<ApiMetrics>>,
54 validation_pool: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
55}
56
57fn spawn_validation_pool_task(
59 name: &'static str,
60 receiver: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
61 spawner: &impl SpawnEssentialNamed,
62) {
63 spawner.spawn_essential_blocking(
64 name,
65 Some("transaction-pool"),
66 async move {
67 loop {
68 let task = receiver.lock().await.next().await;
69 match task {
70 None => return,
71 Some(task) => task.await,
72 }
73 }
74 }
75 .boxed(),
76 );
77}
78
79impl<Client, Block> FullChainApi<Client, Block> {
80 pub fn new(
82 client: Arc<Client>,
83 prometheus: Option<&PrometheusRegistry>,
84 spawner: &impl SpawnEssentialNamed,
85 ) -> Self {
86 let metrics = prometheus.map(ApiMetrics::register).and_then(|r| match r {
87 Err(err) => {
88 log::warn!(
89 target: LOG_TARGET,
90 "Failed to register transaction pool api prometheus metrics: {:?}",
91 err,
92 );
93 None
94 },
95 Ok(api) => Some(Arc::new(api)),
96 });
97
98 let (sender, receiver) = mpsc::channel(0);
99
100 let receiver = Arc::new(Mutex::new(receiver));
101 spawn_validation_pool_task("transaction-pool-task-0", receiver.clone(), spawner);
102 spawn_validation_pool_task("transaction-pool-task-1", receiver, spawner);
103
104 FullChainApi { client, validation_pool: sender, _marker: Default::default(), metrics }
105 }
106}
107
108impl<Client, Block> graph::ChainApi for FullChainApi<Client, Block>
109where
110 Block: BlockT,
111 Client: ProvideRuntimeApi<Block>
112 + BlockBackend<Block>
113 + BlockIdTo<Block>
114 + HeaderBackend<Block>
115 + HeaderMetadata<Block, Error = sp_blockchain::Error>,
116 Client: Send + Sync + 'static,
117 Client::Api: TaggedTransactionQueue<Block>,
118{
119 type Block = Block;
120 type Error = error::Error;
121 type ValidationFuture =
122 Pin<Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>>;
123 type BodyFuture = Ready<error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>;
124
125 fn block_body(&self, hash: Block::Hash) -> Self::BodyFuture {
126 ready(self.client.block_body(hash).map_err(error::Error::from))
127 }
128
129 fn validate_transaction(
130 &self,
131 at: <Self::Block as BlockT>::Hash,
132 source: TransactionSource,
133 uxt: graph::ExtrinsicFor<Self>,
134 ) -> Self::ValidationFuture {
135 let (tx, rx) = oneshot::channel();
136 let client = self.client.clone();
137 let mut validation_pool = self.validation_pool.clone();
138 let metrics = self.metrics.clone();
139
140 async move {
141 metrics.report(|m| m.validations_scheduled.inc());
142
143 {
144 validation_pool
145 .send(
146 async move {
147 let res = validate_transaction_blocking(&*client, at, source, uxt);
148 let _ = tx.send(res);
149 metrics.report(|m| m.validations_finished.inc());
150 }
151 .boxed(),
152 )
153 .await
154 .map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
155 }
156
157 match rx.await {
158 Ok(r) => r,
159 Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
160 }
161 }
162 .boxed()
163 }
164
165 fn validate_transaction_blocking(
169 &self,
170 at: Block::Hash,
171 source: TransactionSource,
172 uxt: graph::ExtrinsicFor<Self>,
173 ) -> error::Result<TransactionValidity> {
174 validate_transaction_blocking(&*self.client, at, source, uxt)
175 }
176
177 fn block_id_to_number(
178 &self,
179 at: &BlockId<Self::Block>,
180 ) -> error::Result<Option<graph::NumberFor<Self>>> {
181 self.client.to_number(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
182 }
183
184 fn block_id_to_hash(
185 &self,
186 at: &BlockId<Self::Block>,
187 ) -> error::Result<Option<graph::BlockHash<Self>>> {
188 self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
189 }
190
191 fn hash_and_length(
192 &self,
193 ex: &graph::RawExtrinsicFor<Self>,
194 ) -> (graph::ExtrinsicHash<Self>, usize) {
195 ex.using_encoded(|x| (<traits::HashingFor<Block> as traits::Hash>::hash(x), x.len()))
196 }
197
198 fn block_header(
199 &self,
200 hash: <Self::Block as BlockT>::Hash,
201 ) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
202 self.client.header(hash).map_err(Into::into)
203 }
204
205 fn tree_route(
206 &self,
207 from: <Self::Block as BlockT>::Hash,
208 to: <Self::Block as BlockT>::Hash,
209 ) -> Result<TreeRoute<Self::Block>, Self::Error> {
210 sp_blockchain::tree_route::<Block, Client>(&*self.client, from, to).map_err(Into::into)
211 }
212}
213
214fn validate_transaction_blocking<Client, Block>(
217 client: &Client,
218 at: Block::Hash,
219 source: TransactionSource,
220 uxt: graph::ExtrinsicFor<FullChainApi<Client, Block>>,
221) -> error::Result<TransactionValidity>
222where
223 Block: BlockT,
224 Client: ProvideRuntimeApi<Block>
225 + BlockBackend<Block>
226 + BlockIdTo<Block>
227 + HeaderBackend<Block>
228 + HeaderMetadata<Block, Error = sp_blockchain::Error>,
229 Client: Send + Sync + 'static,
230 Client::Api: TaggedTransactionQueue<Block>,
231{
232 let s = std::time::Instant::now();
233 let h = uxt.using_encoded(|x| <traits::HashingFor<Block> as traits::Hash>::hash(x));
234
235 let result = sp_tracing::within_span!(sp_tracing::Level::TRACE, "validate_transaction";
236 {
237 let runtime_api = client.runtime_api();
238 let api_version = sp_tracing::within_span! { sp_tracing::Level::TRACE, "check_version";
239 runtime_api
240 .api_version::<dyn TaggedTransactionQueue<Block>>(at)
241 .map_err(|e| Error::RuntimeApi(e.to_string()))?
242 .ok_or_else(|| Error::RuntimeApi(
243 format!("Could not find `TaggedTransactionQueue` api for block `{:?}`.", at)
244 ))
245 }?;
246
247 use sp_api::Core;
248
249 sp_tracing::within_span!(
250 sp_tracing::Level::TRACE, "runtime::validate_transaction";
251 {
252 if api_version >= 3 {
253 runtime_api.validate_transaction(at, source, (*uxt).clone(), at)
254 .map_err(|e| Error::RuntimeApi(e.to_string()))
255 } else {
256 let block_number = client.to_number(&BlockId::Hash(at))
257 .map_err(|e| Error::RuntimeApi(e.to_string()))?
258 .ok_or_else(||
259 Error::RuntimeApi(format!("Could not get number for block `{:?}`.", at))
260 )?;
261
262 runtime_api.initialize_block(at, &sp_runtime::traits::Header::new(
264 block_number + sp_runtime::traits::One::one(),
265 Default::default(),
266 Default::default(),
267 at,
268 Default::default()),
269 ).map_err(|e| Error::RuntimeApi(e.to_string()))?;
270
271 if api_version == 2 {
272 #[allow(deprecated)] runtime_api.validate_transaction_before_version_3(at, source, (*uxt).clone())
274 .map_err(|e| Error::RuntimeApi(e.to_string()))
275 } else {
276 #[allow(deprecated)] runtime_api.validate_transaction_before_version_2(at, (*uxt).clone())
278 .map_err(|e| Error::RuntimeApi(e.to_string()))
279 }
280 }
281 })
282 });
283 log::trace!(target: LOG_TARGET, "[{h:?}] validate_transaction_blocking: at:{at:?} took:{:?}", s.elapsed());
284
285 result
286}