1#[cfg(with_metrics)]
5use std::sync::LazyLock;
6use std::{
7 collections::{BTreeMap, HashSet},
8 sync::Arc,
9};
10
11use async_graphql::SimpleObject;
12use futures::stream::{self, StreamExt, TryStreamExt};
13use linera_base::{
14 crypto::CryptoHash,
15 data_types::{
16 Amount, ArithmeticError, BlockHeight, OracleResponse, Timestamp, UserApplicationDescription,
17 },
18 ensure,
19 identifiers::{
20 ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner, StreamId,
21 UserApplicationId,
22 },
23};
24use linera_execution::{
25 system::OpenChainConfig, ExecutionOutcome, ExecutionRuntimeContext, ExecutionStateView,
26 Message, MessageContext, Operation, OperationContext, Query, QueryContext, RawExecutionOutcome,
27 RawOutgoingMessage, ResourceController, ResourceTracker, Response, ServiceRuntimeEndpoint,
28 TransactionTracker,
29};
30use linera_views::{
31 context::Context,
32 log_view::LogView,
33 queue_view::QueueView,
34 reentrant_collection_view::ReentrantCollectionView,
35 register_view::RegisterView,
36 set_view::SetView,
37 views::{ClonableView, CryptoHashView, RootView, View},
38};
39use serde::{Deserialize, Serialize};
40
41use crate::{
42 data_types::{
43 Block, BlockExecutionOutcome, ChainAndHeight, ChannelFullName, EventRecord, IncomingBundle,
44 MessageAction, MessageBundle, Origin, OutgoingMessage, PostedMessage, Target, Transaction,
45 },
46 inbox::{Cursor, InboxError, InboxStateView},
47 manager::ChainManager,
48 outbox::OutboxStateView,
49 ChainError, ChainExecutionContext, ExecutionResultExt,
50};
51
52#[cfg(test)]
53#[path = "unit_tests/chain_tests.rs"]
54mod chain_tests;
55
56#[cfg(with_metrics)]
57use {
58 linera_base::prometheus_util::{self, MeasureLatency},
59 prometheus::{HistogramVec, IntCounterVec},
60};
61
/// Label-free counter that `execute_block` increments once per successfully executed block.
#[cfg(with_metrics)]
static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
    let name = "num_blocks_executed";
    let description = "Number of blocks executed";
    prometheus_util::register_int_counter_vec(name, description, &[])
        .expect("Counter creation should not fail")
});
71
/// Label-free histogram of the time spent in `execute_block`.
// NOTE(review): the unit of the buckets is whatever `measure_latency` records — confirm.
#[cfg(with_metrics)]
static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    let name = "block_execution_latency";
    let description = "Block execution latency";
    let buckets = vec![
        0.000_1, 0.000_25, 0.000_5, 0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
        2.5, 5.0, 10.0, 25.0, 50.0,
    ];
    prometheus_util::register_histogram_vec(name, description, &[], Some(buckets))
        .expect("Histogram creation should not fail")
});
85
/// Label-free histogram of the time spent handling one incoming message inside a block.
#[cfg(with_metrics)]
static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    let name = "message_execution_latency";
    let description = "Message execution latency";
    let buckets = vec![
        0.000_1, 0.000_25, 0.000_5, 0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
        2.5,
    ];
    prometheus_util::register_histogram_vec(name, description, &[], Some(buckets))
        .expect("Histogram creation should not fail")
});
99
/// Label-free histogram of the time spent executing one operation inside a block.
#[cfg(with_metrics)]
static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    let name = "operation_execution_latency";
    let description = "Operation execution latency";
    let buckets = vec![
        0.000_1, 0.000_25, 0.000_5, 0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
        2.5,
    ];
    prometheus_util::register_histogram_vec(name, description, &[], Some(buckets))
        .expect("Histogram creation should not fail")
});
113
/// Label-free histogram fed with the resource tracker's `fuel` value after each executed block.
#[cfg(with_metrics)]
static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
    let name = "wasm_fuel_used_per_block";
    let description = "Wasm fuel used per block";
    let buckets = vec![
        50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10_000.0, 25_000.0, 50_000.0,
        100_000.0, 250_000.0, 500_000.0,
    ];
    prometheus_util::register_histogram_vec(name, description, &[], Some(buckets))
        .expect("Histogram creation should not fail")
});
127
/// Label-free histogram fed with the resource tracker's `read_operations` value after each
/// executed block.
#[cfg(with_metrics)]
static WASM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
    let name = "wasm_num_reads_per_block";
    let description = "Wasm number of reads per block";
    let buckets = vec![0.5, 1.0, 2.0, 4.0, 8.0, 15.0, 30.0, 50.0, 100.0];
    prometheus_util::register_histogram_vec(name, description, &[], Some(buckets))
        .expect("Histogram creation should not fail")
});
138
/// Label-free histogram fed with the resource tracker's `bytes_read` value after each executed
/// block.
#[cfg(with_metrics)]
static WASM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
    let name = "wasm_bytes_read_per_block";
    let description = "Wasm number of bytes read per block";
    let buckets = vec![
        0.5,
        1.0,
        10.0,
        100.0,
        256.0,
        512.0,
        1024.0,
        2048.0,
        4096.0,
        8192.0,
        16384.0,
        65_536.0,
        524_288.0,
        1_048_576.0,
        8_388_608.0,
    ];
    prometheus_util::register_histogram_vec(name, description, &[], Some(buckets))
        .expect("Histogram creation should not fail")
});
165
/// Label-free histogram fed with the resource tracker's `bytes_written` value after each
/// executed block.
#[cfg(with_metrics)]
static WASM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
    let name = "wasm_bytes_written_per_block";
    let description = "Wasm number of bytes written per block";
    let buckets = vec![
        0.5,
        1.0,
        10.0,
        100.0,
        256.0,
        512.0,
        1024.0,
        2048.0,
        4096.0,
        8192.0,
        16384.0,
        65_536.0,
        524_288.0,
        1_048_576.0,
        8_388_608.0,
    ];
    prometheus_util::register_histogram_vec(name, description, &[], Some(buckets))
        .expect("Histogram creation should not fail")
});
192
/// Label-free histogram of the time `execute_block` spends recomputing the execution state hash.
#[cfg(with_metrics)]
static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    let name = "state_hash_computation_latency";
    let description = "Time to recompute the state hash";
    let buckets = vec![
        0.001, 0.003, 0.01, 0.03, 0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1.0, 2.0, 5.0,
    ];
    prometheus_util::register_histogram_vec(name, description, &[], Some(buckets))
        .expect("Histogram can be created")
});
205
/// Base size that `execute_block` reports to the resource controller before any bundle or
/// operation has been tracked.
// NOTE(review): presumably the serialized size in bytes of an executed block with no
// transactions — confirm against the serialization format before changing it.
const EMPTY_EXECUTED_BLOCK_SIZE: usize = 91;
208
/// A reference to a bundle in an inbox, together with the time at which it was recorded.
#[derive(Debug, Clone, Serialize, Deserialize, async_graphql::SimpleObject)]
pub struct TimestampedBundleInInbox {
    /// The origin and cursor identifying the bundle.
    pub entry: BundleInInbox,
    /// The local time passed to `receive_message_bundle` when the bundle was first added.
    pub seen: Timestamp,
}
217
/// Identifies a message bundle by the origin it came from and a cursor derived from the bundle.
#[derive(
    Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, async_graphql::SimpleObject,
)]
pub struct BundleInInbox {
    /// The origin the bundle was received from.
    pub origin: Origin,
    /// The cursor built from the bundle via `Cursor::from`.
    pub cursor: Cursor,
}
228
229impl BundleInInbox {
230 fn new(origin: Origin, bundle: &MessageBundle) -> Self {
231 BundleInInbox {
232 cursor: Cursor::from(bundle),
233 origin,
234 }
235 }
236}
237
/// A view of everything stored for one chain: execution state, tip, consensus manager,
/// logs, inboxes, outboxes and channels.
#[derive(Debug, RootView, ClonableView, SimpleObject)]
#[graphql(cache_control(no_cache))]
pub struct ChainStateView<C>
where
    C: Clone + Context + Send + Sync + 'static,
{
    /// The execution state of the chain (system state and applications).
    pub execution_state: ExecutionStateView<C>,
    /// The hash of `execution_state`, set after chain initialization and after each executed
    /// block; `None` until then.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// The hash and height bookkeeping for the tip of the chain.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// The consensus state, reset after chain initialization and after each executed block.
    pub manager: RegisterView<C, ChainManager>,

    /// A log of hashes.
    // NOTE(review): presumably the hashes of confirmed blocks in height order — confirm.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// A log of (sender chain, height) pairs, appended by `receive_message_bundle` when asked.
    pub received_log: LogView<C, ChainAndHeight>,

    /// The inbox for each origin.
    pub inboxes: ReentrantCollectionView<C, Origin, InboxStateView<C>>,
    /// Non-skippable bundles that were newly added to an inbox, with the time they were seen.
    pub unskippable_bundles: QueueView<C, TimestampedBundleInInbox>,
    /// Non-skippable bundles that a block removed from an inbox but that have not yet been
    /// popped from the front of `unskippable_bundles`.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// The outbox for each target.
    pub outboxes: ReentrantCollectionView<C, Target, OutboxStateView<C>>,
    /// For each block height, how many outboxes still have a message scheduled at that height.
    /// Entries are removed once they reach zero.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// The state of each channel, keyed by its full name.
    pub channels: ReentrantCollectionView<C, ChannelFullName, ChannelStateView<C>>,
}
276
/// Bookkeeping for the tip of a chain: the last block hash, the next height, and counters.
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, SimpleObject)]
pub struct ChainTipState {
    /// The hash a new block must name as its `previous_block_hash`; `None` by default.
    pub block_hash: Option<CryptoHash>,
    /// The height a new block must have.
    pub next_block_height: BlockHeight,
    /// Counter of incoming bundles; `verify_counters` checks that a new block cannot overflow it.
    pub num_incoming_bundles: u32,
    /// Counter of operations; `verify_counters` checks that a new block cannot overflow it.
    pub num_operations: u32,
    /// Counter of outgoing messages; `verify_counters` checks that a new block cannot overflow it.
    pub num_outgoing_messages: u32,
}
291
292impl ChainTipState {
293 pub fn verify_block_chaining(&self, new_block: &Block) -> Result<(), ChainError> {
296 ensure!(
297 new_block.height == self.next_block_height,
298 ChainError::UnexpectedBlockHeight {
299 expected_block_height: self.next_block_height,
300 found_block_height: new_block.height
301 }
302 );
303 ensure!(
304 new_block.previous_block_hash == self.block_hash,
305 ChainError::UnexpectedPreviousBlockHash
306 );
307 Ok(())
308 }
309
310 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
313 ensure!(
314 self.next_block_height >= height,
315 ChainError::MissingEarlierBlocks {
316 current_block_height: self.next_block_height,
317 }
318 );
319 Ok(self.next_block_height > height)
320 }
321
322 pub fn is_first_block(&self) -> bool {
324 self.next_block_height == BlockHeight::ZERO
325 }
326
327 pub fn verify_counters(
329 &self,
330 new_block: &Block,
331 outcome: &BlockExecutionOutcome,
332 ) -> Result<(), ChainError> {
333 let num_incoming_bundles = u32::try_from(new_block.incoming_bundles.len())
334 .map_err(|_| ArithmeticError::Overflow)?;
335 self.num_incoming_bundles
336 .checked_add(num_incoming_bundles)
337 .ok_or(ArithmeticError::Overflow)?;
338
339 let num_operations =
340 u32::try_from(new_block.operations.len()).map_err(|_| ArithmeticError::Overflow)?;
341 self.num_operations
342 .checked_add(num_operations)
343 .ok_or(ArithmeticError::Overflow)?;
344
345 let num_outgoing_messages =
346 u32::try_from(outcome.messages.len()).map_err(|_| ArithmeticError::Overflow)?;
347 self.num_outgoing_messages
348 .checked_add(num_outgoing_messages)
349 .ok_or(ArithmeticError::Overflow)?;
350
351 Ok(())
352 }
353}
354
/// The state of a channel: who is subscribed and at which heights something was broadcast.
#[derive(Debug, ClonableView, View, SimpleObject)]
pub struct ChannelStateView<C>
where
    C: Context + Send + Sync,
{
    /// The chains subscribed to this channel.
    pub subscribers: SetView<C, ChainId>,
    /// The block heights at which an execution outcome broadcast to this channel.
    pub block_heights: LogView<C, BlockHeight>,
}
366
367impl<C> ChainStateView<C>
368where
369 C: Context + Clone + Send + Sync + 'static,
370 C::Extra: ExecutionRuntimeContext,
371{
372 pub fn chain_id(&self) -> ChainId {
374 self.context().extra().chain_id()
375 }
376
377 pub async fn query_application(
378 &mut self,
379 local_time: Timestamp,
380 query: Query,
381 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
382 ) -> Result<Response, ChainError> {
383 let context = QueryContext {
384 chain_id: self.chain_id(),
385 next_block_height: self.tip_state.get().next_block_height,
386 local_time,
387 };
388 self.execution_state
389 .query_application(context, query, service_runtime_endpoint)
390 .await
391 .with_execution_context(ChainExecutionContext::Query)
392 }
393
394 pub async fn describe_application(
395 &mut self,
396 application_id: UserApplicationId,
397 ) -> Result<UserApplicationDescription, ChainError> {
398 self.execution_state
399 .system
400 .registry
401 .describe_application(application_id)
402 .await
403 .with_execution_context(ChainExecutionContext::DescribeApplication)
404 }
405
406 pub async fn mark_messages_as_received(
407 &mut self,
408 target: &Target,
409 height: BlockHeight,
410 ) -> Result<bool, ChainError> {
411 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
412 let updates = outbox.mark_messages_as_received(height).await?;
413 if updates.is_empty() {
414 return Ok(false);
415 }
416 for update in updates {
417 let counter = self
418 .outbox_counters
419 .get_mut()
420 .get_mut(&update)
421 .expect("message counter should be present");
422 *counter = counter
423 .checked_sub(1)
424 .expect("message counter should not underflow");
425 if *counter == 0 {
426 self.outbox_counters.get_mut().remove(&update);
428 }
429 }
430 if outbox.queue.count() == 0 {
431 self.outboxes.remove_entry(target)?;
432 }
433 Ok(true)
434 }
435
436 pub fn all_messages_delivered_up_to(&mut self, height: BlockHeight) -> bool {
439 tracing::debug!(
440 "Messages left in {:.8}'s outbox: {:?}",
441 self.chain_id(),
442 self.outbox_counters.get()
443 );
444 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
445 key > &height
446 } else {
447 true
448 }
449 }
450
451 pub fn is_active(&self) -> bool {
453 self.execution_state.system.is_active()
454 }
455
456 pub fn is_closed(&self) -> bool {
458 *self.execution_state.system.closed.get()
459 }
460
461 pub fn ensure_is_active(&self) -> Result<(), ChainError> {
463 if self.is_active() {
464 Ok(())
465 } else {
466 Err(ChainError::InactiveChain(self.chain_id()))
467 }
468 }
469
470 pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
473 let chain_id = self.chain_id();
474 let pairs = self.inboxes.try_load_all_entries().await?;
475 let max_stream_queries = self.context().max_stream_queries();
476 let stream = stream::iter(pairs)
477 .map(|(origin, inbox)| async move {
478 if let Some(bundle) = inbox.removed_bundles.front().await? {
479 return Err(ChainError::MissingCrossChainUpdate {
480 chain_id,
481 origin: origin.into(),
482 height: bundle.height,
483 });
484 }
485 Ok::<(), ChainError>(())
486 })
487 .buffer_unordered(max_stream_queries);
488 stream.try_collect::<Vec<_>>().await?;
489 Ok(())
490 }
491
492 pub async fn next_block_height_to_receive(
493 &self,
494 origin: &Origin,
495 ) -> Result<BlockHeight, ChainError> {
496 let inbox = self.inboxes.try_load_entry(origin).await?;
497 match inbox {
498 Some(inbox) => inbox.next_block_height_to_receive(),
499 None => Ok(BlockHeight::from(0)),
500 }
501 }
502
503 pub async fn last_anticipated_block_height(
504 &self,
505 origin: &Origin,
506 ) -> Result<Option<BlockHeight>, ChainError> {
507 let inbox = self.inboxes.try_load_entry(origin).await?;
508 match inbox {
509 Some(inbox) => match inbox.removed_bundles.back().await? {
510 Some(bundle) => Ok(Some(bundle.height)),
511 None => Ok(None),
512 },
513 None => Ok(None),
514 }
515 }
516
    /// Processes a message bundle received from `origin`.
    ///
    /// Chain-opening, subscribe and unsubscribe messages are acted on immediately. If the bundle
    /// goes to the inbox, it is added there, and a newly added non-skippable bundle is also
    /// queued in `unskippable_bundles` with `local_time` as its timestamp. When
    /// `add_to_received_log` is set, the sender chain and height are appended to `received_log`.
    ///
    /// Returns the value produced by `process_subscribes` for the subscriptions in the bundle.
    ///
    /// # Errors
    ///
    /// View errors are propagated; any other inbox error becomes an `InternalError`.
    ///
    /// # Panics
    ///
    /// Panics if `bundle` contains no messages.
    pub async fn receive_message_bundle(
        &mut self,
        origin: &Origin,
        bundle: MessageBundle,
        local_time: Timestamp,
        add_to_received_log: bool,
    ) -> Result<bool, ChainError> {
        assert!(!bundle.messages.is_empty());
        let chain_id = self.chain_id();
        tracing::trace!(
            "Processing new messages to {chain_id:.8} from {origin} at height {}",
            bundle.height,
        );
        let chain_and_height = ChainAndHeight {
            chain_id: origin.sender,
            height: bundle.height,
        };
        let mut subscribe_names_and_ids = Vec::new();
        let mut unsubscribe_names_and_ids = Vec::new();

        // Handle the messages that take effect as soon as they are received.
        for posted_message in &bundle.messages {
            if let Some(config) = posted_message.message.matches_open_chain() {
                // Only initialize the chain if it has no description yet.
                if self.execution_state.system.description.get().is_none() {
                    let message_id = chain_and_height.to_message_id(posted_message.index);
                    self.execute_init_message(message_id, config, bundle.timestamp, local_time)
                        .await?;
                }
            } else if let Some((id, subscription)) = posted_message.message.matches_subscribe() {
                subscribe_names_and_ids.push((subscription.name.clone(), *id));
            }
            if let Some((id, subscription)) = posted_message.message.matches_unsubscribe() {
                unsubscribe_names_and_ids.push((subscription.name.clone(), *id));
            }
        }
        // Unsubscribes are applied before subscribes.
        self.process_unsubscribes(unsubscribe_names_and_ids, GenericApplicationId::System)
            .await?;
        let new_outbox_entries = self
            .process_subscribes(subscribe_names_and_ids, GenericApplicationId::System)
            .await?;

        if bundle.goes_to_inbox() {
            let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
            // Capture what is needed from `bundle` before it is moved into the inbox.
            let entry = BundleInInbox::new(origin.clone(), &bundle);
            let skippable = bundle.is_skippable();
            let newly_added = inbox
                .add_bundle(bundle)
                .await
                .map_err(|error| match error {
                    InboxError::ViewError(error) => ChainError::ViewError(error),
                    error => ChainError::InternalError(format!(
                        "while processing messages in certified block: {error}"
                    )),
                })?;
            if newly_added && !skippable {
                let seen = local_time;
                self.unskippable_bundles
                    .push_back(TimestampedBundleInInbox { entry, seen });
            }
        }

        if add_to_received_log {
            self.received_log.push(chain_and_height);
        }
        Ok(new_outbox_entries)
    }
