#[cfg(with_metrics)]
use std::sync::LazyLock;
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};
use async_graphql::SimpleObject;
use futures::stream::{self, StreamExt, TryStreamExt};
use linera_base::{
crypto::CryptoHash,
data_types::{
Amount, ArithmeticError, BlockHeight, OracleResponse, Timestamp, UserApplicationDescription,
},
ensure,
identifiers::{
ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner, StreamId,
UserApplicationId,
},
};
use linera_execution::{
system::OpenChainConfig, ExecutionError, ExecutionOutcome, ExecutionRuntimeContext,
ExecutionStateView, Message, MessageContext, Operation, OperationContext, Query, QueryContext,
RawExecutionOutcome, RawOutgoingMessage, ResourceController, ResourceTracker, Response,
ServiceRuntimeEndpoint, TransactionTracker,
};
use linera_views::{
context::Context,
log_view::LogView,
queue_view::QueueView,
reentrant_collection_view::ReentrantCollectionView,
register_view::RegisterView,
set_view::SetView,
views::{ClonableView, CryptoHashView, RootView, View},
};
use serde::{Deserialize, Serialize};
use crate::{
data_types::{
Block, BlockExecutionOutcome, ChainAndHeight, ChannelFullName, EventRecord, IncomingBundle,
MessageAction, MessageBundle, Origin, OutgoingMessage, PostedMessage, Target, Transaction,
},
inbox::{Cursor, InboxError, InboxStateView},
manager::ChainManager,
outbox::OutboxStateView,
ChainError, ChainExecutionContext,
};
#[cfg(test)]
#[path = "unit_tests/chain_tests.rs"]
mod chain_tests;
#[cfg(with_metrics)]
use {
linera_base::prometheus_util::{self, MeasureLatency},
prometheus::{HistogramVec, IntCounterVec},
};
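// Prometheus metrics recorded while executing blocks, messages and operations.
// They are only compiled in when the `with_metrics` cfg flag is enabled.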
#[cfg(with_metrics)]
static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
prometheus_util::register_int_counter_vec(
"num_blocks_executed",
"Number of blocks executed",
&[],
)
.expect("Counter creation should not fail")
});
#[cfg(with_metrics)]
static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
prometheus_util::register_histogram_vec(
"block_execution_latency",
"Block execution latency",
&[],
Some(vec![
0.000_1, 0.000_25, 0.000_5, 0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
1.0, 2.5, 5.0, 10.0, 25.0, 50.0,
]),
)
.expect("Histogram creation should not fail")
});
#[cfg(with_metrics)]
static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
prometheus_util::register_histogram_vec(
"message_execution_latency",
"Message execution latency",
&[],
Some(vec![
0.000_1, 0.000_25, 0.000_5, 0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
1.0, 2.5,
]),
)
.expect("Histogram creation should not fail")
});
#[cfg(with_metrics)]
static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
prometheus_util::register_histogram_vec(
"operation_execution_latency",
"Operation execution latency",
&[],
Some(vec![
0.000_1, 0.000_25, 0.000_5, 0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
1.0, 2.5,
]),
)
.expect("Histogram creation should not fail")
});
#[cfg(with_metrics)]
static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
prometheus_util::register_histogram_vec(
"wasm_fuel_used_per_block",
"Wasm fuel used per block",
&[],
Some(vec![
50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10_000.0, 25_000.0, 50_000.0,
100_000.0, 250_000.0, 500_000.0,
]),
)
.expect("Histogram creation should not fail")
});
#[cfg(with_metrics)]
static WASM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
prometheus_util::register_histogram_vec(
"wasm_num_reads_per_block",
"Wasm number of reads per block",
&[],
Some(vec![0.5, 1.0, 2.0, 4.0, 8.0, 15.0, 30.0, 50.0, 100.0]),
)
.expect("Histogram creation should not fail")
});
#[cfg(with_metrics)]
static WASM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
prometheus_util::register_histogram_vec(
"wasm_bytes_read_per_block",
"Wasm number of bytes read per block",
&[],
Some(vec![
0.5,
1.0,
10.0,
100.0,
256.0,
512.0,
1024.0,
2048.0,
4096.0,
8192.0,
16384.0,
65_536.0,
524_288.0,
1_048_576.0,
8_388_608.0,
]),
)
.expect("Histogram creation should not fail")
});
#[cfg(with_metrics)]
static WASM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
prometheus_util::register_histogram_vec(
"wasm_bytes_written_per_block",
"Wasm number of bytes written per block",
&[],
Some(vec![
0.5,
1.0,
10.0,
100.0,
256.0,
512.0,
1024.0,
2048.0,
4096.0,
8192.0,
16384.0,
65_536.0,
524_288.0,
1_048_576.0,
8_388_608.0,
]),
)
.expect("Histogram creation should not fail")
});
#[cfg(with_metrics)]
static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
prometheus_util::register_histogram_vec(
"state_hash_computation_latency",
"Time to recompute the state hash",
&[],
Some(vec![
0.001, 0.003, 0.01, 0.03, 0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1.0, 2.0, 5.0,
]),
)
.expect("Histogram can be created")
});
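/// The BCS-serialized size of an empty `ExecutedBlock`, as verified by the
/// `empty_executed_block_size` test at the bottom of this module.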
const EMPTY_EXECUTED_BLOCK_SIZE: usize = 91;
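/// A bundle in a chain's inbox, together with the local time when it was added.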
#[derive(Debug, Clone, Serialize, Deserialize, async_graphql::SimpleObject)]
pub struct TimestampedBundleInInbox {
    /// The origin and cursor identifying the bundle in the inbox.
    pub entry: BundleInInbox,
    /// The local time when the bundle was added to the inbox.
    pub seen: Timestamp,
}
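/// Identifies a message bundle in a chain's inbox by its origin and cursor.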
#[derive(
Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, async_graphql::SimpleObject,
)]
pub struct BundleInInbox {
    /// The origin (sender chain and medium) the bundle was received from.
    pub origin: Origin,
    /// The cursor identifying the bundle's position in that inbox.
    pub cursor: Cursor,
}
impl BundleInInbox {
fn new(origin: Origin, bundle: &MessageBundle) -> Self {
BundleInInbox {
cursor: Cursor::from(bundle),
origin,
}
}
}
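/// A view accessing the state of a chain.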
#[derive(Debug, RootView, ClonableView, SimpleObject)]
#[graphql(cache_control(no_cache))]
pub struct ChainStateView<C>
where
C: Clone + Context + Send + Sync + 'static,
{
    /// Execution state, including system and user applications.
    pub execution_state: ExecutionStateView<C>,
    /// Hash of the execution state.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,
    /// Block-chaining state.
    pub tip_state: RegisterView<C, ChainTipState>,
    /// Consensus state.
    pub manager: RegisterView<C, ChainManager>,
    /// Hashes of all certified blocks for this sender.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// Sender chain and height of all certified blocks known as a receiver.
    pub received_log: LogView<C, ChainAndHeight>,
    /// Mailboxes used to receive messages, indexed by their origin.
    pub inboxes: ReentrantCollectionView<C, Origin, InboxStateView<C>>,
    /// A queue of unskippable bundles, with the local time when each was added to its inbox.
    pub unskippable_bundles: QueueView<C, TimestampedBundleInInbox>,
    /// Unskippable bundles that have been removed but are still in the queue above.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// Mailboxes used to send messages, indexed by their target.
    pub outboxes: ReentrantCollectionView<C, Target, OutboxStateView<C>>,
    /// Number of outgoing messages in flight for each block height.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// Channels able to multicast messages to their subscribers.
    pub channels: ReentrantCollectionView<C, ChannelFullName, ChannelStateView<C>>,
}
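/// Block-chaining state: the hash and height of the chain's tip, plus running counters of
/// incoming bundles, operations and outgoing messages.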
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, SimpleObject)]
pub struct ChainTipState {
    /// Hash of the latest certified block in this chain, if any.
    pub block_hash: Option<CryptoHash>,
    /// Sequence number tracking blocks.
    pub next_block_height: BlockHeight,
    /// Number of incoming message bundles received so far.
    pub num_incoming_bundles: u32,
    /// Number of operations executed so far.
    pub num_operations: u32,
    /// Number of outgoing messages sent so far.
    pub num_outgoing_messages: u32,
}
impl ChainTipState {
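    /// Checks that the proposed block has the expected height and parent hash.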
pub fn verify_block_chaining(&self, new_block: &Block) -> Result<(), ChainError> {
ensure!(
new_block.height == self.next_block_height,
ChainError::UnexpectedBlockHeight {
expected_block_height: self.next_block_height,
found_block_height: new_block.height
}
);
ensure!(
new_block.previous_block_hash == self.block_hash,
ChainError::UnexpectedPreviousBlockHash
);
Ok(())
}
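    /// Returns `true` if a block at the given height has already been validated; returns an
    /// error if the height is ahead of the chain's tip.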
pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
ensure!(
self.next_block_height >= height,
ChainError::MissingEarlierBlocks {
current_block_height: self.next_block_height,
}
);
Ok(self.next_block_height > height)
}
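    /// Returns `true` if the next block to be added will be the first block of the chain.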
pub fn is_first_block(&self) -> bool {
self.next_block_height == BlockHeight::ZERO
}
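    /// Checks that adding the new block's bundles, operations and messages to the running
    /// counters does not overflow.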
pub fn verify_counters(
&self,
new_block: &Block,
outcome: &BlockExecutionOutcome,
) -> Result<(), ChainError> {
let num_incoming_bundles = u32::try_from(new_block.incoming_bundles.len())
.map_err(|_| ArithmeticError::Overflow)?;
self.num_incoming_bundles
.checked_add(num_incoming_bundles)
.ok_or(ArithmeticError::Overflow)?;
let num_operations =
u32::try_from(new_block.operations.len()).map_err(|_| ArithmeticError::Overflow)?;
self.num_operations
.checked_add(num_operations)
.ok_or(ArithmeticError::Overflow)?;
let num_outgoing_messages =
u32::try_from(outcome.messages.len()).map_err(|_| ArithmeticError::Overflow)?;
self.num_outgoing_messages
.checked_add(num_outgoing_messages)
.ok_or(ArithmeticError::Overflow)?;
Ok(())
}
}
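/// The state of a pub/sub channel: its current subscribers and the heights of the blocks
/// published to it so far.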
#[derive(Debug, ClonableView, View, SimpleObject)]
pub struct ChannelStateView<C>
where
C: Context + Send + Sync,
{
    /// The chains currently subscribed to the channel.
    pub subscribers: SetView<C, ChainId>,
    /// The block heights published to the channel so far, replayed to new subscribers.
    pub block_heights: LogView<C, BlockHeight>,
}
impl<C> ChainStateView<C>
where
C: Context + Clone + Send + Sync + 'static,
C::Extra: ExecutionRuntimeContext,
{
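    /// Returns the [`ChainId`] of the chain this view belongs to.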
pub fn chain_id(&self) -> ChainId {
self.context().extra().chain_id()
}
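    /// Executes a query against an application's service on this chain, using the current
    /// execution state.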
pub async fn query_application(
&mut self,
local_time: Timestamp,
query: Query,
service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
) -> Result<Response, ChainError> {
let context = QueryContext {
chain_id: self.chain_id(),
next_block_height: self.tip_state.get().next_block_height,
local_time,
};
let response = self
.execution_state
.query_application(context, query, service_runtime_endpoint)
.await
.map_err(|error| ChainError::ExecutionError(error, ChainExecutionContext::Query))?;
Ok(response)
}
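    /// Returns the description of the given user application, as recorded in the system
    /// application registry.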
pub async fn describe_application(
&mut self,
application_id: UserApplicationId,
) -> Result<UserApplicationDescription, ChainError> {
self.execution_state
.system
.registry
.describe_application(application_id)
.await
.map_err(|err| {
ChainError::ExecutionError(err.into(), ChainExecutionContext::DescribeApplication)
})
}
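    /// Removes the messages sent to `target` up to `height` (included) from the corresponding
    /// outbox, updating the outbox counters. Returns `true` if any messages were removed.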
pub async fn mark_messages_as_received(
&mut self,
target: &Target,
height: BlockHeight,
) -> Result<bool, ChainError> {
let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
let updates = outbox.mark_messages_as_received(height).await?;
if updates.is_empty() {
return Ok(false);
}
for update in updates {
let counter = self
.outbox_counters
.get_mut()
.get_mut(&update)
.expect("message counter should be present");
*counter = counter
.checked_sub(1)
.expect("message counter should not underflow");
if *counter == 0 {
self.outbox_counters.get_mut().remove(&update);
}
}
if outbox.queue.count() == 0 {
self.outboxes.remove_entry(target)?;
}
Ok(true)
}
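    /// Returns `true` if there are no outgoing messages in flight at or below the given block
    /// height.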
pub fn all_messages_delivered_up_to(&mut self, height: BlockHeight) -> bool {
tracing::debug!(
"Messages left in {:.8}'s outbox: {:?}",
self.chain_id(),
self.outbox_counters.get()
);
if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
key > &height
} else {
true
}
}
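    /// Returns `true` if this chain is active, i.e. its system state has been initialized.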
pub fn is_active(&self) -> bool {
self.execution_state.system.is_active()
}
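    /// Returns `true` if the chain has been closed; a closed chain only accepts blocks that
    /// reject incoming messages.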
pub fn is_closed(&self) -> bool {
*self.execution_state.system.closed.get()
}
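    /// Returns an error unless the chain is active.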
pub fn ensure_is_active(&self) -> Result<(), ChainError> {
if self.is_active() {
Ok(())
} else {
Err(ChainError::InactiveChain(self.chain_id()))
}
}
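    /// Verifies that all message bundles that were executed ahead of time have since been
    /// received, i.e. no inbox still expects a missing cross-chain update.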
pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
let chain_id = self.chain_id();
let pairs = self.inboxes.try_load_all_entries().await?;
let max_stream_queries = self.context().max_stream_queries();
let stream = stream::iter(pairs)
.map(|(origin, inbox)| async move {
if let Some(bundle) = inbox.removed_bundles.front().await? {
return Err(ChainError::MissingCrossChainUpdate {
chain_id,
origin: origin.into(),
height: bundle.height,
});
}
Ok::<(), ChainError>(())
})
.buffer_unordered(max_stream_queries);
stream.try_collect::<Vec<_>>().await?;
Ok(())
}
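    /// Returns the next block height at which we expect to receive a message bundle from
    /// `origin`.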
pub async fn next_block_height_to_receive(
&self,
origin: &Origin,
) -> Result<BlockHeight, ChainError> {
let inbox = self.inboxes.try_load_entry(origin).await?;
match inbox {
Some(inbox) => inbox.next_block_height_to_receive(),
None => Ok(BlockHeight::from(0)),
}
}
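    /// Returns the height of the latest bundle from `origin` that was executed ahead of being
    /// received (anticipated), if any.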
pub async fn last_anticipated_block_height(
&self,
origin: &Origin,
) -> Result<Option<BlockHeight>, ChainError> {
let inbox = self.inboxes.try_load_entry(origin).await?;
match inbox {
Some(inbox) => match inbox.removed_bundles.back().await? {
Some(bundle) => Ok(Some(bundle.height)),
None => Ok(None),
},
None => Ok(None),
}
}
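    /// Processes a new message bundle from `origin`: executes any chain-creation and channel
    /// (un)subscription system messages immediately, then adds the bundle to the corresponding
    /// inbox. Returns `true` if handling subscriptions created new outbox entries.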
pub async fn receive_message_bundle(
&mut self,
origin: &Origin,
bundle: MessageBundle,
local_time: Timestamp,
add_to_received_log: bool,
) -> Result<bool, ChainError> {
assert!(!bundle.messages.is_empty());
let chain_id = self.chain_id();
tracing::trace!(
"Processing new messages to {chain_id:.8} from {origin} at height {}",
bundle.height,
);
let chain_and_height = ChainAndHeight {
chain_id: origin.sender,
height: bundle.height,
};
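        // Handle system messages that take effect immediately: `OpenChain` (if the chain is
        // not initialized yet) and channel subscriptions/unsubscriptions.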
let mut subscribe_names_and_ids = Vec::new();
let mut unsubscribe_names_and_ids = Vec::new();
for posted_message in &bundle.messages {
if let Some(config) = posted_message.message.matches_open_chain() {
if self.execution_state.system.description.get().is_none() {
let message_id = chain_and_height.to_message_id(posted_message.index);
self.execute_init_message(message_id, config, bundle.timestamp, local_time)
.await?;
}
} else if let Some((id, subscription)) = posted_message.message.matches_subscribe() {
subscribe_names_and_ids.push((subscription.name.clone(), *id));
}
if let Some((id, subscription)) = posted_message.message.matches_unsubscribe() {
unsubscribe_names_and_ids.push((subscription.name.clone(), *id));
}
}
self.process_unsubscribes(unsubscribe_names_and_ids, GenericApplicationId::System)
.await?;
let new_outbox_entries = self
.process_subscribes(subscribe_names_and_ids, GenericApplicationId::System)
.await?;
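        // If the bundle contains messages that must be executed in a block, add it to the
        // inbox.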
if bundle.goes_to_inbox() {
let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
let entry = BundleInInbox::new(origin.clone(), &bundle);
let skippable = bundle.is_skippable();
let newly_added = inbox
.add_bundle(bundle)
.await
.map_err(|error| match error {
InboxError::ViewError(error) => ChainError::ViewError(error),
error => ChainError::InternalError(format!(
"while processing messages in certified block: {error}"
)),
})?;
if newly_added && !skippable {
let seen = local_time;
self.unskippable_bundles
.push_back(TimestampedBundleInInbox { entry, seen });
}
}
if add_to_received_log {
self.received_log.push(chain_and_height);
}
Ok(new_outbox_entries)
}
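    /// Initializes the chain from the `OpenChain` configuration carried by the given message,
    /// recomputes the execution state hash and resets the chain manager for the first block.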
pub async fn execute_init_message(
&mut self,
message_id: MessageId,
config: &OpenChainConfig,
timestamp: Timestamp,
local_time: Timestamp,
) -> Result<bool, ChainError> {
self.execution_state
.system
.initialize_chain(message_id, timestamp, config.clone());
let hash = self.execution_state.crypto_hash().await?;
self.execution_state_hash.set(Some(hash));
let maybe_committee = self.execution_state.system.current_committee().into_iter();
self.manager.get_mut().reset(
self.execution_state.system.ownership.get(),
BlockHeight(0),
local_time,
maybe_committee.flat_map(|(_, committee)| committee.keys_and_weights()),
)?;
Ok(true)
}
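    /// Removes the message bundles that the given block receives from the corresponding
    /// inboxes, and updates the queue of unskippable bundles accordingly.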
pub async fn remove_bundles_from_inboxes(&mut self, block: &Block) -> Result<(), ChainError> {
let chain_id = self.chain_id();
let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
for IncomingBundle { bundle, origin, .. } in &block.incoming_bundles {
ensure!(
bundle.timestamp <= block.timestamp,
ChainError::IncorrectBundleTimestamp {
chain_id,
bundle_timestamp: bundle.timestamp,
block_timestamp: block.timestamp,
}
);
let bundles = bundles_by_origin.entry(origin).or_default();
bundles.push(bundle);
}
let origins = bundles_by_origin.keys().copied();
let inboxes = self.inboxes.try_load_entries_mut(origins).await?;
let mut removed_unskippable = HashSet::new();
for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
tracing::trace!(
"Removing {:?} from {chain_id:.8}'s inbox for {origin:}",
bundles
.iter()
.map(|bundle| bundle.height)
.collect::<Vec<_>>()
);
for bundle in bundles {
let was_present = inbox
.remove_bundle(bundle)
.await
.map_err(|error| ChainError::from((chain_id, origin.clone(), error)))?;
if was_present && !bundle.is_skippable() {
removed_unskippable.insert(BundleInInbox::new(origin.clone(), bundle));
}
}
}
if !removed_unskippable.is_empty() {
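            // Delete the removed bundles that sit at the front of the unskippable queue.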
let maybe_front = self.unskippable_bundles.front().await?;
if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
self.unskippable_bundles.delete_front();
while let Some(ts_entry) = self.unskippable_bundles.front().await? {
if !removed_unskippable.remove(&ts_entry.entry) {
if !self
.removed_unskippable_bundles
.contains(&ts_entry.entry)
.await?
{
break;
}
self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
}
self.unskippable_bundles.delete_front();
}
}
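            // Remember the bundles that were removed out of order, so they can be skipped once
            // they reach the front of the queue.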
for entry in removed_unskippable {
self.removed_unskippable_bundles.insert(&entry)?;
}
}
Ok(())
}
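    /// Executes a block: incoming message bundles and operations, in the order given by the
    /// block. Checks application permissions and resource limits, records outgoing messages,
    /// events and oracle responses per transaction, updates the execution state hash, and
    /// resets the chain manager for the next height. Does not update `confirmed_log` or
    /// `received_log`. In case of an error, `self` may be left in an inconsistent state and
    /// should be discarded.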
pub async fn execute_block(
&mut self,
block: &Block,
local_time: Timestamp,
replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
) -> Result<BlockExecutionOutcome, ChainError> {
#[cfg(with_metrics)]
let _execution_latency = BLOCK_EXECUTION_LATENCY.measure_latency();
let chain_id = self.chain_id();
assert_eq!(block.chain_id, chain_id);
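        // The first block of a child chain must start with the `OpenChain` message that
        // created it; if the chain is not active yet, that message also initializes it here.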
if block.height == BlockHeight::ZERO
&& self
.execution_state
.system
.description
.get()
.map_or(true, |description| description.is_child())
{
let (in_bundle, posted_message, config) = block
.starts_with_open_chain_message()
.ok_or_else(|| ChainError::InactiveChain(chain_id))?;
if !self.is_active() {
let message_id = MessageId {
chain_id: in_bundle.origin.sender,
height: in_bundle.bundle.height,
index: posted_message.index,
};
self.execute_init_message(message_id, config, block.timestamp, local_time)
.await?;
}
}
ensure!(
*self.execution_state.system.timestamp.get() <= block.timestamp,
ChainError::InvalidBlockTimestamp
);
self.execution_state.system.timestamp.set(block.timestamp);
let Some((_, committee)) = self.execution_state.system.current_committee() else {
return Err(ChainError::InactiveChain(chain_id));
};
let mut resource_controller = ResourceController {
policy: Arc::new(committee.policy().clone()),
tracker: ResourceTracker::default(),
account: block.authenticated_signer,
};
resource_controller
.track_executed_block_size(EMPTY_EXECUTED_BLOCK_SIZE)
.and_then(|()| {
resource_controller
.track_executed_block_size_sequence_extension(0, block.incoming_bundles.len())
})
.and_then(|()| {
resource_controller
.track_executed_block_size_sequence_extension(0, block.operations.len())
})
.map_err(|err| ChainError::ExecutionError(err, ChainExecutionContext::Block))?;
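        // A closed chain accepts no new blocks, except blocks that only reject incoming
        // messages.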
if self.is_closed() {
ensure!(
!block.incoming_bundles.is_empty() && block.has_only_rejected_messages(),
ChainError::ClosedChain
);
}
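        // Check that all operations are authorized by the application permissions, and that
        // every mandatory application is used by at least one operation or incoming message.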
let app_permissions = self.execution_state.system.application_permissions.get();
let mut mandatory = HashSet::<UserApplicationId>::from_iter(
app_permissions.mandatory_applications.iter().cloned(),
);
for operation in &block.operations {
ensure!(
app_permissions.can_execute_operations(&operation.application_id()),
ChainError::AuthorizedApplications(
app_permissions.execute_operations.clone().unwrap()
)
);
if let Operation::User { application_id, .. } = operation {
mandatory.remove(application_id);
}
}
for pending in block.incoming_messages() {
if mandatory.is_empty() {
break;
}
if let Message::User { application_id, .. } = &pending.message {
mandatory.remove(application_id);
}
}
ensure!(
mandatory.is_empty(),
ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
);
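        // Execute each transaction (incoming bundle or operation), tracking the outgoing
        // messages, events and oracle responses it produces.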
let mut replaying_oracle_responses = replaying_oracle_responses.map(Vec::into_iter);
let mut next_message_index = 0;
let mut oracle_responses = Vec::new();
let mut events = Vec::new();
let mut messages = Vec::new();
for (txn_index, transaction) in block.transactions() {
let chain_execution_context = match transaction {
Transaction::ReceiveMessages(_) => ChainExecutionContext::IncomingBundle(txn_index),
Transaction::ExecuteOperation(_) => ChainExecutionContext::Operation(txn_index),
};
let with_context =
|error: ExecutionError| ChainError::ExecutionError(error, chain_execution_context);
let maybe_responses = match replaying_oracle_responses.as_mut().map(Iterator::next) {
Some(Some(responses)) => Some(responses),
Some(None) => return Err(ChainError::MissingOracleResponseList),
None => None,
};
let mut txn_tracker = TransactionTracker::new(next_message_index, maybe_responses);
match transaction {
Transaction::ReceiveMessages(incoming_bundle) => {
resource_controller
.track_executed_block_size_of(&incoming_bundle)
.map_err(with_context)?;
for (message_id, posted_message) in incoming_bundle.messages_and_ids() {
self.execute_message_in_block(
message_id,
posted_message,
incoming_bundle,
block,
txn_index,
local_time,
&mut txn_tracker,
&mut resource_controller,
)
.await?;
}
}
Transaction::ExecuteOperation(operation) => {
resource_controller
.track_executed_block_size_of(&operation)
.map_err(with_context)?;
#[cfg(with_metrics)]
let _operation_latency = OPERATION_EXECUTION_LATENCY.measure_latency();
let context = OperationContext {
chain_id,
height: block.height,
index: Some(txn_index),
authenticated_signer: block.authenticated_signer,
authenticated_caller_id: None,
};
self.execution_state
.execute_operation(
context,
local_time,
operation.clone(),
&mut txn_tracker,
&mut resource_controller,
)
.await
.map_err(with_context)?;
resource_controller
.with_state(&mut self.execution_state)
.await?
.track_operation(operation)
.map_err(with_context)?;
}
}
self.execution_state
.update_execution_outcomes_with_app_registrations(&mut txn_tracker)
.await
.map_err(with_context)?;
let (txn_outcomes, txn_oracle_responses, new_next_message_index) =
txn_tracker.destructure().map_err(with_context)?;
next_message_index = new_next_message_index;
let (txn_messages, txn_events) = self
.process_execution_outcomes(block.height, txn_outcomes)
.await?;
if matches!(
transaction,
Transaction::ExecuteOperation(_)
| Transaction::ReceiveMessages(IncomingBundle {
action: MessageAction::Accept,
..
})
) {
for message_out in &txn_messages {
resource_controller
.with_state(&mut self.execution_state)
.await?
.track_message(&message_out.message)
.map_err(with_context)?;
}
}
resource_controller
.track_executed_block_size_of(&(&txn_oracle_responses, &txn_messages, &txn_events))
.map_err(with_context)?;
resource_controller
.track_executed_block_size_sequence_extension(oracle_responses.len(), 1)
.map_err(with_context)?;
resource_controller
.track_executed_block_size_sequence_extension(messages.len(), 1)
.map_err(with_context)?;
resource_controller
.track_executed_block_size_sequence_extension(events.len(), 1)
.map_err(with_context)?;
oracle_responses.push(txn_oracle_responses);
messages.push(txn_messages);
events.push(txn_events);
}
if !self.is_closed() {
resource_controller
.with_state(&mut self.execution_state)
.await?
.track_block()
.map_err(|err| ChainError::ExecutionError(err, ChainExecutionContext::Block))?;
}
let state_hash = {
#[cfg(with_metrics)]
let _hash_latency = STATE_HASH_COMPUTATION_LATENCY.measure_latency();
self.execution_state.crypto_hash().await?
};
self.execution_state_hash.set(Some(state_hash));
let maybe_committee = self.execution_state.system.current_committee().into_iter();
self.manager.get_mut().reset(
self.execution_state.system.ownership.get(),
block.height.try_add_one()?,
local_time,
maybe_committee.flat_map(|(_, committee)| committee.keys_and_weights()),
)?;
#[cfg(with_metrics)]
{
NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
WASM_FUEL_USED_PER_BLOCK
.with_label_values(&[])
.observe(resource_controller.tracker.fuel as f64);
WASM_NUM_READS_PER_BLOCK
.with_label_values(&[])
.observe(resource_controller.tracker.read_operations as f64);
WASM_BYTES_READ_PER_BLOCK
.with_label_values(&[])
.observe(resource_controller.tracker.bytes_read as f64);
WASM_BYTES_WRITTEN_PER_BLOCK
.with_label_values(&[])
.observe(resource_controller.tracker.bytes_written as f64);
}
assert_eq!(
messages.len(),
block.incoming_bundles.len() + block.operations.len()
);
let outcome = BlockExecutionOutcome {
messages,
state_hash,
oracle_responses,
events,
};
Ok(outcome)
}
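    /// Executes a single message from an incoming bundle as part of a block: either executes
    /// it and refunds any unused grant (`Accept`), or bounces or refunds it (`Reject`).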
#[expect(clippy::too_many_arguments)]
async fn execute_message_in_block(
&mut self,
message_id: MessageId,
posted_message: &PostedMessage,
incoming_bundle: &IncomingBundle,
block: &Block,
txn_index: u32,
local_time: Timestamp,
txn_tracker: &mut TransactionTracker,
resource_controller: &mut ResourceController<Option<Owner>>,
) -> Result<(), ChainError> {
#[cfg(with_metrics)]
let _message_latency = MESSAGE_EXECUTION_LATENCY.measure_latency();
let context = MessageContext {
chain_id: block.chain_id,
is_bouncing: posted_message.is_bouncing(),
height: block.height,
certificate_hash: incoming_bundle.bundle.certificate_hash,
message_id,
authenticated_signer: posted_message.authenticated_signer,
refund_grant_to: posted_message.refund_grant_to,
};
let mut grant = posted_message.grant;
match incoming_bundle.action {
MessageAction::Accept => {
let with_context = |error: ExecutionError| {
let context = ChainExecutionContext::IncomingBundle(txn_index);
ChainError::ExecutionError(error, context)
};
ensure!(!self.is_closed(), ChainError::ClosedChain);
self.execution_state
.execute_message(
context,
local_time,
posted_message.message.clone(),
(grant > Amount::ZERO).then_some(&mut grant),
txn_tracker,
resource_controller,
)
.await
.map_err(with_context)?;
if grant > Amount::ZERO {
if let Some(refund_grant_to) = posted_message.refund_grant_to {
self.execution_state
.send_refund(context, grant, refund_grant_to, txn_tracker)
.await
.map_err(with_context)?;
}
}
}
MessageAction::Reject => {
let with_context = |error: ExecutionError| {
ChainError::ExecutionError(error, ChainExecutionContext::Block)
};
ensure!(
!posted_message.is_protected() || self.is_closed(),
ChainError::CannotRejectMessage {
chain_id: block.chain_id,
origin: Box::new(incoming_bundle.origin.clone()),
posted_message: posted_message.clone(),
}
);
if posted_message.is_tracked() {
self.execution_state
.bounce_message(context, grant, posted_message.message.clone(), txn_tracker)
.await
.map_err(with_context)?;
} else if grant > Amount::ZERO {
let Some(refund_grant_to) = posted_message.refund_grant_to else {
return Err(ChainError::InternalError(
"Messages with grants should have a non-empty `refund_grant_to`".into(),
));
};
self.execution_state
.send_refund(context, posted_message.grant, refund_grant_to, txn_tracker)
.await
.map_err(with_context)?;
}
}
}
Ok(())
}
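    /// Converts the raw outcomes of a transaction into outgoing messages and event records,
    /// scheduling the messages in the relevant outboxes and channels.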
async fn process_execution_outcomes(
&mut self,
height: BlockHeight,
results: Vec<ExecutionOutcome>,
) -> Result<(Vec<OutgoingMessage>, Vec<EventRecord>), ChainError> {
let mut messages = Vec::new();
let mut events = Vec::new();
for result in results {
match result {
ExecutionOutcome::System(result) => {
self.process_raw_execution_outcome(
GenericApplicationId::System,
Message::System,
&mut messages,
&mut events,
height,
result,
)
.await?;
}
ExecutionOutcome::User(application_id, result) => {
self.process_raw_execution_outcome(
GenericApplicationId::User(application_id),
|bytes| Message::User {
application_id,
bytes,
},
&mut messages,
&mut events,
height,
result,
)
.await?;
}
}
}
Ok((messages, events))
}
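    /// Processes a single raw outcome: records its events, schedules its direct messages in
    /// the recipients' outboxes, broadcasts to channel subscribers, and applies channel
    /// (un)subscriptions.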
async fn process_raw_execution_outcome<E, F>(
&mut self,
application_id: GenericApplicationId,
lift: F,
messages: &mut Vec<OutgoingMessage>,
events: &mut Vec<EventRecord>,
height: BlockHeight,
raw_outcome: RawExecutionOutcome<E, Amount>,
) -> Result<(), ChainError>
where
F: Fn(E) -> Message,
{
events.extend(
raw_outcome
.events
.into_iter()
.map(|(stream_name, key, value)| EventRecord {
stream_id: StreamId {
application_id,
stream_name,
},
key,
value,
}),
);
let max_stream_queries = self.context().max_stream_queries();
let mut recipients = HashSet::new();
let mut channel_broadcasts = HashSet::new();
for RawOutgoingMessage {
destination,
authenticated,
grant,
kind,
message,
} in raw_outcome.messages
{
match &destination {
Destination::Recipient(id) => {
recipients.insert(*id);
}
Destination::Subscribers(name) => {
ensure!(grant == Amount::ZERO, ChainError::GrantUseOnBroadcast);
channel_broadcasts.insert(name.clone());
}
}
let authenticated_signer = raw_outcome.authenticated_signer.filter(|_| authenticated);
let refund_grant_to = raw_outcome.refund_grant_to.filter(|_| grant > Amount::ZERO);
messages.push(OutgoingMessage {
destination,
authenticated_signer,
grant,
refund_grant_to,
kind,
message: lift(message),
});
}
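        // Schedule this block in the outbox of each direct recipient.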
let outbox_counters = self.outbox_counters.get_mut();
let targets = recipients
.into_iter()
.map(Target::chain)
.collect::<Vec<_>>();
let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
for mut outbox in outboxes {
if outbox.schedule_message(height)? {
*outbox_counters.entry(height).or_default() += 1;
}
}
self.process_unsubscribes(raw_outcome.unsubscribe, application_id)
.await?;
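        // For each broadcast channel, record this block's height and schedule it in every
        // subscriber's outbox.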
let full_names = channel_broadcasts
.into_iter()
.map(|name| ChannelFullName {
application_id,
name,
})
.collect::<Vec<_>>();
let channels = self.channels.try_load_entries_mut(&full_names).await?;
let stream = full_names.into_iter().zip(channels);
let stream = stream::iter(stream)
.map(|(full_name, mut channel)| async move {
let recipients = channel.subscribers.indices().await?;
channel.block_heights.push(height);
let targets = recipients
.into_iter()
.map(|recipient| Target::channel(recipient, full_name.clone()))
.collect::<Vec<_>>();
Ok::<_, ChainError>(targets)
})
.buffer_unordered(max_stream_queries);
let infos = stream.try_collect::<Vec<_>>().await?;
let targets = infos.into_iter().flatten().collect::<Vec<_>>();
let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
let outbox_counters = self.outbox_counters.get_mut();
for mut outbox in outboxes {
if outbox.schedule_message(height)? {
*outbox_counters.entry(height).or_default() += 1;
}
}
self.process_subscribes(raw_outcome.subscribe, application_id)
.await?;
Ok(())
}
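    /// Adds the given chains as subscribers of the given channels, and schedules the channels'
    /// previously published block heights in the new subscribers' outboxes. Returns `true` if
    /// this created new outbox entries.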
async fn process_subscribes(
&mut self,
names_and_ids: Vec<(ChannelName, ChainId)>,
application_id: GenericApplicationId,
) -> Result<bool, ChainError> {
if names_and_ids.is_empty() {
return Ok(false);
}
let full_names = names_and_ids
.iter()
.map(|(name, _)| ChannelFullName {
application_id,
name: name.clone(),
})
.collect::<Vec<_>>();
let channels = self.channels.try_load_entries_mut(&full_names).await?;
let subscribe_channels = names_and_ids.into_iter().zip(channels);
let max_stream_queries = self.context().max_stream_queries();
let stream = stream::iter(subscribe_channels)
.map(|((name, id), mut channel)| async move {
                if channel.subscribers.contains(&id).await? {
                    // Already a subscriber; nothing to do.
                    return Ok(None);
                }
let full_name = ChannelFullName {
application_id,
name,
};
tracing::trace!("Adding subscriber {id:.8} for {full_name:}");
channel.subscribers.insert(&id)?;
let heights = channel.block_heights.read(..).await?;
                if heights.is_empty() {
                    // The channel has not published anything yet.
                    return Ok(None);
                }
let target = Target::channel(id, full_name.clone());
Ok::<_, ChainError>(Some((target, heights)))
})
.buffer_unordered(max_stream_queries);
let infos = stream.try_collect::<Vec<_>>().await?;
let (targets, heights): (Vec<_>, Vec<_>) = infos.into_iter().flatten().unzip();
let mut new_outbox_entries = false;
let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
let outbox_counters = self.outbox_counters.get_mut();
for (heights, mut outbox) in heights.into_iter().zip(outboxes) {
for height in heights {
if outbox.schedule_message(height)? {
*outbox_counters.entry(height).or_default() += 1;
new_outbox_entries = true;
}
}
}
Ok(new_outbox_entries)
}
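    /// Removes the given chains from the subscriber sets of the given channels.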
async fn process_unsubscribes(
&mut self,
names_and_ids: Vec<(ChannelName, ChainId)>,
application_id: GenericApplicationId,
) -> Result<(), ChainError> {
if names_and_ids.is_empty() {
return Ok(());
}
let full_names = names_and_ids
.iter()
.map(|(name, _)| ChannelFullName {
application_id,
name: name.clone(),
})
.collect::<Vec<_>>();
let channels = self.channels.try_load_entries_mut(&full_names).await?;
for ((_name, id), mut channel) in names_and_ids.into_iter().zip(channels) {
channel.subscribers.remove(&id)?;
}
Ok(())
}
}
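/// Verifies that `EMPTY_EXECUTED_BLOCK_SIZE` matches the BCS-serialized size of an executed
/// block with no incoming bundles, operations, messages or events.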
#[test]
fn empty_executed_block_size() {
let executed_block = crate::data_types::ExecutedBlock {
block: crate::test::make_first_block(ChainId::root(0)),
outcome: crate::data_types::BlockExecutionOutcome::default(),
};
let size = bcs::serialized_size(&executed_block).unwrap();
assert_eq!(size, EMPTY_EXECUTED_BLOCK_SIZE);
}