1use async_graphql::SimpleObject;
5use linera_base::{
6 data_types::{ArithmeticError, BlockHeight},
7 ensure,
8 identifiers::ChainId,
9};
10#[cfg(with_testing)]
11use linera_views::context::{create_test_memory_context, MemoryContext};
12use linera_views::{
13 context::Context,
14 queue_view::QueueView,
15 register_view::RegisterView,
16 views::{ClonableView, View, ViewError},
17};
18use serde::{Deserialize, Serialize};
19use thiserror::Error;
20
21use crate::{data_types::MessageBundle, ChainError, Origin};
22
23#[cfg(test)]
24#[path = "unit_tests/inbox_tests.rs"]
25mod inbox_tests;
26
27#[derive(Debug, ClonableView, View, async_graphql::SimpleObject)]
40pub struct InboxStateView<C>
41where
42 C: Clone + Context + Send + Sync,
43{
44 pub next_cursor_to_add: RegisterView<C, Cursor>,
46 pub next_cursor_to_remove: RegisterView<C, Cursor>,
48 pub added_bundles: QueueView<C, MessageBundle>,
50 pub removed_bundles: QueueView<C, MessageBundle>,
53}
54
55#[derive(
56 Debug,
57 Default,
58 Clone,
59 Copy,
60 Hash,
61 Eq,
62 PartialEq,
63 Ord,
64 PartialOrd,
65 Serialize,
66 Deserialize,
67 SimpleObject,
68)]
69pub struct Cursor {
70 height: BlockHeight,
71 index: u32,
72}
73
74#[derive(Error, Debug)]
75pub(crate) enum InboxError {
76 #[error(transparent)]
77 ViewError(#[from] ViewError),
78 #[error(transparent)]
79 ArithmeticError(#[from] ArithmeticError),
80 #[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
81 UnexpectedBundle {
82 bundle: MessageBundle,
83 previous_bundle: MessageBundle,
84 },
85 #[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
86 IncorrectOrder {
87 bundle: MessageBundle,
88 next_cursor: Cursor,
89 },
90 #[error(
91 "{bundle:?} cannot be skipped: it must be received before the next \
92 messages from the same origin"
93 )]
94 UnskippableBundle { bundle: MessageBundle },
95}
96
97impl From<&MessageBundle> for Cursor {
98 #[inline]
99 fn from(bundle: &MessageBundle) -> Self {
100 Self {
101 height: bundle.height,
102 index: bundle.transaction_index,
103 }
104 }
105}
106
107impl Cursor {
108 fn try_add_one(self) -> Result<Self, ArithmeticError> {
109 let value = Self {
110 height: self.height,
111 index: self.index.checked_add(1).ok_or(ArithmeticError::Overflow)?,
112 };
113 Ok(value)
114 }
115}
116
117impl From<(ChainId, Origin, InboxError)> for ChainError {
118 fn from(value: (ChainId, Origin, InboxError)) -> Self {
119 let (chain_id, origin, error) = value;
120 match error {
121 InboxError::ViewError(e) => ChainError::ViewError(e),
122 InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
123 InboxError::UnexpectedBundle {
124 bundle,
125 previous_bundle,
126 } => ChainError::UnexpectedMessage {
127 chain_id,
128 origin: origin.into(),
129 bundle: Box::new(bundle),
130 previous_bundle: Box::new(previous_bundle),
131 },
132 InboxError::IncorrectOrder {
133 bundle,
134 next_cursor,
135 } => ChainError::IncorrectMessageOrder {
136 chain_id,
137 origin: origin.into(),
138 bundle: Box::new(bundle),
139 next_height: next_cursor.height,
140 next_index: next_cursor.index,
141 },
142 InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
143 chain_id,
144 origin: origin.into(),
145 bundle: Box::new(bundle),
146 },
147 }
148 }
149}
150
151impl<C> InboxStateView<C>
152where
153 C: Context + Clone + Send + Sync + 'static,
154{
155 pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
158 let cursor = self.next_cursor_to_add.get();
159 if cursor.index == 0 {
160 Ok(cursor.height)
161 } else {
162 Ok(cursor.height.try_add_one()?)
163 }
164 }
165
166 pub(crate) async fn remove_bundle(
170 &mut self,
171 bundle: &MessageBundle,
172 ) -> Result<bool, InboxError> {
173 let cursor = Cursor::from(bundle);
175 ensure!(
176 cursor >= *self.next_cursor_to_remove.get(),
177 InboxError::IncorrectOrder {
178 bundle: bundle.clone(),
179 next_cursor: *self.next_cursor_to_remove.get(),
180 }
181 );
182 while let Some(previous_bundle) = self.added_bundles.front().await? {
184 if Cursor::from(&previous_bundle) >= cursor {
185 break;
186 }
187 ensure!(
188 previous_bundle.is_skippable(),
189 InboxError::UnskippableBundle {
190 bundle: previous_bundle
191 }
192 );
193 self.added_bundles.delete_front();
194 tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
195 }
196 let already_known = match self.added_bundles.front().await? {
198 Some(previous_bundle) => {
199 ensure!(
205 bundle == &previous_bundle,
206 InboxError::UnexpectedBundle {
207 previous_bundle,
208 bundle: bundle.clone(),
209 }
210 );
211 self.added_bundles.delete_front();
212 tracing::trace!("Consuming bundle {:?}", bundle);
213 true
214 }
215 None => {
216 tracing::trace!("Marking bundle as expected: {:?}", bundle);
217 self.removed_bundles.push_back(bundle.clone());
218 false
219 }
220 };
221 self.next_cursor_to_remove.set(cursor.try_add_one()?);
222 Ok(already_known)
223 }
224
225 pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
230 let cursor = Cursor::from(&bundle);
232 ensure!(
233 cursor >= *self.next_cursor_to_add.get(),
234 InboxError::IncorrectOrder {
235 bundle: bundle.clone(),
236 next_cursor: *self.next_cursor_to_add.get(),
237 }
238 );
239 let newly_added = match self.removed_bundles.front().await? {
241 Some(previous_bundle) => {
242 if Cursor::from(&previous_bundle) == cursor {
243 ensure!(
246 bundle == previous_bundle,
247 InboxError::UnexpectedBundle {
248 previous_bundle,
249 bundle,
250 }
251 );
252 self.removed_bundles.delete_front();
253 } else {
254 ensure!(
257 cursor < Cursor::from(&previous_bundle) && bundle.is_skippable(),
258 InboxError::UnexpectedBundle {
259 previous_bundle,
260 bundle,
261 }
262 );
263 }
264 false
265 }
266 None => {
267 self.added_bundles.push_back(bundle);
269 true
270 }
271 };
272 self.next_cursor_to_add.set(cursor.try_add_one()?);
273 Ok(newly_added)
274 }
275}
276
#[cfg(with_testing)]
impl InboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Loads an inbox view from a context created by `create_test_memory_context`.
    ///
    /// # Panics
    ///
    /// Panics if loading the view fails.
    pub async fn new() -> Self {
        let loaded = Self::load(create_test_memory_context()).await;
        loaded.expect("Loading from memory should work")
    }
}