1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
mod storage_wrapper;
mod vm_wrapper;
use crate::{
adapter_common::{preprocess_transaction, PreprocessedTransaction},
aptos_vm::AptosVM,
parallel_executor::vm_wrapper::AptosVMWrapper,
};
use aptos_parallel_executor::{
errors::Error,
executor::ParallelTransactionExecutor,
task::{Transaction as PTransaction, TransactionOutput as PTransactionOutput},
};
use aptos_state_view::StateView;
use aptos_types::{
state_store::state_key::StateKey,
transaction::{Transaction, TransactionOutput, TransactionStatus},
write_set::{WriteOp, WriteSet},
};
use move_deps::move_core_types::vm_status::{StatusCode, VMStatus};
use rayon::prelude::*;
/// Makes `PreprocessedTransaction` usable as the transaction type of the
/// parallel executor by implementing its `PTransaction` trait.
///
/// The associated types bind the executor's generic key/value to the Aptos
/// types: `StateKey` for `Key` and `WriteOp` for `Value`.
// NOTE(review): presumably `Key` identifies a storage location and `Value` the
// data written to it -- confirm against the `PTransaction` trait definition.
impl PTransaction for PreprocessedTransaction {
type Key = StateKey;
type Value = WriteOp;
}
/// Crate-internal newtype over [`TransactionOutput`].
///
/// Both `TransactionOutput` and the parallel executor's output trait are
/// imported from other crates, so a local wrapper type is what this module
/// implements that trait on.
pub(crate) struct AptosTransactionOutput(TransactionOutput);

impl AptosTransactionOutput {
    /// Wraps `output` as-is; nothing is inspected or modified.
    pub fn new(output: TransactionOutput) -> Self {
        AptosTransactionOutput(output)
    }

    /// Consumes the wrapper and hands back the inner [`TransactionOutput`].
    pub fn into(self) -> TransactionOutput {
        let AptosTransactionOutput(inner) = self;
        inner
    }
}
impl PTransactionOutput for AptosTransactionOutput {
type T = PreprocessedTransaction;
fn get_writes(&self) -> Vec<(StateKey, WriteOp)> {
self.0.write_set().iter().cloned().collect()
}
fn skip_output() -> Self {
Self(TransactionOutput::new(
WriteSet::default(),
vec![],
0,
TransactionStatus::Retry,
))
}
}
pub struct ParallelAptosVM();
impl ParallelAptosVM {
pub fn execute_block<S: StateView>(
transactions: Vec<Transaction>,
state_view: &S,
concurrency_level: usize,
) -> Result<(Vec<TransactionOutput>, Option<Error<VMStatus>>), VMStatus> {
let signature_verified_block: Vec<PreprocessedTransaction> = transactions
.par_iter()
.map(|txn| preprocess_transaction::<AptosVM>(txn.clone()))
.collect();
match ParallelTransactionExecutor::<PreprocessedTransaction, AptosVMWrapper<S>>::new(
concurrency_level,
)
.execute_transactions_parallel(state_view, signature_verified_block)
{
Ok(results) => Ok((
results
.into_iter()
.map(AptosTransactionOutput::into)
.collect(),
None,
)),
Err(err @ Error::InferencerError) | Err(err @ Error::UnestimatedWrite) => {
let output = AptosVM::execute_block_and_keep_vm_status(transactions, state_view)?;
Ok((
output
.into_iter()
.map(|(_vm_status, txn_output)| txn_output)
.collect(),
Some(err),
))
}
Err(Error::InvariantViolation) => Err(VMStatus::Error(
StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR,
)),
Err(Error::UserError(err)) => Err(err),
}
}
}