591
592 pub async fn execute_init_message(
593 &mut self,
594 message_id: MessageId,
595 config: &OpenChainConfig,
596 timestamp: Timestamp,
597 local_time: Timestamp,
598 ) -> Result<bool, ChainError> {
599 self.execution_state
601 .system
602 .initialize_chain(message_id, timestamp, config.clone());
603 let hash = self.execution_state.crypto_hash().await?;
605 self.execution_state_hash.set(Some(hash));
606 let maybe_committee = self.execution_state.system.current_committee().into_iter();
607 self.manager.get_mut().reset(
609 self.execution_state.system.ownership.get(),
610 BlockHeight(0),
611 local_time,
612 maybe_committee.flat_map(|(_, committee)| committee.keys_and_weights()),
613 )?;
614 Ok(true)
615 }
616
    /// Removes the incoming bundles of `block` from their inboxes and keeps the bookkeeping of
    /// non-skippable bundles in sync.
    ///
    /// # Errors
    ///
    /// Returns `IncorrectBundleTimestamp` if a bundle is timestamped after the block. Inbox
    /// errors are converted together with the chain ID and origin.
    pub async fn remove_bundles_from_inboxes(&mut self, block: &Block) -> Result<(), ChainError> {
        let chain_id = self.chain_id();
        // Group the bundles by origin so that each inbox is loaded once.
        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
        for IncomingBundle { bundle, origin, .. } in &block.incoming_bundles {
            ensure!(
                bundle.timestamp <= block.timestamp,
                ChainError::IncorrectBundleTimestamp {
                    chain_id,
                    bundle_timestamp: bundle.timestamp,
                    block_timestamp: block.timestamp,
                }
            );
            let bundles = bundles_by_origin.entry(origin).or_default();
            bundles.push(bundle);
        }
        let origins = bundles_by_origin.keys().copied();
        let inboxes = self.inboxes.try_load_entries_mut(origins).await?;
        let mut removed_unskippable = HashSet::new();
        // The inboxes were requested in key order, so zipping pairs each origin with its inbox.
        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
            tracing::trace!(
                "Removing {:?} from {chain_id:.8}'s inbox for {origin:}",
                bundles
                    .iter()
                    .map(|bundle| bundle.height)
                    .collect::<Vec<_>>()
            );
            for bundle in bundles {
                let was_present = inbox
                    .remove_bundle(bundle)
                    .await
                    .map_err(|error| ChainError::from((chain_id, origin.clone(), error)))?;
                // Only bundles that were actually in the inbox can be in the unskippable queue.
                if was_present && !bundle.is_skippable() {
                    removed_unskippable.insert(BundleInInbox::new(origin.clone(), bundle));
                }
            }
        }
        if !removed_unskippable.is_empty() {
            // Pop removed bundles off the front of the queue for as long as possible.
            let maybe_front = self.unskippable_bundles.front().await?;
            if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
                self.unskippable_bundles.delete_front();
                while let Some(ts_entry) = self.unskippable_bundles.front().await? {
                    if !removed_unskippable.remove(&ts_entry.entry) {
                        // Not removed by this block: it may still have been removed by an
                        // earlier one and be waiting in `removed_unskippable_bundles`.
                        if !self
                            .removed_unskippable_bundles
                            .contains(&ts_entry.entry)
                            .await?
                        {
                            break;
                        }
                        self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
                    }
                    self.unskippable_bundles.delete_front();
                }
            }
            // Whatever could not be popped from the front is remembered for later.
            for entry in removed_unskippable {
                self.removed_unskippable_bundles.insert(&entry)?;
            }
        }
        Ok(())
    }
