linera_chain/
inbox.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_graphql::SimpleObject;
5use linera_base::{
6    data_types::{ArithmeticError, BlockHeight},
7    ensure,
8    identifiers::ChainId,
9};
10#[cfg(with_testing)]
11use linera_views::context::{create_test_memory_context, MemoryContext};
12use linera_views::{
13    context::Context,
14    queue_view::QueueView,
15    register_view::RegisterView,
16    views::{ClonableView, View, ViewError},
17};
18use serde::{Deserialize, Serialize};
19use thiserror::Error;
20
21use crate::{data_types::MessageBundle, ChainError, Origin};
22
23#[cfg(test)]
24#[path = "unit_tests/inbox_tests.rs"]
25mod inbox_tests;
26
27/// The state of an inbox.
28/// * An inbox is used to track bundles received and executed locally.
29/// * A `MessageBundle` consists of a logical cursor `(height, index)` and some message
30///   content `messages`.
31/// * On the surface, an inbox looks like a FIFO queue: the main APIs are `add_bundle` and
32///   `remove_bundle`.
33/// * However, bundles can also be removed before they are added. When this happens,
34///   the bundles removed by anticipation are tracked in a separate queue. Any bundle added
35///   later will be required to match the first removed bundle and so on.
36/// * The cursors of added bundles (resp. removed bundles) must be increasing over time.
37/// * Reconciliation of added and removed bundles is allowed to skip some added bundles.
38///   However, the opposite is not true: every removed bundle must be eventually added.
39#[derive(Debug, ClonableView, View, async_graphql::SimpleObject)]
40pub struct InboxStateView<C>
41where
42    C: Clone + Context + Send + Sync,
43{
44    /// We have already added all the messages below this height and index.
45    pub next_cursor_to_add: RegisterView<C, Cursor>,
46    /// We have already removed all the messages below this height and index.
47    pub next_cursor_to_remove: RegisterView<C, Cursor>,
48    /// These bundles have been added and are waiting to be removed.
49    pub added_bundles: QueueView<C, MessageBundle>,
50    /// These bundles have been removed by anticipation and are waiting to be added.
51    /// At least one of `added_bundles` and `removed_bundles` should be empty.
52    pub removed_bundles: QueueView<C, MessageBundle>,
53}
54
55#[derive(
56    Debug,
57    Default,
58    Clone,
59    Copy,
60    Hash,
61    Eq,
62    PartialEq,
63    Ord,
64    PartialOrd,
65    Serialize,
66    Deserialize,
67    SimpleObject,
68)]
69pub struct Cursor {
70    height: BlockHeight,
71    index: u32,
72}
73
74#[derive(Error, Debug)]
75pub(crate) enum InboxError {
76    #[error(transparent)]
77    ViewError(#[from] ViewError),
78    #[error(transparent)]
79    ArithmeticError(#[from] ArithmeticError),
80    #[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
81    UnexpectedBundle {
82        bundle: MessageBundle,
83        previous_bundle: MessageBundle,
84    },
85    #[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
86    IncorrectOrder {
87        bundle: MessageBundle,
88        next_cursor: Cursor,
89    },
90    #[error(
91        "{bundle:?} cannot be skipped: it must be received before the next \
92        messages from the same origin"
93    )]
94    UnskippableBundle { bundle: MessageBundle },
95}
96
97impl From<&MessageBundle> for Cursor {
98    #[inline]
99    fn from(bundle: &MessageBundle) -> Self {
100        Self {
101            height: bundle.height,
102            index: bundle.transaction_index,
103        }
104    }
105}
106
107impl Cursor {
108    fn try_add_one(self) -> Result<Self, ArithmeticError> {
109        let value = Self {
110            height: self.height,
111            index: self.index.checked_add(1).ok_or(ArithmeticError::Overflow)?,
112        };
113        Ok(value)
114    }
115}
116
117impl From<(ChainId, Origin, InboxError)> for ChainError {
118    fn from(value: (ChainId, Origin, InboxError)) -> Self {
119        let (chain_id, origin, error) = value;
120        match error {
121            InboxError::ViewError(e) => ChainError::ViewError(e),
122            InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
123            InboxError::UnexpectedBundle {
124                bundle,
125                previous_bundle,
126            } => ChainError::UnexpectedMessage {
127                chain_id,
128                origin: origin.into(),
129                bundle: Box::new(bundle),
130                previous_bundle: Box::new(previous_bundle),
131            },
132            InboxError::IncorrectOrder {
133                bundle,
134                next_cursor,
135            } => ChainError::IncorrectMessageOrder {
136                chain_id,
137                origin: origin.into(),
138                bundle: Box::new(bundle),
139                next_height: next_cursor.height,
140                next_index: next_cursor.index,
141            },
142            InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
143                chain_id,
144                origin: origin.into(),
145                bundle: Box::new(bundle),
146            },
147        }
148    }
149}
150
151impl<C> InboxStateView<C>
152where
153    C: Context + Clone + Send + Sync + 'static,
154{
155    /// Converts the internal cursor for added bundles into an externally-visible block height.
156    /// This makes sense because the rest of the system always adds bundles one block at a time.
157    pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
158        let cursor = self.next_cursor_to_add.get();
159        if cursor.index == 0 {
160            Ok(cursor.height)
161        } else {
162            Ok(cursor.height.try_add_one()?)
163        }
164    }
165
166    /// Consumes a bundle from the inbox.
167    ///
168    /// Returns `true` if the bundle was already known, i.e. it was present in `added_bundles`.
169    pub(crate) async fn remove_bundle(
170        &mut self,
171        bundle: &MessageBundle,
172    ) -> Result<bool, InboxError> {
173        // Record the latest cursor.
174        let cursor = Cursor::from(bundle);
175        ensure!(
176            cursor >= *self.next_cursor_to_remove.get(),
177            InboxError::IncorrectOrder {
178                bundle: bundle.clone(),
179                next_cursor: *self.next_cursor_to_remove.get(),
180            }
181        );
182        // Discard added bundles with lower cursors (if any).
183        while let Some(previous_bundle) = self.added_bundles.front().await? {
184            if Cursor::from(&previous_bundle) >= cursor {
185                break;
186            }
187            ensure!(
188                previous_bundle.is_skippable(),
189                InboxError::UnskippableBundle {
190                    bundle: previous_bundle
191                }
192            );
193            self.added_bundles.delete_front();
194            tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
195        }
196        // Reconcile the bundle with the next added bundle, or mark it as removed.
197        let already_known = match self.added_bundles.front().await? {
198            Some(previous_bundle) => {
199                // Rationale: If the two cursors are equal, then the bundles should match.
200                // Otherwise, at this point we know that `self.next_cursor_to_add >
201                // Cursor::from(&previous_bundle) > cursor`. Notably, `bundle` will never be
202                // added in the future. Therefore, we should fail instead of adding
203                // it to `self.removed_bundles`.
204                ensure!(
205                    bundle == &previous_bundle,
206                    InboxError::UnexpectedBundle {
207                        previous_bundle,
208                        bundle: bundle.clone(),
209                    }
210                );
211                self.added_bundles.delete_front();
212                tracing::trace!("Consuming bundle {:?}", bundle);
213                true
214            }
215            None => {
216                tracing::trace!("Marking bundle as expected: {:?}", bundle);
217                self.removed_bundles.push_back(bundle.clone());
218                false
219            }
220        };
221        self.next_cursor_to_remove.set(cursor.try_add_one()?);
222        Ok(already_known)
223    }
224
225    /// Pushes a bundle to the inbox. The verifications should not fail in production unless
226    /// many validators are faulty.
227    ///
228    /// Returns `true` if the bundle was new, `false` if it was already in `removed_bundles`.
229    pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
230        // Record the latest cursor.
231        let cursor = Cursor::from(&bundle);
232        ensure!(
233            cursor >= *self.next_cursor_to_add.get(),
234            InboxError::IncorrectOrder {
235                bundle: bundle.clone(),
236                next_cursor: *self.next_cursor_to_add.get(),
237            }
238        );
239        // Find if the bundle was removed ahead of time.
240        let newly_added = match self.removed_bundles.front().await? {
241            Some(previous_bundle) => {
242                if Cursor::from(&previous_bundle) == cursor {
243                    // We already executed this bundle by anticipation. Remove it from
244                    // the queue.
245                    ensure!(
246                        bundle == previous_bundle,
247                        InboxError::UnexpectedBundle {
248                            previous_bundle,
249                            bundle,
250                        }
251                    );
252                    self.removed_bundles.delete_front();
253                } else {
254                    // The receiver has already executed a later bundle from the same
255                    // sender ahead of time so we should skip this one.
256                    ensure!(
257                        cursor < Cursor::from(&previous_bundle) && bundle.is_skippable(),
258                        InboxError::UnexpectedBundle {
259                            previous_bundle,
260                            bundle,
261                        }
262                    );
263                }
264                false
265            }
266            None => {
267                // Otherwise, schedule the messages for execution.
268                self.added_bundles.push_back(bundle);
269                true
270            }
271        };
272        self.next_cursor_to_add.set(cursor.try_add_one()?);
273        Ok(newly_added)
274    }
275}
276
#[cfg(with_testing)]
impl InboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Creates an inbox state view loaded from a fresh test memory context.
    ///
    /// # Panics
    ///
    /// Panics if loading the view from the memory context fails.
    pub async fn new() -> Self {
        Self::load(create_test_memory_context())
            .await
            .expect("Loading from memory should work")
    }
}
288}