sc_transaction_pool_api/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Transaction pool client facing API.
20#![warn(missing_docs)]
21
22pub mod error;
23
24use async_trait::async_trait;
25use codec::Codec;
26use futures::{Future, Stream};
27use serde::{de::DeserializeOwned, Deserialize, Serialize};
28use sp_core::offchain::TransactionPoolExt;
29use sp_runtime::traits::{Block as BlockT, Member};
30use std::{collections::HashMap, hash::Hash, marker::PhantomData, pin::Pin, sync::Arc};
31
32const LOG_TARGET: &str = "txpool::api";
33
34pub use sp_runtime::transaction_validity::{
35	TransactionLongevity, TransactionPriority, TransactionSource, TransactionTag,
36};
37
38/// Transaction pool status.
39#[derive(Debug, Clone)]
40pub struct PoolStatus {
41	/// Number of transactions in the ready queue.
42	pub ready: usize,
43	/// Sum of bytes of ready transaction encodings.
44	pub ready_bytes: usize,
45	/// Number of transactions in the future queue.
46	pub future: usize,
47	/// Sum of bytes of ready transaction encodings.
48	pub future_bytes: usize,
49}
50
51impl PoolStatus {
52	/// Returns true if there are no transactions in the pool.
53	pub fn is_empty(&self) -> bool {
54		self.ready == 0 && self.future == 0
55	}
56}
57
58/// Possible transaction status events.
59///
60/// These events are being emitted by `TransactionPool` watchers,
61/// which are also exposed over RPC.
62///
63/// The status events can be grouped based on their kinds as:
64/// 1. Entering/Moving within the pool:
65/// 		- [Future](TransactionStatus::Future)
66/// 		- [Ready](TransactionStatus::Ready)
67/// 2. Inside `Ready` queue:
68/// 		- [Broadcast](TransactionStatus::Broadcast)
69/// 3. Leaving the pool:
70/// 		- [InBlock](TransactionStatus::InBlock)
71/// 		- [Invalid](TransactionStatus::Invalid)
72/// 		- [Usurped](TransactionStatus::Usurped)
73/// 		- [Dropped](TransactionStatus::Dropped)
74/// 	4. Re-entering the pool:
75/// 		- [Retracted](TransactionStatus::Retracted)
76/// 	5. Block finalized:
77/// 		- [Finalized](TransactionStatus::Finalized)
78/// 		- [FinalityTimeout](TransactionStatus::FinalityTimeout)
79///
80/// Transactions are first placed in either the `Ready` or `Future` queues of the transaction pool.
81/// Substrate validates the transaction before it enters the pool.
82///
83/// A transaction is placed in the `Future` queue if it will become valid at a future time.
84/// For example, submitting a transaction with a higher account nonce than the current
85/// expected nonce will place the transaction in the `Future` queue.
86///
87/// The events will always be received in the order described above, however
88/// there might be cases where transactions alternate between `Future` and `Ready`
89/// pool, and are `Broadcast` in the meantime.
90///
91/// There is also only single event causing the transaction to leave the pool.
92/// I.e. only one of the listed ones should be triggered.
93///
94/// Note that there are conditions that may cause transactions to reappear in the pool.
95/// 1. Due to possible forks, the transaction that ends up being in included
96/// in one block, may later re-enter the pool or be marked as invalid.
97/// 2. Transaction `Dropped` at one point, may later re-enter the pool if some other
98/// transactions are removed. A `Dropped` transaction may re-enter the pool only if it is
99/// resubmitted.
100/// 3. `Invalid` transaction may become valid at some point in the future.
101/// (Note that runtimes are encouraged to use `UnknownValidity` to inform the pool about
102/// such case). An `Invalid` transaction may re-enter the pool only if it is resubmitted.
103/// 4. `Retracted` transactions might be included in some next block.
104///
105/// The `FinalityTimeout` event will be emitted when the block did not reach finality
106/// within 512 blocks. This either indicates that finality is not available for your chain,
107/// or that finality gadget is lagging behind. If you choose to wait for finality longer, you can
108/// re-subscribe for a particular transaction hash manually again.
109///
110/// ### Last Event
111///
112/// The stream is considered finished when one of the following events happen:
113/// - [Finalized](TransactionStatus::Finalized)
114/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
115/// - [Usurped](TransactionStatus::Usurped)
116/// - [Invalid](TransactionStatus::Invalid)
117/// - [Dropped](TransactionStatus::Dropped)
118///
119/// See [`TransactionStatus::is_final`] for more details.
120///
121/// ### Resubmit Transactions
122///
123/// Users might resubmit the transaction at a later time for the following events:
124/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
125/// - [Invalid](TransactionStatus::Invalid)
126/// - [Dropped](TransactionStatus::Dropped)
127///
128/// See [`TransactionStatus::is_retriable`] for more details.
129#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
130#[serde(rename_all = "camelCase")]
131pub enum TransactionStatus<Hash, BlockHash> {
132	/// Transaction is part of the future queue.
133	Future,
134	/// Transaction is part of the ready queue.
135	Ready,
136	/// The transaction has been broadcast to the given peers.
137	Broadcast(Vec<String>),
138	/// Transaction has been included in block with given hash
139	/// at the given position.
140	#[serde(with = "v1_compatible")]
141	InBlock((BlockHash, TxIndex)),
142	/// The block this transaction was included in has been retracted.
143	Retracted(BlockHash),
144	/// Maximum number of finality watchers has been reached,
145	/// old watchers are being removed.
146	FinalityTimeout(BlockHash),
147	/// Transaction has been finalized by a finality-gadget, e.g. GRANDPA.
148	#[serde(with = "v1_compatible")]
149	Finalized((BlockHash, TxIndex)),
150	/// Transaction has been replaced in the pool, by another transaction
151	/// that provides the same tags. (e.g. same (sender, nonce)).
152	Usurped(Hash),
153	/// Transaction has been dropped from the pool because of the limit.
154	Dropped,
155	/// Transaction is no longer valid in the current state.
156	Invalid,
157}
158
159impl<Hash, BlockHash> TransactionStatus<Hash, BlockHash> {
160	/// Returns true if this is the last event emitted by [`TransactionStatusStream`].
161	pub fn is_final(&self) -> bool {
162		// The state must be kept in sync with `crate::graph::Sender`.
163		match self {
164			Self::Usurped(_) |
165			Self::Finalized(_) |
166			Self::FinalityTimeout(_) |
167			Self::Invalid |
168			Self::Dropped => true,
169			_ => false,
170		}
171	}
172
173	/// Returns true if the transaction could be re-submitted to the pool in the future.
174	///
175	/// For example, `TransactionStatus::Dropped` is retriable, because the transaction
176	/// may enter the pool if there is space for it in the future.
177	pub fn is_retriable(&self) -> bool {
178		match self {
179			// The number of finality watchers has been reached.
180			Self::FinalityTimeout(_) |
181			// An invalid transaction might be valid at a later time.
182			Self::Invalid |
183			// The transaction was dropped because of the limits of the pool.
184			// It can reenter the pool when other transactions are removed / finalized.
185			Self::Dropped => true,
186			_ => false,
187		}
188	}
189}
190
191/// The stream of transaction events.
192pub type TransactionStatusStream<Hash, BlockHash> =
193	dyn Stream<Item = TransactionStatus<Hash, BlockHash>> + Send;
194
195/// The import notification event stream.
196pub type ImportNotificationStream<H> = futures::channel::mpsc::Receiver<H>;
197
198/// Transaction hash type for a pool.
199pub type TxHash<P> = <P as TransactionPool>::Hash;
200/// Block hash type for a pool.
201pub type BlockHash<P> = <<P as TransactionPool>::Block as BlockT>::Hash;
202/// Transaction type for a pool.
203pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsic;
204/// Type of transactions event stream for a pool.
205pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
206/// Transaction type for a local pool.
207pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;
208/// Transaction's index within the block in which it was included.
209pub type TxIndex = usize;
210
211/// Typical future type used in transaction pool api.
212pub type PoolFuture<T, E> = std::pin::Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;
213
214/// In-pool transaction interface.
215///
216/// The pool is container of transactions that are implementing this trait.
217/// See `sp_runtime::ValidTransaction` for details about every field.
218pub trait InPoolTransaction {
219	/// Transaction type.
220	type Transaction;
221	/// Transaction hash type.
222	type Hash;
223
224	/// Get the reference to the transaction data.
225	fn data(&self) -> &Self::Transaction;
226	/// Get hash of the transaction.
227	fn hash(&self) -> &Self::Hash;
228	/// Get priority of the transaction.
229	fn priority(&self) -> &TransactionPriority;
230	/// Get longevity of the transaction.
231	fn longevity(&self) -> &TransactionLongevity;
232	/// Get transaction dependencies.
233	fn requires(&self) -> &[TransactionTag];
234	/// Get tags that transaction provides.
235	fn provides(&self) -> &[TransactionTag];
236	/// Return a flag indicating if the transaction should be propagated to other peers.
237	fn is_propagable(&self) -> bool;
238}
239
240/// Transaction pool interface.
241pub trait TransactionPool: Send + Sync {
242	/// Block type.
243	type Block: BlockT;
244	/// Transaction hash type.
245	type Hash: Hash + Eq + Member + Serialize + DeserializeOwned + Codec;
246	/// In-pool transaction type.
247	type InPoolTransaction: InPoolTransaction<
248		Transaction = Arc<TransactionFor<Self>>,
249		Hash = TxHash<Self>,
250	>;
251	/// Error type.
252	type Error: From<crate::error::Error> + crate::error::IntoPoolError;
253
254	// *** RPC
255
256	/// Returns a future that imports a bunch of unverified transactions to the pool.
257	fn submit_at(
258		&self,
259		at: <Self::Block as BlockT>::Hash,
260		source: TransactionSource,
261		xts: Vec<TransactionFor<Self>>,
262	) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>;
263
264	/// Returns a future that imports one unverified transaction to the pool.
265	fn submit_one(
266		&self,
267		at: <Self::Block as BlockT>::Hash,
268		source: TransactionSource,
269		xt: TransactionFor<Self>,
270	) -> PoolFuture<TxHash<Self>, Self::Error>;
271
272	/// Returns a future that imports a single transaction and starts to watch their progress in the
273	/// pool.
274	fn submit_and_watch(
275		&self,
276		at: <Self::Block as BlockT>::Hash,
277		source: TransactionSource,
278		xt: TransactionFor<Self>,
279	) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error>;
280
281	// *** Block production / Networking
282	/// Get an iterator for ready transactions ordered by priority.
283	///
284	/// Guarantees to return only when transaction pool got updated at `at` block.
285	/// Guarantees to return immediately when `None` is passed.
286	fn ready_at(
287		&self,
288		at: <Self::Block as BlockT>::Hash,
289	) -> Pin<
290		Box<
291			dyn Future<
292					Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
293				> + Send,
294		>,
295	>;
296
297	/// Get an iterator for ready transactions ordered by priority.
298	fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
299
300	// *** Block production
301	/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
302	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>>;
303
304	// *** logging
305	/// Get futures transaction list.
306	fn futures(&self) -> Vec<Self::InPoolTransaction>;
307
308	/// Returns pool status.
309	fn status(&self) -> PoolStatus;
310
311	// *** logging / RPC / networking
312	/// Return an event stream of transactions imported to the pool.
313	fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>>;
314
315	// *** networking
316	/// Notify the pool about transactions broadcast.
317	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>);
318
319	/// Returns transaction hash
320	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self>;
321
322	/// Return specific ready transaction by hash, if there is one.
323	fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
324
325	/// Returns set of ready transaction at given block within given timeout.
326	///
327	/// If the timeout is hit during method execution then the best effort set of ready transactions
328	/// for given block, without executing full maintain process is returned.
329	fn ready_at_with_timeout(
330		&self,
331		at: <Self::Block as BlockT>::Hash,
332		timeout: std::time::Duration,
333	) -> Pin<
334		Box<
335			dyn Future<
336					Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
337				> + Send
338				+ '_,
339		>,
340	>;
341}
342
343/// An iterator of ready transactions.
344///
345/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting
346/// last-returned element as invalid.
347///
348/// The implementation is then allowed, for performance reasons, to change the elements
349/// returned next, by e.g.  skipping elements that are known to depend on the reported
350/// transaction, which yields them invalid as well.
351pub trait ReadyTransactions: Iterator {
352	/// Report given transaction as invalid.
353	///
354	/// This might affect subsequent elements returned by the iterator, so dependent transactions
355	/// are skipped for performance reasons.
356	fn report_invalid(&mut self, _tx: &Self::Item);
357}
358
359/// A no-op implementation for an empty iterator.
360impl<T> ReadyTransactions for std::iter::Empty<T> {
361	fn report_invalid(&mut self, _tx: &T) {}
362}
363
364/// Events that the transaction pool listens for.
365#[derive(Debug)]
366pub enum ChainEvent<B: BlockT> {
367	/// New best block have been added to the chain.
368	NewBestBlock {
369		/// Hash of the block.
370		hash: B::Hash,
371		/// Tree route from old best to new best parent that was calculated on import.
372		///
373		/// If `None`, no re-org happened on import.
374		tree_route: Option<Arc<sp_blockchain::TreeRoute<B>>>,
375	},
376	/// An existing block has been finalized.
377	Finalized {
378		/// Hash of just finalized block.
379		hash: B::Hash,
380		/// Path from old finalized to new finalized parent.
381		tree_route: Arc<[B::Hash]>,
382	},
383}
384
385impl<B: BlockT> ChainEvent<B> {
386	/// Returns the block hash associated to the event.
387	pub fn hash(&self) -> B::Hash {
388		match self {
389			Self::NewBestBlock { hash, .. } | Self::Finalized { hash, .. } => *hash,
390		}
391	}
392
393	/// Is `self == Self::Finalized`?
394	pub fn is_finalized(&self) -> bool {
395		matches!(self, Self::Finalized { .. })
396	}
397}
398
399/// Trait for transaction pool maintenance.
400#[async_trait]
401pub trait MaintainedTransactionPool: TransactionPool {
402	/// Perform maintenance
403	async fn maintain(&self, event: ChainEvent<Self::Block>);
404}
405
406/// Transaction pool interface for submitting local transactions that exposes a
407/// blocking interface for submission.
408pub trait LocalTransactionPool: Send + Sync {
409	/// Block type.
410	type Block: BlockT;
411	/// Transaction hash type.
412	type Hash: Hash + Eq + Member + Serialize;
413	/// Error type.
414	type Error: From<crate::error::Error> + crate::error::IntoPoolError;
415
416	/// Submits the given local unverified transaction to the pool blocking the
417	/// current thread for any necessary pre-verification.
418	/// NOTE: It MUST NOT be used for transactions that originate from the
419	/// network or RPC, since the validation is performed with
420	/// `TransactionSource::Local`.
421	fn submit_local(
422		&self,
423		at: <Self::Block as BlockT>::Hash,
424		xt: LocalTransactionFor<Self>,
425	) -> Result<Self::Hash, Self::Error>;
426}
427
428impl<T: LocalTransactionPool> LocalTransactionPool for Arc<T> {
429	type Block = T::Block;
430
431	type Hash = T::Hash;
432
433	type Error = T::Error;
434
435	fn submit_local(
436		&self,
437		at: <Self::Block as BlockT>::Hash,
438		xt: LocalTransactionFor<Self>,
439	) -> Result<Self::Hash, Self::Error> {
440		(**self).submit_local(at, xt)
441	}
442}
443
444/// An abstraction for [`LocalTransactionPool`]
445///
446/// We want to use a transaction pool in [`OffchainTransactionPoolFactory`] in a `Arc` without
447/// bleeding the associated types besides the `Block`. Thus, this abstraction here exists to achieve
448/// the wrapping in a `Arc`.
449trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
450	/// Submit transaction.
451	///
452	/// The transaction will end up in the pool and be propagated to others.
453	fn submit_at(&self, at: Block::Hash, extrinsic: Block::Extrinsic) -> Result<(), ()>;
454}
455
456impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
457	fn submit_at(
458		&self,
459		at: <TPool::Block as BlockT>::Hash,
460		extrinsic: <TPool::Block as BlockT>::Extrinsic,
461	) -> Result<(), ()> {
462		log::trace!(
463			target: LOG_TARGET,
464			"(offchain call) Submitting a transaction to the pool: {:?}",
465			extrinsic
466		);
467
468		let result = self.submit_local(at, extrinsic);
469
470		result.map(|_| ()).map_err(|e| {
471			log::warn!(
472				target: LOG_TARGET,
473				"(offchain call) Error submitting a transaction to the pool: {}",
474				e
475			)
476		})
477	}
478}
479
480/// Factory for creating [`TransactionPoolExt`]s.
481///
482/// This provides an easy way for creating [`TransactionPoolExt`] extensions for registering them in
483/// the wasm execution environment to send transactions from an offchain call to the  runtime.
484#[derive(Clone)]
485pub struct OffchainTransactionPoolFactory<Block: BlockT> {
486	pool: Arc<dyn OffchainSubmitTransaction<Block>>,
487}
488
489impl<Block: BlockT> OffchainTransactionPoolFactory<Block> {
490	/// Creates a new instance using the given `tx_pool`.
491	pub fn new<T: LocalTransactionPool<Block = Block> + 'static>(tx_pool: T) -> Self {
492		Self { pool: Arc::new(tx_pool) as Arc<_> }
493	}
494
495	/// Returns an instance of [`TransactionPoolExt`] bound to the given `block_hash`.
496	///
497	/// Transactions that are being submitted by this instance will be submitted with `block_hash`
498	/// as context for validation.
499	pub fn offchain_transaction_pool(&self, block_hash: Block::Hash) -> TransactionPoolExt {
500		TransactionPoolExt::new(OffchainTransactionPool { pool: self.pool.clone(), block_hash })
501	}
502}
503
504/// Wraps a `pool` and `block_hash` to implement [`sp_core::offchain::TransactionPool`].
505struct OffchainTransactionPool<Block: BlockT> {
506	block_hash: Block::Hash,
507	pool: Arc<dyn OffchainSubmitTransaction<Block>>,
508}
509
510impl<Block: BlockT> sp_core::offchain::TransactionPool for OffchainTransactionPool<Block> {
511	fn submit_transaction(&mut self, extrinsic: Vec<u8>) -> Result<(), ()> {
512		let extrinsic = match codec::Decode::decode(&mut &extrinsic[..]) {
513			Ok(t) => t,
514			Err(e) => {
515				log::error!(
516					target: LOG_TARGET,
517					"Failed to decode extrinsic in `OffchainTransactionPool::submit_transaction`: {e:?}"
518				);
519
520				return Err(())
521			},
522		};
523
524		self.pool.submit_at(self.block_hash, extrinsic)
525	}
526}
527
528/// Wrapper functions to keep the API backwards compatible over the wire for the old RPC spec.
529mod v1_compatible {
530	use serde::{Deserialize, Deserializer, Serialize, Serializer};
531
532	pub fn serialize<S, H>(data: &(H, usize), serializer: S) -> Result<S::Ok, S::Error>
533	where
534		S: Serializer,
535		H: Serialize,
536	{
537		let (hash, _) = data;
538		serde::Serialize::serialize(&hash, serializer)
539	}
540
541	pub fn deserialize<'de, D, H>(deserializer: D) -> Result<(H, usize), D::Error>
542	where
543		D: Deserializer<'de>,
544		H: Deserialize<'de>,
545	{
546		let hash: H = serde::Deserialize::deserialize(deserializer)?;
547		Ok((hash, 0))
548	}
549}
550
551/// Transaction pool that rejects all submitted transactions.
552///
553/// Could be used for example in tests.
554pub struct RejectAllTxPool<Block>(PhantomData<Block>);
555
556impl<Block> Default for RejectAllTxPool<Block> {
557	fn default() -> Self {
558		Self(PhantomData)
559	}
560}
561
562impl<Block: BlockT> LocalTransactionPool for RejectAllTxPool<Block> {
563	type Block = Block;
564
565	type Hash = Block::Hash;
566
567	type Error = error::Error;
568
569	fn submit_local(&self, _: Block::Hash, _: Block::Extrinsic) -> Result<Self::Hash, Self::Error> {
570		Err(error::Error::ImmediatelyDropped)
571	}
572}
573
574#[cfg(test)]
575mod tests {
576	use super::*;
577
578	#[test]
579	fn tx_status_compatibility() {
580		let event: TransactionStatus<u8, u8> = TransactionStatus::InBlock((1, 2));
581		let ser = serde_json::to_string(&event).unwrap();
582
583		let exp = r#"{"inBlock":1}"#;
584		assert_eq!(ser, exp);
585
586		let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
587		assert_eq!(event_dec, TransactionStatus::InBlock((1, 0)));
588
589		let event: TransactionStatus<u8, u8> = TransactionStatus::Finalized((1, 2));
590		let ser = serde_json::to_string(&event).unwrap();
591
592		let exp = r#"{"finalized":1}"#;
593		assert_eq!(ser, exp);
594
595		let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
596		assert_eq!(event_dec, TransactionStatus::Finalized((1, 0)));
597	}
598}