680
    /// Executes `block` on top of the current execution state and returns the outcome
    /// (outgoing messages, oracle responses and events per transaction, plus the new state
    /// hash).
    ///
    /// If `replaying_oracle_responses` is `Some`, it must contain one list of responses per
    /// transaction; each list is handed to that transaction's tracker.
    ///
    /// On success the stored execution state hash is updated and the chain manager is reset
    /// for the next height.
    ///
    /// # Errors
    ///
    /// Among others: `InactiveChain` if the chain cannot be initialized or has no current
    /// committee, `InvalidBlockTimestamp` if the block is older than the chain's timestamp,
    /// `ClosedChain` if a closed chain receives anything but rejected messages,
    /// `AuthorizedApplications` / `MissingMandatoryApplications` for permission violations, and
    /// `MissingOracleResponseList` if the replayed responses run out.
    ///
    /// # Panics
    ///
    /// Panics if `block.chain_id` is not this chain's ID, or if the number of collected message
    /// lists does not match the number of transactions.
    pub async fn execute_block(
        &mut self,
        block: &Block,
        local_time: Timestamp,
        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
    ) -> Result<BlockExecutionOutcome, ChainError> {
        #[cfg(with_metrics)]
        let _execution_latency = BLOCK_EXECUTION_LATENCY.measure_latency();

        let chain_id = self.chain_id();
        assert_eq!(block.chain_id, chain_id);
        // At height zero, a chain without a description (or with a child description) must be
        // opened by the block's first message.
        if block.height == BlockHeight::ZERO
            && self
                .execution_state
                .system
                .description
                .get()
                .map_or(true, |description| description.is_child())
        {
            let (in_bundle, posted_message, config) = block
                .starts_with_open_chain_message()
                .ok_or_else(|| ChainError::InactiveChain(chain_id))?;
            if !self.is_active() {
                let message_id = MessageId {
                    chain_id: in_bundle.origin.sender,
                    height: in_bundle.bundle.height,
                    index: posted_message.index,
                };
                self.execute_init_message(message_id, config, block.timestamp, local_time)
                    .await?;
            }
        }

        // Block timestamps must not go backwards.
        ensure!(
            *self.execution_state.system.timestamp.get() <= block.timestamp,
            ChainError::InvalidBlockTimestamp
        );
        self.execution_state.system.timestamp.set(block.timestamp);
        let Some((_, committee)) = self.execution_state.system.current_committee() else {
            return Err(ChainError::InactiveChain(chain_id));
        };
        // Resources are charged to the block's authenticated signer under the committee's policy.
        let mut resource_controller = ResourceController {
            policy: Arc::new(committee.policy().clone()),
            tracker: ResourceTracker::default(),
            account: block.authenticated_signer,
        };
        // Account for the base block size and the two top-level sequences of the block.
        resource_controller
            .track_executed_block_size(EMPTY_EXECUTED_BLOCK_SIZE)
            .and_then(|()| {
                resource_controller
                    .track_executed_block_size_sequence_extension(0, block.incoming_bundles.len())
            })
            .and_then(|()| {
                resource_controller
                    .track_executed_block_size_sequence_extension(0, block.operations.len())
            })
            .with_execution_context(ChainExecutionContext::Block)?;

        // A closed chain only accepts blocks that consist of rejected incoming messages.
        if self.is_closed() {
            ensure!(
                !block.incoming_bundles.is_empty() && block.has_only_rejected_messages(),
                ChainError::ClosedChain
            );
        }
        let app_permissions = self.execution_state.system.application_permissions.get();
        // Every mandatory application must appear in an operation or an incoming message.
        let mut mandatory = HashSet::<UserApplicationId>::from_iter(
            app_permissions.mandatory_applications.iter().cloned(),
        );
        for operation in &block.operations {
            ensure!(
                app_permissions.can_execute_operations(&operation.application_id()),
                ChainError::AuthorizedApplications(
                    app_permissions.execute_operations.clone().unwrap()
                )
            );
            if let Operation::User { application_id, .. } = operation {
                mandatory.remove(application_id);
            }
        }
        for pending in block.incoming_messages() {
            if mandatory.is_empty() {
                break;
            }
            if let Message::User { application_id, .. } = &pending.message {
                mandatory.remove(application_id);
            }
        }
        ensure!(
            mandatory.is_empty(),
            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
        );

        // When replaying, oracle responses are consumed one list per transaction.
        let mut replaying_oracle_responses = replaying_oracle_responses.map(Vec::into_iter);
        let mut next_message_index = 0;
        let mut oracle_responses = Vec::new();
        let mut events = Vec::new();
        let mut messages = Vec::new();
        for (txn_index, transaction) in block.transactions() {
            let chain_execution_context = match transaction {
                Transaction::ReceiveMessages(_) => ChainExecutionContext::IncomingBundle(txn_index),
                Transaction::ExecuteOperation(_) => ChainExecutionContext::Operation(txn_index),
            };
            let maybe_responses = match replaying_oracle_responses.as_mut().map(Iterator::next) {
                Some(Some(responses)) => Some(responses),
                Some(None) => return Err(ChainError::MissingOracleResponseList),
                None => None,
            };
            // Message indices continue across transactions.
            let mut txn_tracker = TransactionTracker::new(next_message_index, maybe_responses);
            match transaction {
                Transaction::ReceiveMessages(incoming_bundle) => {
                    resource_controller
                        .track_executed_block_size_of(&incoming_bundle)
                        .with_execution_context(chain_execution_context)?;
                    for (message_id, posted_message) in incoming_bundle.messages_and_ids() {
                        self.execute_message_in_block(
                            message_id,
                            posted_message,
                            incoming_bundle,
                            block,
                            txn_index,
                            local_time,
                            &mut txn_tracker,
                            &mut resource_controller,
                        )
                        .await?;
                    }
                }
                Transaction::ExecuteOperation(operation) => {
                    resource_controller
                        .track_executed_block_size_of(&operation)
                        .with_execution_context(chain_execution_context)?;
                    #[cfg(with_metrics)]
                    let _operation_latency = OPERATION_EXECUTION_LATENCY.measure_latency();
                    let context = OperationContext {
                        chain_id,
                        height: block.height,
                        index: Some(txn_index),
                        authenticated_signer: block.authenticated_signer,
                        authenticated_caller_id: None,
                    };
                    self.execution_state
                        .execute_operation(
                            context,
                            local_time,
                            operation.clone(),
                            &mut txn_tracker,
                            &mut resource_controller,
                        )
                        .await
                        .with_execution_context(chain_execution_context)?;
                    resource_controller
                        .with_state(&mut self.execution_state)
                        .await?
                        .track_operation(operation)
                        .with_execution_context(chain_execution_context)?;
                }
            }

            self.execution_state
                .update_execution_outcomes_with_app_registrations(&mut txn_tracker)
                .await
                .with_execution_context(chain_execution_context)?;
            let (txn_outcomes, txn_oracle_responses, new_next_message_index) = txn_tracker
                .destructure()
                .with_execution_context(chain_execution_context)?;
            next_message_index = new_next_message_index;
            let (txn_messages, txn_events) = self
                .process_execution_outcomes(block.height, txn_outcomes)
                .await?;
            // Outgoing messages are charged for operations and accepted bundles only, not for
            // rejected bundles.
            if matches!(
                transaction,
                Transaction::ExecuteOperation(_)
                    | Transaction::ReceiveMessages(IncomingBundle {
                        action: MessageAction::Accept,
                        ..
                    })
            ) {
                for message_out in &txn_messages {
                    resource_controller
                        .with_state(&mut self.execution_state)
                        .await?
                        .track_message(&message_out.message)
                        .with_execution_context(chain_execution_context)?;
                }
            }
            // Account for this transaction's outputs and for growing each output list by one.
            resource_controller
                .track_executed_block_size_of(&(&txn_oracle_responses, &txn_messages, &txn_events))
                .with_execution_context(chain_execution_context)?;
            resource_controller
                .track_executed_block_size_sequence_extension(oracle_responses.len(), 1)
                .with_execution_context(chain_execution_context)?;
            resource_controller
                .track_executed_block_size_sequence_extension(messages.len(), 1)
                .with_execution_context(chain_execution_context)?;
            resource_controller
                .track_executed_block_size_sequence_extension(events.len(), 1)
                .with_execution_context(chain_execution_context)?;
            oracle_responses.push(txn_oracle_responses);
            messages.push(txn_messages);
            events.push(txn_events);
        }

        // The per-block charge is skipped on closed chains.
        if !self.is_closed() {
            resource_controller
                .with_state(&mut self.execution_state)
                .await?
                .track_block()
                .with_execution_context(ChainExecutionContext::Block)?;
        }

        // Recompute and store the state hash.
        let state_hash = {
            #[cfg(with_metrics)]
            let _hash_latency = STATE_HASH_COMPUTATION_LATENCY.measure_latency();
            self.execution_state.crypto_hash().await?
        };
        self.execution_state_hash.set(Some(state_hash));
        let maybe_committee = self.execution_state.system.current_committee().into_iter();
        // Reset the consensus state for the next height, based on the current ownership.
        self.manager.get_mut().reset(
            self.execution_state.system.ownership.get(),
            block.height.try_add_one()?,
            local_time,
            maybe_committee.flat_map(|(_, committee)| committee.keys_and_weights()),
        )?;

        #[cfg(with_metrics)]
        {
            NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
            WASM_FUEL_USED_PER_BLOCK
                .with_label_values(&[])
                .observe(resource_controller.tracker.fuel as f64);
            WASM_NUM_READS_PER_BLOCK
                .with_label_values(&[])
                .observe(resource_controller.tracker.read_operations as f64);
            WASM_BYTES_READ_PER_BLOCK
                .with_label_values(&[])
                .observe(resource_controller.tracker.bytes_read as f64);
            WASM_BYTES_WRITTEN_PER_BLOCK
                .with_label_values(&[])
                .observe(resource_controller.tracker.bytes_written as f64);
        }

        // There must be exactly one message list per transaction.
        assert_eq!(
            messages.len(),
            block.incoming_bundles.len() + block.operations.len()
        );
        let outcome = BlockExecutionOutcome {
            messages,
            state_hash,
            oracle_responses,
            events,
        };
        Ok(outcome)
    }
