linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(with_metrics)]
5use std::sync::LazyLock;
6use std::{
7    collections::{BTreeMap, HashSet},
8    sync::Arc,
9};
10
11use async_graphql::SimpleObject;
12use futures::stream::{self, StreamExt, TryStreamExt};
13use linera_base::{
14    crypto::CryptoHash,
15    data_types::{
16        Amount, ArithmeticError, BlockHeight, OracleResponse, Timestamp, UserApplicationDescription,
17    },
18    ensure,
19    identifiers::{
20        ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner, StreamId,
21        UserApplicationId,
22    },
23};
24use linera_execution::{
25    system::OpenChainConfig, ExecutionOutcome, ExecutionRuntimeContext, ExecutionStateView,
26    Message, MessageContext, Operation, OperationContext, Query, QueryContext, RawExecutionOutcome,
27    RawOutgoingMessage, ResourceController, ResourceTracker, Response, ServiceRuntimeEndpoint,
28    TransactionTracker,
29};
30use linera_views::{
31    context::Context,
32    log_view::LogView,
33    queue_view::QueueView,
34    reentrant_collection_view::ReentrantCollectionView,
35    register_view::RegisterView,
36    set_view::SetView,
37    views::{ClonableView, CryptoHashView, RootView, View},
38};
39use serde::{Deserialize, Serialize};
40
41use crate::{
42    data_types::{
43        Block, BlockExecutionOutcome, ChainAndHeight, ChannelFullName, EventRecord, IncomingBundle,
44        MessageAction, MessageBundle, Origin, OutgoingMessage, PostedMessage, Target, Transaction,
45    },
46    inbox::{Cursor, InboxError, InboxStateView},
47    manager::ChainManager,
48    outbox::OutboxStateView,
49    ChainError, ChainExecutionContext, ExecutionResultExt,
50};
51
52#[cfg(test)]
53#[path = "unit_tests/chain_tests.rs"]
54mod chain_tests;
55
56#[cfg(with_metrics)]
57use {
58    linera_base::prometheus_util::{self, MeasureLatency},
59    prometheus::{HistogramVec, IntCounterVec},
60};
61
62#[cfg(with_metrics)]
63static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
64    prometheus_util::register_int_counter_vec(
65        "num_blocks_executed",
66        "Number of blocks executed",
67        &[],
68    )
69    .expect("Counter creation should not fail")
70});
71
72#[cfg(with_metrics)]
73static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
74    prometheus_util::register_histogram_vec(
75        "block_execution_latency",
76        "Block execution latency",
77        &[],
78        Some(vec![
79            0.000_1, 0.000_25, 0.000_5, 0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
80            1.0, 2.5, 5.0, 10.0, 25.0, 50.0,
81        ]),
82    )
83    .expect("Histogram creation should not fail")
84});
85
86#[cfg(with_metrics)]
87static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
88    prometheus_util::register_histogram_vec(
89        "message_execution_latency",
90        "Message execution latency",
91        &[],
92        Some(vec![
93            0.000_1, 0.000_25, 0.000_5, 0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
94            1.0, 2.5,
95        ]),
96    )
97    .expect("Histogram creation should not fail")
98});
99
100#[cfg(with_metrics)]
101static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
102    prometheus_util::register_histogram_vec(
103        "operation_execution_latency",
104        "Operation execution latency",
105        &[],
106        Some(vec![
107            0.000_1, 0.000_25, 0.000_5, 0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
108            1.0, 2.5,
109        ]),
110    )
111    .expect("Histogram creation should not fail")
112});
113
114#[cfg(with_metrics)]
115static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
116    prometheus_util::register_histogram_vec(
117        "wasm_fuel_used_per_block",
118        "Wasm fuel used per block",
119        &[],
120        Some(vec![
121            50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10_000.0, 25_000.0, 50_000.0,
122            100_000.0, 250_000.0, 500_000.0,
123        ]),
124    )
125    .expect("Histogram creation should not fail")
126});
127
128#[cfg(with_metrics)]
129static WASM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
130    prometheus_util::register_histogram_vec(
131        "wasm_num_reads_per_block",
132        "Wasm number of reads per block",
133        &[],
134        Some(vec![0.5, 1.0, 2.0, 4.0, 8.0, 15.0, 30.0, 50.0, 100.0]),
135    )
136    .expect("Histogram creation should not fail")
137});
138
139#[cfg(with_metrics)]
140static WASM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
141    prometheus_util::register_histogram_vec(
142        "wasm_bytes_read_per_block",
143        "Wasm number of bytes read per block",
144        &[],
145        Some(vec![
146            0.5,
147            1.0,
148            10.0,
149            100.0,
150            256.0,
151            512.0,
152            1024.0,
153            2048.0,
154            4096.0,
155            8192.0,
156            16384.0,
157            65_536.0,
158            524_288.0,
159            1_048_576.0,
160            8_388_608.0,
161        ]),
162    )
163    .expect("Histogram creation should not fail")
164});
165
166#[cfg(with_metrics)]
167static WASM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
168    prometheus_util::register_histogram_vec(
169        "wasm_bytes_written_per_block",
170        "Wasm number of bytes written per block",
171        &[],
172        Some(vec![
173            0.5,
174            1.0,
175            10.0,
176            100.0,
177            256.0,
178            512.0,
179            1024.0,
180            2048.0,
181            4096.0,
182            8192.0,
183            16384.0,
184            65_536.0,
185            524_288.0,
186            1_048_576.0,
187            8_388_608.0,
188        ]),
189    )
190    .expect("Histogram creation should not fail")
191});
192
193#[cfg(with_metrics)]
194static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
195    prometheus_util::register_histogram_vec(
196        "state_hash_computation_latency",
197        "Time to recompute the state hash",
198        &[],
199        Some(vec![
200            0.001, 0.003, 0.01, 0.03, 0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1.0, 2.0, 5.0,
201        ]),
202    )
203    .expect("Histogram can be created")
204});
205
206/// The BCS-serialized size of an empty `ExecutedBlock`.
207const EMPTY_EXECUTED_BLOCK_SIZE: usize = 91;
208
209/// An origin, cursor and timestamp of a unskippable bundle in our inbox.
210#[derive(Debug, Clone, Serialize, Deserialize, async_graphql::SimpleObject)]
211pub struct TimestampedBundleInInbox {
212    /// The origin and cursor of the bundle.
213    pub entry: BundleInInbox,
214    /// The timestamp when the bundle was added to the inbox.
215    pub seen: Timestamp,
216}
217
218/// An origin and cursor of a unskippable bundle that is no longer in our inbox.
219#[derive(
220    Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, async_graphql::SimpleObject,
221)]
222pub struct BundleInInbox {
223    /// The origin from which we received the bundle.
224    pub origin: Origin,
225    /// The cursor of the bundle in the inbox.
226    pub cursor: Cursor,
227}
228
229impl BundleInInbox {
230    fn new(origin: Origin, bundle: &MessageBundle) -> Self {
231        BundleInInbox {
232            cursor: Cursor::from(bundle),
233            origin,
234        }
235    }
236}
237
238/// A view accessing the state of a chain.
239#[derive(Debug, RootView, ClonableView, SimpleObject)]
240#[graphql(cache_control(no_cache))]
241pub struct ChainStateView<C>
242where
243    C: Clone + Context + Send + Sync + 'static,
244{
245    /// Execution state, including system and user applications.
246    pub execution_state: ExecutionStateView<C>,
247    /// Hash of the execution state.
248    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,
249
250    /// Block-chaining state.
251    pub tip_state: RegisterView<C, ChainTipState>,
252
253    /// Consensus state.
254    pub manager: RegisterView<C, ChainManager>,
255
256    /// Hashes of all certified blocks for this sender.
257    /// This ends with `block_hash` and has length `usize::from(next_block_height)`.
258    pub confirmed_log: LogView<C, CryptoHash>,
259    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
260    pub received_log: LogView<C, ChainAndHeight>,
261
262    /// Mailboxes used to receive messages indexed by their origin.
263    pub inboxes: ReentrantCollectionView<C, Origin, InboxStateView<C>>,
264    /// A queue of unskippable bundles, with the timestamp when we added them to the inbox.
265    pub unskippable_bundles: QueueView<C, TimestampedBundleInInbox>,
266    /// Unskippable bundles that have been removed but are still in the queue.
267    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
268    /// Mailboxes used to send messages, indexed by their target.
269    pub outboxes: ReentrantCollectionView<C, Target, OutboxStateView<C>>,
270    /// Number of outgoing messages in flight for each block height.
271    /// We use a `RegisterView` to prioritize speed for small maps.
272    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
273    /// Channels able to multicast messages to subscribers.
274    pub channels: ReentrantCollectionView<C, ChannelFullName, ChannelStateView<C>>,
275}
276
277/// Block-chaining state.
278#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, SimpleObject)]
279pub struct ChainTipState {
280    /// Hash of the latest certified block in this chain, if any.
281    pub block_hash: Option<CryptoHash>,
282    /// Sequence number tracking blocks.
283    pub next_block_height: BlockHeight,
284    /// Number of incoming message bundles.
285    pub num_incoming_bundles: u32,
286    /// Number of operations.
287    pub num_operations: u32,
288    /// Number of outgoing messages.
289    pub num_outgoing_messages: u32,
290}
291
292impl ChainTipState {
293    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
294    /// expected parent.
295    pub fn verify_block_chaining(&self, new_block: &Block) -> Result<(), ChainError> {
296        ensure!(
297            new_block.height == self.next_block_height,
298            ChainError::UnexpectedBlockHeight {
299                expected_block_height: self.next_block_height,
300                found_block_height: new_block.height
301            }
302        );
303        ensure!(
304            new_block.previous_block_hash == self.block_hash,
305            ChainError::UnexpectedPreviousBlockHash
306        );
307        Ok(())
308    }
309
310    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
311    /// it is higher than the tip.
312    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
313        ensure!(
314            self.next_block_height >= height,
315            ChainError::MissingEarlierBlocks {
316                current_block_height: self.next_block_height,
317            }
318        );
319        Ok(self.next_block_height > height)
320    }
321
322    /// Returns `true` if the next block will be the first, i.e. the chain doesn't have any blocks.
323    pub fn is_first_block(&self) -> bool {
324        self.next_block_height == BlockHeight::ZERO
325    }
326
327    /// Checks if the measurement counters would be valid.
328    pub fn verify_counters(
329        &self,
330        new_block: &Block,
331        outcome: &BlockExecutionOutcome,
332    ) -> Result<(), ChainError> {
333        let num_incoming_bundles = u32::try_from(new_block.incoming_bundles.len())
334            .map_err(|_| ArithmeticError::Overflow)?;
335        self.num_incoming_bundles
336            .checked_add(num_incoming_bundles)
337            .ok_or(ArithmeticError::Overflow)?;
338
339        let num_operations =
340            u32::try_from(new_block.operations.len()).map_err(|_| ArithmeticError::Overflow)?;
341        self.num_operations
342            .checked_add(num_operations)
343            .ok_or(ArithmeticError::Overflow)?;
344
345        let num_outgoing_messages =
346            u32::try_from(outcome.messages.len()).map_err(|_| ArithmeticError::Overflow)?;
347        self.num_outgoing_messages
348            .checked_add(num_outgoing_messages)
349            .ok_or(ArithmeticError::Overflow)?;
350
351        Ok(())
352    }
353}
354
355/// The state of a channel followed by subscribers.
356#[derive(Debug, ClonableView, View, SimpleObject)]
357pub struct ChannelStateView<C>
358where
359    C: Context + Send + Sync,
360{
361    /// The current subscribers.
362    pub subscribers: SetView<C, ChainId>,
363    /// The block heights so far, to be sent to future subscribers.
364    pub block_heights: LogView<C, BlockHeight>,
365}
366
367impl<C> ChainStateView<C>
368where
369    C: Context + Clone + Send + Sync + 'static,
370    C::Extra: ExecutionRuntimeContext,
371{
372    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
373    pub fn chain_id(&self) -> ChainId {
374        self.context().extra().chain_id()
375    }
376
377    pub async fn query_application(
378        &mut self,
379        local_time: Timestamp,
380        query: Query,
381        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
382    ) -> Result<Response, ChainError> {
383        let context = QueryContext {
384            chain_id: self.chain_id(),
385            next_block_height: self.tip_state.get().next_block_height,
386            local_time,
387        };
388        self.execution_state
389            .query_application(context, query, service_runtime_endpoint)
390            .await
391            .with_execution_context(ChainExecutionContext::Query)
392    }
393
394    pub async fn describe_application(
395        &mut self,
396        application_id: UserApplicationId,
397    ) -> Result<UserApplicationDescription, ChainError> {
398        self.execution_state
399            .system
400            .registry
401            .describe_application(application_id)
402            .await
403            .with_execution_context(ChainExecutionContext::DescribeApplication)
404    }
405
406    pub async fn mark_messages_as_received(
407        &mut self,
408        target: &Target,
409        height: BlockHeight,
410    ) -> Result<bool, ChainError> {
411        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
412        let updates = outbox.mark_messages_as_received(height).await?;
413        if updates.is_empty() {
414            return Ok(false);
415        }
416        for update in updates {
417            let counter = self
418                .outbox_counters
419                .get_mut()
420                .get_mut(&update)
421                .expect("message counter should be present");
422            *counter = counter
423                .checked_sub(1)
424                .expect("message counter should not underflow");
425            if *counter == 0 {
426                // Important for the test in `all_messages_delivered_up_to`.
427                self.outbox_counters.get_mut().remove(&update);
428            }
429        }
430        if outbox.queue.count() == 0 {
431            self.outboxes.remove_entry(target)?;
432        }
433        Ok(true)
434    }
435
436    /// Returns true if there are no more outgoing messages in flight up to the given
437    /// block height.
438    pub fn all_messages_delivered_up_to(&mut self, height: BlockHeight) -> bool {
439        tracing::debug!(
440            "Messages left in {:.8}'s outbox: {:?}",
441            self.chain_id(),
442            self.outbox_counters.get()
443        );
444        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
445            key > &height
446        } else {
447            true
448        }
449    }
450
451    /// Invariant for the states of active chains.
452    pub fn is_active(&self) -> bool {
453        self.execution_state.system.is_active()
454    }
455
456    /// Returns whether this chain has been closed.
457    pub fn is_closed(&self) -> bool {
458        *self.execution_state.system.closed.get()
459    }
460
461    /// Invariant for the states of active chains.
462    pub fn ensure_is_active(&self) -> Result<(), ChainError> {
463        if self.is_active() {
464            Ok(())
465        } else {
466            Err(ChainError::InactiveChain(self.chain_id()))
467        }
468    }
469
470    /// Verifies that this chain is up-to-date and all the messages executed ahead of time
471    /// have been properly received by now.
472    pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
473        let chain_id = self.chain_id();
474        let pairs = self.inboxes.try_load_all_entries().await?;
475        let max_stream_queries = self.context().max_stream_queries();
476        let stream = stream::iter(pairs)
477            .map(|(origin, inbox)| async move {
478                if let Some(bundle) = inbox.removed_bundles.front().await? {
479                    return Err(ChainError::MissingCrossChainUpdate {
480                        chain_id,
481                        origin: origin.into(),
482                        height: bundle.height,
483                    });
484                }
485                Ok::<(), ChainError>(())
486            })
487            .buffer_unordered(max_stream_queries);
488        stream.try_collect::<Vec<_>>().await?;
489        Ok(())
490    }
491
492    pub async fn next_block_height_to_receive(
493        &self,
494        origin: &Origin,
495    ) -> Result<BlockHeight, ChainError> {
496        let inbox = self.inboxes.try_load_entry(origin).await?;
497        match inbox {
498            Some(inbox) => inbox.next_block_height_to_receive(),
499            None => Ok(BlockHeight::from(0)),
500        }
501    }
502
503    pub async fn last_anticipated_block_height(
504        &self,
505        origin: &Origin,
506    ) -> Result<Option<BlockHeight>, ChainError> {
507        let inbox = self.inboxes.try_load_entry(origin).await?;
508        match inbox {
509            Some(inbox) => match inbox.removed_bundles.back().await? {
510                Some(bundle) => Ok(Some(bundle.height)),
511                None => Ok(None),
512            },
513            None => Ok(None),
514        }
515    }
516
517    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
518    /// internal error if the bundle doesn't appear to be new, based on the sender's
519    /// height. The value `local_time` is specific to each validator and only used for
520    /// round timeouts.
521    ///
522    /// Returns `true` if incoming `Subscribe` messages created new outbox entries.
523    pub async fn receive_message_bundle(
524        &mut self,
525        origin: &Origin,
526        bundle: MessageBundle,
527        local_time: Timestamp,
528        add_to_received_log: bool,
529    ) -> Result<bool, ChainError> {
530        assert!(!bundle.messages.is_empty());
531        let chain_id = self.chain_id();
532        tracing::trace!(
533            "Processing new messages to {chain_id:.8} from {origin} at height {}",
534            bundle.height,
535        );
536        let chain_and_height = ChainAndHeight {
537            chain_id: origin.sender,
538            height: bundle.height,
539        };
540        let mut subscribe_names_and_ids = Vec::new();
541        let mut unsubscribe_names_and_ids = Vec::new();
542
543        // Handle immediate messages.
544        for posted_message in &bundle.messages {
545            if let Some(config) = posted_message.message.matches_open_chain() {
546                if self.execution_state.system.description.get().is_none() {
547                    let message_id = chain_and_height.to_message_id(posted_message.index);
548                    self.execute_init_message(message_id, config, bundle.timestamp, local_time)
549                        .await?;
550                }
551            } else if let Some((id, subscription)) = posted_message.message.matches_subscribe() {
552                subscribe_names_and_ids.push((subscription.name.clone(), *id));
553            }
554            if let Some((id, subscription)) = posted_message.message.matches_unsubscribe() {
555                unsubscribe_names_and_ids.push((subscription.name.clone(), *id));
556            }
557        }
558        self.process_unsubscribes(unsubscribe_names_and_ids, GenericApplicationId::System)
559            .await?;
560        let new_outbox_entries = self
561            .process_subscribes(subscribe_names_and_ids, GenericApplicationId::System)
562            .await?;
563
564        if bundle.goes_to_inbox() {
565            // Process the inbox bundle and update the inbox state.
566            let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
567            let entry = BundleInInbox::new(origin.clone(), &bundle);
568            let skippable = bundle.is_skippable();
569            let newly_added = inbox
570                .add_bundle(bundle)
571                .await
572                .map_err(|error| match error {
573                    InboxError::ViewError(error) => ChainError::ViewError(error),
574                    error => ChainError::InternalError(format!(
575                        "while processing messages in certified block: {error}"
576                    )),
577                })?;
578            if newly_added && !skippable {
579                let seen = local_time;
580                self.unskippable_bundles
581                    .push_back(TimestampedBundleInInbox { entry, seen });
582            }
583        }
584
585        // Remember the certificate for future validator/client synchronizations.
586        if add_to_received_log {
587            self.received_log.push(chain_and_height);
588        }
589        Ok(new_outbox_entries)
590    }
591
592    pub async fn execute_init_message(
593        &mut self,
594        message_id: MessageId,
595        config: &OpenChainConfig,
596        timestamp: Timestamp,
597        local_time: Timestamp,
598    ) -> Result<bool, ChainError> {
599        // Initialize ourself.
600        self.execution_state
601            .system
602            .initialize_chain(message_id, timestamp, config.clone());
603        // Recompute the state hash.
604        let hash = self.execution_state.crypto_hash().await?;
605        self.execution_state_hash.set(Some(hash));
606        let maybe_committee = self.execution_state.system.current_committee().into_iter();
607        // Last, reset the consensus state based on the current ownership.
608        self.manager.get_mut().reset(
609            self.execution_state.system.ownership.get(),
610            BlockHeight(0),
611            local_time,
612            maybe_committee.flat_map(|(_, committee)| committee.keys_and_weights()),
613        )?;
614        Ok(true)
615    }
616
617    /// Removes the incoming message bundles in the block from the inboxes.
618    pub async fn remove_bundles_from_inboxes(&mut self, block: &Block) -> Result<(), ChainError> {
619        let chain_id = self.chain_id();
620        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
621        for IncomingBundle { bundle, origin, .. } in &block.incoming_bundles {
622            ensure!(
623                bundle.timestamp <= block.timestamp,
624                ChainError::IncorrectBundleTimestamp {
625                    chain_id,
626                    bundle_timestamp: bundle.timestamp,
627                    block_timestamp: block.timestamp,
628                }
629            );
630            let bundles = bundles_by_origin.entry(origin).or_default();
631            bundles.push(bundle);
632        }
633        let origins = bundles_by_origin.keys().copied();
634        let inboxes = self.inboxes.try_load_entries_mut(origins).await?;
635        let mut removed_unskippable = HashSet::new();
636        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
637            tracing::trace!(
638                "Removing {:?} from {chain_id:.8}'s inbox for {origin:}",
639                bundles
640                    .iter()
641                    .map(|bundle| bundle.height)
642                    .collect::<Vec<_>>()
643            );
644            for bundle in bundles {
645                // Mark the message as processed in the inbox.
646                let was_present = inbox
647                    .remove_bundle(bundle)
648                    .await
649                    .map_err(|error| ChainError::from((chain_id, origin.clone(), error)))?;
650                if was_present && !bundle.is_skippable() {
651                    removed_unskippable.insert(BundleInInbox::new(origin.clone(), bundle));
652                }
653            }
654        }
655        if !removed_unskippable.is_empty() {
656            // Delete all removed bundles from the front of the unskippable queue.
657            let maybe_front = self.unskippable_bundles.front().await?;
658            if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
659                self.unskippable_bundles.delete_front();
660                while let Some(ts_entry) = self.unskippable_bundles.front().await? {
661                    if !removed_unskippable.remove(&ts_entry.entry) {
662                        if !self
663                            .removed_unskippable_bundles
664                            .contains(&ts_entry.entry)
665                            .await?
666                        {
667                            break;
668                        }
669                        self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
670                    }
671                    self.unskippable_bundles.delete_front();
672                }
673            }
674            for entry in removed_unskippable {
675                self.removed_unskippable_bundles.insert(&entry)?;
676            }
677        }
678        Ok(())
679    }
680
681    /// Executes a block: first the incoming messages, then the main operation.
682    /// * Modifies the state of outboxes and channels, if needed.
683    /// * As usual, in case of errors, `self` may not be consistent any more and should be thrown
684    ///   away.
685    /// * Returns the outcome of the execution.
686    pub async fn execute_block(
687        &mut self,
688        block: &Block,
689        local_time: Timestamp,
690        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
691    ) -> Result<BlockExecutionOutcome, ChainError> {
692        #[cfg(with_metrics)]
693        let _execution_latency = BLOCK_EXECUTION_LATENCY.measure_latency();
694
695        let chain_id = self.chain_id();
696        assert_eq!(block.chain_id, chain_id);
697        // The first incoming message of any child chain must be `OpenChain`. A root chain must
698        // already be initialized
699        if block.height == BlockHeight::ZERO
700            && self
701                .execution_state
702                .system
703                .description
704                .get()
705                .map_or(true, |description| description.is_child())
706        {
707            let (in_bundle, posted_message, config) = block
708                .starts_with_open_chain_message()
709                .ok_or_else(|| ChainError::InactiveChain(chain_id))?;
710            if !self.is_active() {
711                let message_id = MessageId {
712                    chain_id: in_bundle.origin.sender,
713                    height: in_bundle.bundle.height,
714                    index: posted_message.index,
715                };
716                self.execute_init_message(message_id, config, block.timestamp, local_time)
717                    .await?;
718            }
719        }
720
721        ensure!(
722            *self.execution_state.system.timestamp.get() <= block.timestamp,
723            ChainError::InvalidBlockTimestamp
724        );
725        self.execution_state.system.timestamp.set(block.timestamp);
726        let Some((_, committee)) = self.execution_state.system.current_committee() else {
727            return Err(ChainError::InactiveChain(chain_id));
728        };
729        let mut resource_controller = ResourceController {
730            policy: Arc::new(committee.policy().clone()),
731            tracker: ResourceTracker::default(),
732            account: block.authenticated_signer,
733        };
734        resource_controller
735            .track_executed_block_size(EMPTY_EXECUTED_BLOCK_SIZE)
736            .and_then(|()| {
737                resource_controller
738                    .track_executed_block_size_sequence_extension(0, block.incoming_bundles.len())
739            })
740            .and_then(|()| {
741                resource_controller
742                    .track_executed_block_size_sequence_extension(0, block.operations.len())
743            })
744            .with_execution_context(ChainExecutionContext::Block)?;
745
746        if self.is_closed() {
747            ensure!(
748                !block.incoming_bundles.is_empty() && block.has_only_rejected_messages(),
749                ChainError::ClosedChain
750            );
751        }
752        let app_permissions = self.execution_state.system.application_permissions.get();
753        let mut mandatory = HashSet::<UserApplicationId>::from_iter(
754            app_permissions.mandatory_applications.iter().cloned(),
755        );
756        for operation in &block.operations {
757            ensure!(
758                app_permissions.can_execute_operations(&operation.application_id()),
759                ChainError::AuthorizedApplications(
760                    app_permissions.execute_operations.clone().unwrap()
761                )
762            );
763            if let Operation::User { application_id, .. } = operation {
764                mandatory.remove(application_id);
765            }
766        }
767        for pending in block.incoming_messages() {
768            if mandatory.is_empty() {
769                break;
770            }
771            if let Message::User { application_id, .. } = &pending.message {
772                mandatory.remove(application_id);
773            }
774        }
775        ensure!(
776            mandatory.is_empty(),
777            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
778        );
779
780        // Execute each incoming bundle as a transaction, then each operation.
781        // Collect messages, events and oracle responses, each as one list per transaction.
782        let mut replaying_oracle_responses = replaying_oracle_responses.map(Vec::into_iter);
783        let mut next_message_index = 0;
784        let mut oracle_responses = Vec::new();
785        let mut events = Vec::new();
786        let mut messages = Vec::new();
787        for (txn_index, transaction) in block.transactions() {
788            let chain_execution_context = match transaction {
789                Transaction::ReceiveMessages(_) => ChainExecutionContext::IncomingBundle(txn_index),
790                Transaction::ExecuteOperation(_) => ChainExecutionContext::Operation(txn_index),
791            };
792            let maybe_responses = match replaying_oracle_responses.as_mut().map(Iterator::next) {
793                Some(Some(responses)) => Some(responses),
794                Some(None) => return Err(ChainError::MissingOracleResponseList),
795                None => None,
796            };
797            let mut txn_tracker = TransactionTracker::new(next_message_index, maybe_responses);
798            match transaction {
799                Transaction::ReceiveMessages(incoming_bundle) => {
800                    resource_controller
801                        .track_executed_block_size_of(&incoming_bundle)
802                        .with_execution_context(chain_execution_context)?;
803                    for (message_id, posted_message) in incoming_bundle.messages_and_ids() {
804                        self.execute_message_in_block(
805                            message_id,
806                            posted_message,
807                            incoming_bundle,
808                            block,
809                            txn_index,
810                            local_time,
811                            &mut txn_tracker,
812                            &mut resource_controller,
813                        )
814                        .await?;
815                    }
816                }
817                Transaction::ExecuteOperation(operation) => {
818                    resource_controller
819                        .track_executed_block_size_of(&operation)
820                        .with_execution_context(chain_execution_context)?;
821                    #[cfg(with_metrics)]
822                    let _operation_latency = OPERATION_EXECUTION_LATENCY.measure_latency();
823                    let context = OperationContext {
824                        chain_id,
825                        height: block.height,
826                        index: Some(txn_index),
827                        authenticated_signer: block.authenticated_signer,
828                        authenticated_caller_id: None,
829                    };
830                    self.execution_state
831                        .execute_operation(
832                            context,
833                            local_time,
834                            operation.clone(),
835                            &mut txn_tracker,
836                            &mut resource_controller,
837                        )
838                        .await
839                        .with_execution_context(chain_execution_context)?;
840                    resource_controller
841                        .with_state(&mut self.execution_state)
842                        .await?
843                        .track_operation(operation)
844                        .with_execution_context(chain_execution_context)?;
845                }
846            }
847
848            self.execution_state
849                .update_execution_outcomes_with_app_registrations(&mut txn_tracker)
850                .await
851                .with_execution_context(chain_execution_context)?;
852            let (txn_outcomes, txn_oracle_responses, new_next_message_index) = txn_tracker
853                .destructure()
854                .with_execution_context(chain_execution_context)?;
855            next_message_index = new_next_message_index;
856            let (txn_messages, txn_events) = self
857                .process_execution_outcomes(block.height, txn_outcomes)
858                .await?;
859            if matches!(
860                transaction,
861                Transaction::ExecuteOperation(_)
862                    | Transaction::ReceiveMessages(IncomingBundle {
863                        action: MessageAction::Accept,
864                        ..
865                    })
866            ) {
867                for message_out in &txn_messages {
868                    resource_controller
869                        .with_state(&mut self.execution_state)
870                        .await?
871                        .track_message(&message_out.message)
872                        .with_execution_context(chain_execution_context)?;
873                }
874            }
875            resource_controller
876                .track_executed_block_size_of(&(&txn_oracle_responses, &txn_messages, &txn_events))
877                .with_execution_context(chain_execution_context)?;
878            resource_controller
879                .track_executed_block_size_sequence_extension(oracle_responses.len(), 1)
880                .with_execution_context(chain_execution_context)?;
881            resource_controller
882                .track_executed_block_size_sequence_extension(messages.len(), 1)
883                .with_execution_context(chain_execution_context)?;
884            resource_controller
885                .track_executed_block_size_sequence_extension(events.len(), 1)
886                .with_execution_context(chain_execution_context)?;
887            oracle_responses.push(txn_oracle_responses);
888            messages.push(txn_messages);
889            events.push(txn_events);
890        }
891
892        // Finally, charge for the block fee, except if the chain is closed. Closed chains should
893        // always be able to reject incoming messages.
894        if !self.is_closed() {
895            resource_controller
896                .with_state(&mut self.execution_state)
897                .await?
898                .track_block()
899                .with_execution_context(ChainExecutionContext::Block)?;
900        }
901
902        // Recompute the state hash.
903        let state_hash = {
904            #[cfg(with_metrics)]
905            let _hash_latency = STATE_HASH_COMPUTATION_LATENCY.measure_latency();
906            self.execution_state.crypto_hash().await?
907        };
908        self.execution_state_hash.set(Some(state_hash));
909        // Last, reset the consensus state based on the current ownership.
910        let maybe_committee = self.execution_state.system.current_committee().into_iter();
911        self.manager.get_mut().reset(
912            self.execution_state.system.ownership.get(),
913            block.height.try_add_one()?,
914            local_time,
915            maybe_committee.flat_map(|(_, committee)| committee.keys_and_weights()),
916        )?;
917
918        #[cfg(with_metrics)]
919        {
920            // Log Prometheus metrics
921            NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
922            WASM_FUEL_USED_PER_BLOCK
923                .with_label_values(&[])
924                .observe(resource_controller.tracker.fuel as f64);
925            WASM_NUM_READS_PER_BLOCK
926                .with_label_values(&[])
927                .observe(resource_controller.tracker.read_operations as f64);
928            WASM_BYTES_READ_PER_BLOCK
929                .with_label_values(&[])
930                .observe(resource_controller.tracker.bytes_read as f64);
931            WASM_BYTES_WRITTEN_PER_BLOCK
932                .with_label_values(&[])
933                .observe(resource_controller.tracker.bytes_written as f64);
934        }
935
936        assert_eq!(
937            messages.len(),
938            block.incoming_bundles.len() + block.operations.len()
939        );
940        let outcome = BlockExecutionOutcome {
941            messages,
942            state_hash,
943            oracle_responses,
944            events,
945        };
946        Ok(outcome)
947    }
948
949    /// Executes a message as part of an incoming bundle in a block.
950    #[expect(clippy::too_many_arguments)]
951    async fn execute_message_in_block(
952        &mut self,
953        message_id: MessageId,
954        posted_message: &PostedMessage,
955        incoming_bundle: &IncomingBundle,
956        block: &Block,
957        txn_index: u32,
958        local_time: Timestamp,
959        txn_tracker: &mut TransactionTracker,
960        resource_controller: &mut ResourceController<Option<Owner>>,
961    ) -> Result<(), ChainError> {
962        #[cfg(with_metrics)]
963        let _message_latency = MESSAGE_EXECUTION_LATENCY.measure_latency();
964        let context = MessageContext {
965            chain_id: block.chain_id,
966            is_bouncing: posted_message.is_bouncing(),
967            height: block.height,
968            certificate_hash: incoming_bundle.bundle.certificate_hash,
969            message_id,
970            authenticated_signer: posted_message.authenticated_signer,
971            refund_grant_to: posted_message.refund_grant_to,
972        };
973        let mut grant = posted_message.grant;
974        match incoming_bundle.action {
975            MessageAction::Accept => {
976                // Once a chain is closed, accepting incoming messages is not allowed.
977                ensure!(!self.is_closed(), ChainError::ClosedChain);
978
979                self.execution_state
980                    .execute_message(
981                        context,
982                        local_time,
983                        posted_message.message.clone(),
984                        (grant > Amount::ZERO).then_some(&mut grant),
985                        txn_tracker,
986                        resource_controller,
987                    )
988                    .await
989                    .with_execution_context(ChainExecutionContext::IncomingBundle(txn_index))?;
990                if grant > Amount::ZERO {
991                    if let Some(refund_grant_to) = posted_message.refund_grant_to {
992                        self.execution_state
993                            .send_refund(context, grant, refund_grant_to, txn_tracker)
994                            .await
995                            .with_execution_context(ChainExecutionContext::IncomingBundle(
996                                txn_index,
997                            ))?;
998                    }
999                }
1000            }
1001            MessageAction::Reject => {
1002                // If rejecting a message fails, the entire block proposal should be
1003                // scrapped.
1004                ensure!(
1005                    !posted_message.is_protected() || self.is_closed(),
1006                    ChainError::CannotRejectMessage {
1007                        chain_id: block.chain_id,
1008                        origin: Box::new(incoming_bundle.origin.clone()),
1009                        posted_message: Box::new(posted_message.clone()),
1010                    }
1011                );
1012                if posted_message.is_tracked() {
1013                    // Bounce the message.
1014                    self.execution_state
1015                        .bounce_message(context, grant, posted_message.message.clone(), txn_tracker)
1016                        .await
1017                        .with_execution_context(ChainExecutionContext::Block)?;
1018                } else if grant > Amount::ZERO {
1019                    // Nothing to do except maybe refund the grant.
1020                    let Some(refund_grant_to) = posted_message.refund_grant_to else {
1021                        // See OperationContext::refund_grant_to()
1022                        return Err(ChainError::InternalError(
1023                            "Messages with grants should have a non-empty `refund_grant_to`".into(),
1024                        ));
1025                    };
1026                    // Refund grant.
1027                    self.execution_state
1028                        .send_refund(context, posted_message.grant, refund_grant_to, txn_tracker)
1029                        .await
1030                        .with_execution_context(ChainExecutionContext::Block)?;
1031                }
1032            }
1033        }
1034        Ok(())
1035    }
1036
1037    async fn process_execution_outcomes(
1038        &mut self,
1039        height: BlockHeight,
1040        results: Vec<ExecutionOutcome>,
1041    ) -> Result<(Vec<OutgoingMessage>, Vec<EventRecord>), ChainError> {
1042        let mut messages = Vec::new();
1043        let mut events = Vec::new();
1044        for result in results {
1045            match result {
1046                ExecutionOutcome::System(result) => {
1047                    self.process_raw_execution_outcome(
1048                        GenericApplicationId::System,
1049                        Message::System,
1050                        &mut messages,
1051                        &mut events,
1052                        height,
1053                        result,
1054                    )
1055                    .await?;
1056                }
1057                ExecutionOutcome::User(application_id, result) => {
1058                    self.process_raw_execution_outcome(
1059                        GenericApplicationId::User(application_id),
1060                        |bytes| Message::User {
1061                            application_id,
1062                            bytes,
1063                        },
1064                        &mut messages,
1065                        &mut events,
1066                        height,
1067                        result,
1068                    )
1069                    .await?;
1070                }
1071            }
1072        }
1073        Ok((messages, events))
1074    }
1075
1076    async fn process_raw_execution_outcome<E, F>(
1077        &mut self,
1078        application_id: GenericApplicationId,
1079        lift: F,
1080        messages: &mut Vec<OutgoingMessage>,
1081        events: &mut Vec<EventRecord>,
1082        height: BlockHeight,
1083        raw_outcome: RawExecutionOutcome<E, Amount>,
1084    ) -> Result<(), ChainError>
1085    where
1086        F: Fn(E) -> Message,
1087    {
1088        events.extend(
1089            raw_outcome
1090                .events
1091                .into_iter()
1092                .map(|(stream_name, key, value)| EventRecord {
1093                    stream_id: StreamId {
1094                        application_id,
1095                        stream_name,
1096                    },
1097                    key,
1098                    value,
1099                }),
1100        );
1101        let max_stream_queries = self.context().max_stream_queries();
1102        // Record the messages of the execution. Messages are understood within an
1103        // application.
1104        let mut recipients = HashSet::new();
1105        let mut channel_broadcasts = HashSet::new();
1106        for RawOutgoingMessage {
1107            destination,
1108            authenticated,
1109            grant,
1110            kind,
1111            message,
1112        } in raw_outcome.messages
1113        {
1114            match &destination {
1115                Destination::Recipient(id) => {
1116                    recipients.insert(*id);
1117                }
1118                Destination::Subscribers(name) => {
1119                    ensure!(grant == Amount::ZERO, ChainError::GrantUseOnBroadcast);
1120                    channel_broadcasts.insert(name.clone());
1121                }
1122            }
1123            let authenticated_signer = raw_outcome.authenticated_signer.filter(|_| authenticated);
1124            let refund_grant_to = raw_outcome.refund_grant_to.filter(|_| grant > Amount::ZERO);
1125            messages.push(OutgoingMessage {
1126                destination,
1127                authenticated_signer,
1128                grant,
1129                refund_grant_to,
1130                kind,
1131                message: lift(message),
1132            });
1133        }
1134
1135        // Update the (regular) outboxes.
1136        let outbox_counters = self.outbox_counters.get_mut();
1137        let targets = recipients
1138            .into_iter()
1139            .map(Target::chain)
1140            .collect::<Vec<_>>();
1141        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1142        for mut outbox in outboxes {
1143            if outbox.schedule_message(height)? {
1144                *outbox_counters.entry(height).or_default() += 1;
1145            }
1146        }
1147
1148        // Update the channels.
1149        self.process_unsubscribes(raw_outcome.unsubscribe, application_id)
1150            .await?;
1151
1152        let full_names = channel_broadcasts
1153            .into_iter()
1154            .map(|name| ChannelFullName {
1155                application_id,
1156                name,
1157            })
1158            .collect::<Vec<_>>();
1159        let channels = self.channels.try_load_entries_mut(&full_names).await?;
1160        let stream = full_names.into_iter().zip(channels);
1161        let stream = stream::iter(stream)
1162            .map(|(full_name, mut channel)| async move {
1163                let recipients = channel.subscribers.indices().await?;
1164                channel.block_heights.push(height);
1165                let targets = recipients
1166                    .into_iter()
1167                    .map(|recipient| Target::channel(recipient, full_name.clone()))
1168                    .collect::<Vec<_>>();
1169                Ok::<_, ChainError>(targets)
1170            })
1171            .buffer_unordered(max_stream_queries);
1172        let infos = stream.try_collect::<Vec<_>>().await?;
1173        let targets = infos.into_iter().flatten().collect::<Vec<_>>();
1174        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1175        let outbox_counters = self.outbox_counters.get_mut();
1176        for mut outbox in outboxes {
1177            if outbox.schedule_message(height)? {
1178                *outbox_counters.entry(height).or_default() += 1;
1179            }
1180        }
1181
1182        self.process_subscribes(raw_outcome.subscribe, application_id)
1183            .await?;
1184        Ok(())
1185    }
1186
1187    /// Processes new subscriptions. Returns `true` if at least one new subscriber was added for
1188    /// which we have outgoing messages.
1189    async fn process_subscribes(
1190        &mut self,
1191        names_and_ids: Vec<(ChannelName, ChainId)>,
1192        application_id: GenericApplicationId,
1193    ) -> Result<bool, ChainError> {
1194        if names_and_ids.is_empty() {
1195            return Ok(false);
1196        }
1197        let full_names = names_and_ids
1198            .iter()
1199            .map(|(name, _)| ChannelFullName {
1200                application_id,
1201                name: name.clone(),
1202            })
1203            .collect::<Vec<_>>();
1204        let channels = self.channels.try_load_entries_mut(&full_names).await?;
1205        let subscribe_channels = names_and_ids.into_iter().zip(channels);
1206        let max_stream_queries = self.context().max_stream_queries();
1207        let stream = stream::iter(subscribe_channels)
1208            .map(|((name, id), mut channel)| async move {
1209                if channel.subscribers.contains(&id).await? {
1210                    return Ok(None); // Was already a subscriber.
1211                }
1212                let full_name = ChannelFullName {
1213                    application_id,
1214                    name,
1215                };
1216                tracing::trace!("Adding subscriber {id:.8} for {full_name:}");
1217                channel.subscribers.insert(&id)?;
1218                // Send all messages.
1219                let heights = channel.block_heights.read(..).await?;
1220                if heights.is_empty() {
1221                    return Ok(None); // No messages on this channel yet.
1222                }
1223                let target = Target::channel(id, full_name.clone());
1224                Ok::<_, ChainError>(Some((target, heights)))
1225            })
1226            .buffer_unordered(max_stream_queries);
1227        let infos = stream.try_collect::<Vec<_>>().await?;
1228        let (targets, heights): (Vec<_>, Vec<_>) = infos.into_iter().flatten().unzip();
1229        let mut new_outbox_entries = false;
1230        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1231        let outbox_counters = self.outbox_counters.get_mut();
1232        for (heights, mut outbox) in heights.into_iter().zip(outboxes) {
1233            for height in heights {
1234                if outbox.schedule_message(height)? {
1235                    *outbox_counters.entry(height).or_default() += 1;
1236                    new_outbox_entries = true;
1237                }
1238            }
1239        }
1240        Ok(new_outbox_entries)
1241    }
1242
1243    async fn process_unsubscribes(
1244        &mut self,
1245        names_and_ids: Vec<(ChannelName, ChainId)>,
1246        application_id: GenericApplicationId,
1247    ) -> Result<(), ChainError> {
1248        if names_and_ids.is_empty() {
1249            return Ok(());
1250        }
1251        let full_names = names_and_ids
1252            .iter()
1253            .map(|(name, _)| ChannelFullName {
1254                application_id,
1255                name: name.clone(),
1256            })
1257            .collect::<Vec<_>>();
1258        let channels = self.channels.try_load_entries_mut(&full_names).await?;
1259        for ((_name, id), mut channel) in names_and_ids.into_iter().zip(channels) {
1260            // Remove subscriber. Do not remove the channel outbox yet.
1261            channel.subscribers.remove(&id)?;
1262        }
1263        Ok(())
1264    }
1265}
1266
1267#[test]
1268fn empty_executed_block_size() {
1269    let executed_block = crate::data_types::ExecutedBlock {
1270        block: crate::test::make_first_block(ChainId::root(0)),
1271        outcome: crate::data_types::BlockExecutionOutcome::default(),
1272    };
1273    let size = bcs::serialized_size(&executed_block).unwrap();
1274    assert_eq!(size, EMPTY_EXECUTED_BLOCK_SIZE);
1275}