948
    /// Executes one message of an incoming bundle as part of `block`, following the bundle's
    /// action.
    ///
    /// * `Accept`: the message is executed; a grant, if any, is passed along mutably, and
    ///   whatever is left of it afterwards is refunded when the message names a refund
    ///   recipient.
    /// * `Reject`: tracked messages are bounced together with their grant; otherwise a non-zero
    ///   grant is refunded.
    ///
    /// # Errors
    ///
    /// Returns `ClosedChain` when accepting on a closed chain, `CannotRejectMessage` when
    /// rejecting a protected message on a chain that is not closed, and `InternalError` when a
    /// rejected message carries a grant but no refund recipient.
    #[expect(clippy::too_many_arguments)]
    async fn execute_message_in_block(
        &mut self,
        message_id: MessageId,
        posted_message: &PostedMessage,
        incoming_bundle: &IncomingBundle,
        block: &Block,
        txn_index: u32,
        local_time: Timestamp,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<Owner>>,
    ) -> Result<(), ChainError> {
        #[cfg(with_metrics)]
        let _message_latency = MESSAGE_EXECUTION_LATENCY.measure_latency();
        let context = MessageContext {
            chain_id: block.chain_id,
            is_bouncing: posted_message.is_bouncing(),
            height: block.height,
            certificate_hash: incoming_bundle.bundle.certificate_hash,
            message_id,
            authenticated_signer: posted_message.authenticated_signer,
            refund_grant_to: posted_message.refund_grant_to,
        };
        // Local copy of the grant, handed to the execution by mutable reference below.
        let mut grant = posted_message.grant;
        match incoming_bundle.action {
            MessageAction::Accept => {
                // A closed chain does not accept incoming messages.
                ensure!(!self.is_closed(), ChainError::ClosedChain);

                self.execution_state
                    .execute_message(
                        context,
                        local_time,
                        posted_message.message.clone(),
                        (grant > Amount::ZERO).then_some(&mut grant),
                        txn_tracker,
                        resource_controller,
                    )
                    .await
                    .with_execution_context(ChainExecutionContext::IncomingBundle(txn_index))?;
                // Refund what remains of the grant, if there is a recipient for it.
                if grant > Amount::ZERO {
                    if let Some(refund_grant_to) = posted_message.refund_grant_to {
                        self.execution_state
                            .send_refund(context, grant, refund_grant_to, txn_tracker)
                            .await
                            .with_execution_context(ChainExecutionContext::IncomingBundle(
                                txn_index,
                            ))?;
                    }
                }
            }
            MessageAction::Reject => {
                // Protected messages may only be rejected once the chain is closed.
                ensure!(
                    !posted_message.is_protected() || self.is_closed(),
                    ChainError::CannotRejectMessage {
                        chain_id: block.chain_id,
                        origin: Box::new(incoming_bundle.origin.clone()),
                        posted_message: Box::new(posted_message.clone()),
                    }
                );
                if posted_message.is_tracked() {
                    // Bounce the message, together with its grant.
                    self.execution_state
                        .bounce_message(context, grant, posted_message.message.clone(), txn_tracker)
                        .await
                        .with_execution_context(ChainExecutionContext::Block)?;
                } else if grant > Amount::ZERO {
                    // Nothing to bounce; only the grant has to be returned.
                    let Some(refund_grant_to) = posted_message.refund_grant_to else {
                        return Err(ChainError::InternalError(
                            "Messages with grants should have a non-empty `refund_grant_to`".into(),
                        ));
                    };
                    self.execution_state
                        .send_refund(context, posted_message.grant, refund_grant_to, txn_tracker)
                        .await
                        .with_execution_context(ChainExecutionContext::Block)?;
                }
            }
        }
        Ok(())
    }
1036
1037 async fn process_execution_outcomes(
1038 &mut self,
1039 height: BlockHeight,
1040 results: Vec<ExecutionOutcome>,
1041 ) -> Result<(Vec<OutgoingMessage>, Vec<EventRecord>), ChainError> {
1042 let mut messages = Vec::new();
1043 let mut events = Vec::new();
1044 for result in results {
1045 match result {
1046 ExecutionOutcome::System(result) => {
1047 self.process_raw_execution_outcome(
1048 GenericApplicationId::System,
1049 Message::System,
1050 &mut messages,
1051 &mut events,
1052 height,
1053 result,
1054 )
1055 .await?;
1056 }
1057 ExecutionOutcome::User(application_id, result) => {
1058 self.process_raw_execution_outcome(
1059 GenericApplicationId::User(application_id),
1060 |bytes| Message::User {
1061 application_id,
1062 bytes,
1063 },
1064 &mut messages,
1065 &mut events,
1066 height,
1067 result,
1068 )
1069 .await?;
1070 }
1071 }
1072 }
1073 Ok((messages, events))
1074 }
1075
    /// Processes one raw execution outcome of `application_id` at block `height`.
    ///
    /// Events are appended to `events`. Each raw message is converted with `lift` and appended
    /// to `messages`. A message is scheduled in the outbox of every direct recipient and of
    /// every subscriber of each channel that was broadcast to, and `outbox_counters` is
    /// incremented for each outbox that newly schedules `height`. Unsubscribe requests are
    /// applied before the channel broadcasts, subscribe requests after.
    ///
    /// # Errors
    ///
    /// Returns `GrantUseOnBroadcast` if a message to subscribers carries a non-zero grant.
    async fn process_raw_execution_outcome<E, F>(
        &mut self,
        application_id: GenericApplicationId,
        lift: F,
        messages: &mut Vec<OutgoingMessage>,
        events: &mut Vec<EventRecord>,
        height: BlockHeight,
        raw_outcome: RawExecutionOutcome<E, Amount>,
    ) -> Result<(), ChainError>
    where
        F: Fn(E) -> Message,
    {
        events.extend(
            raw_outcome
                .events
                .into_iter()
                .map(|(stream_name, key, value)| EventRecord {
                    stream_id: StreamId {
                        application_id,
                        stream_name,
                    },
                    key,
                    value,
                }),
        );
        let max_stream_queries = self.context().max_stream_queries();
        // Sets, so that each recipient and each channel is handled once per outcome.
        let mut recipients = HashSet::new();
        let mut channel_broadcasts = HashSet::new();
        for RawOutgoingMessage {
            destination,
            authenticated,
            grant,
            kind,
            message,
        } in raw_outcome.messages
        {
            match &destination {
                Destination::Recipient(id) => {
                    recipients.insert(*id);
                }
                Destination::Subscribers(name) => {
                    ensure!(grant == Amount::ZERO, ChainError::GrantUseOnBroadcast);
                    channel_broadcasts.insert(name.clone());
                }
            }
            // The signer is only forwarded for authenticated messages, and the refund
            // recipient only when there is a grant to refund.
            let authenticated_signer = raw_outcome.authenticated_signer.filter(|_| authenticated);
            let refund_grant_to = raw_outcome.refund_grant_to.filter(|_| grant > Amount::ZERO);
            messages.push(OutgoingMessage {
                destination,
                authenticated_signer,
                grant,
                refund_grant_to,
                kind,
                message: lift(message),
            });
        }

        // Schedule the height in the outboxes of the direct recipients.
        let outbox_counters = self.outbox_counters.get_mut();
        let targets = recipients
            .into_iter()
            .map(Target::chain)
            .collect::<Vec<_>>();
        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
        for mut outbox in outboxes {
            if outbox.schedule_message(height)? {
                *outbox_counters.entry(height).or_default() += 1;
            }
        }

        // Apply unsubscribes first, so that the broadcasts below use the updated subscribers.
        self.process_unsubscribes(raw_outcome.unsubscribe, application_id)
            .await?;

        let full_names = channel_broadcasts
            .into_iter()
            .map(|name| ChannelFullName {
                application_id,
                name,
            })
            .collect::<Vec<_>>();
        let channels = self.channels.try_load_entries_mut(&full_names).await?;
        let stream = full_names.into_iter().zip(channels);
        // Record the height on each channel and collect one target per subscriber.
        let stream = stream::iter(stream)
            .map(|(full_name, mut channel)| async move {
                let recipients = channel.subscribers.indices().await?;
                channel.block_heights.push(height);
                let targets = recipients
                    .into_iter()
                    .map(|recipient| Target::channel(recipient, full_name.clone()))
                    .collect::<Vec<_>>();
                Ok::<_, ChainError>(targets)
            })
            .buffer_unordered(max_stream_queries);
        let infos = stream.try_collect::<Vec<_>>().await?;
        let targets = infos.into_iter().flatten().collect::<Vec<_>>();
        // Schedule the height in the outboxes of the channel subscribers.
        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
        let outbox_counters = self.outbox_counters.get_mut();
        for mut outbox in outboxes {
            if outbox.schedule_message(height)? {
                *outbox_counters.entry(height).or_default() += 1;
            }
        }

        // New subscribers are added last.
        self.process_subscribes(raw_outcome.subscribe, application_id)
            .await?;
        Ok(())
    }
1186
1187 async fn process_subscribes(
1190 &mut self,
1191 names_and_ids: Vec<(ChannelName, ChainId)>,
1192 application_id: GenericApplicationId,
1193 ) -> Result<bool, ChainError> {
1194 if names_and_ids.is_empty() {
1195 return Ok(false);
1196 }
1197 let full_names = names_and_ids
1198 .iter()
1199 .map(|(name, _)| ChannelFullName {
1200 application_id,
1201 name: name.clone(),
1202 })
1203 .collect::<Vec<_>>();
1204 let channels = self.channels.try_load_entries_mut(&full_names).await?;
1205 let subscribe_channels = names_and_ids.into_iter().zip(channels);
1206 let max_stream_queries = self.context().max_stream_queries();
1207 let stream = stream::iter(subscribe_channels)
1208 .map(|((name, id), mut channel)| async move {
1209 if channel.subscribers.contains(&id).await? {
1210 return Ok(None); }
1212 let full_name = ChannelFullName {
1213 application_id,
1214 name,
1215 };
1216 tracing::trace!("Adding subscriber {id:.8} for {full_name:}");
1217 channel.subscribers.insert(&id)?;
1218 let heights = channel.block_heights.read(..).await?;
1220 if heights.is_empty() {
1221 return Ok(None); }
1223 let target = Target::channel(id, full_name.clone());
1224 Ok::<_, ChainError>(Some((target, heights)))
1225 })
1226 .buffer_unordered(max_stream_queries);
1227 let infos = stream.try_collect::<Vec<_>>().await?;
1228 let (targets, heights): (Vec<_>, Vec<_>) = infos.into_iter().flatten().unzip();
1229 let mut new_outbox_entries = false;
1230 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1231 let outbox_counters = self.outbox_counters.get_mut();
1232 for (heights, mut outbox) in heights.into_iter().zip(outboxes) {
1233 for height in heights {
1234 if outbox.schedule_message(height)? {
1235 *outbox_counters.entry(height).or_default() += 1;
1236 new_outbox_entries = true;
1237 }
1238 }
1239 }
1240 Ok(new_outbox_entries)
1241 }
1242
1243 async fn process_unsubscribes(
1244 &mut self,
1245 names_and_ids: Vec<(ChannelName, ChainId)>,
1246 application_id: GenericApplicationId,
1247 ) -> Result<(), ChainError> {
1248 if names_and_ids.is_empty() {
1249 return Ok(());
1250 }
1251 let full_names = names_and_ids
1252 .iter()
1253 .map(|(name, _)| ChannelFullName {
1254 application_id,
1255 name: name.clone(),
1256 })
1257 .collect::<Vec<_>>();
1258 let channels = self.channels.try_load_entries_mut(&full_names).await?;
1259 for ((_name, id), mut channel) in names_and_ids.into_iter().zip(channels) {
1260 channel.subscribers.remove(&id)?;
1262 }
1263 Ok(())
1264 }
1265}
1266
/// Checks that the BCS-serialized size of an executed block, made of the first block of
/// root chain 0 and a default execution outcome, equals `EMPTY_EXECUTED_BLOCK_SIZE`.
#[test]
fn empty_executed_block_size() {
    let block = crate::test::make_first_block(ChainId::root(0));
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let empty_block = crate::data_types::ExecutedBlock { block, outcome };
    let serialized_len = bcs::serialized_size(&empty_block).unwrap();
    assert_eq!(serialized_len, EMPTY_EXECUTED_BLOCK_SIZE);
}