// fuel_core_relayer/storage.rs
use crate::ports::{
4 DatabaseTransaction,
5 RelayerDb,
6 Transactional,
7};
8use fuel_core_storage::{
9 blueprint::plain::Plain,
10 codec::{
11 postcard::Postcard,
12 primitive::Primitive,
13 },
14 kv_store::StorageColumn,
15 structured_storage::TableWithBlueprint,
16 transactional::{
17 Modifiable,
18 StorageTransaction,
19 },
20 Error as StorageError,
21 Mappable,
22 Result as StorageResult,
23 StorageAsMut,
24 StorageMutate,
25};
26use fuel_core_types::{
27 blockchain::primitives::DaBlockHeight,
28 services::relayer::Event,
29};
30
31#[repr(u32)]
33#[derive(
34 Copy,
35 Clone,
36 Debug,
37 strum_macros::EnumCount,
38 strum_macros::IntoStaticStr,
39 PartialEq,
40 Eq,
41 enum_iterator::Sequence,
42 Hash,
43)]
44pub enum Column {
45 Metadata = 0,
47 History = 1,
49}
50
51impl Column {
52 pub const COUNT: usize = <Self as strum::EnumCount>::COUNT;
54
55 pub fn as_u32(&self) -> u32 {
57 *self as u32
58 }
59}
60
61impl StorageColumn for Column {
62 fn name(&self) -> String {
63 let str: &str = self.into();
64 str.to_string()
65 }
66
67 fn id(&self) -> u32 {
68 self.as_u32()
69 }
70}
71
72pub struct EventsHistory;
74
75impl Mappable for EventsHistory {
76 type Key = Self::OwnedKey;
78 type OwnedKey = DaBlockHeight;
79 type Value = [Event];
81 type OwnedValue = Vec<Event>;
82}
83
84impl TableWithBlueprint for EventsHistory {
85 type Blueprint = Plain<Primitive<8>, Postcard>;
86 type Column = Column;
87
88 fn column() -> Column {
89 Column::History
90 }
91}
92
93impl<T> RelayerDb for T
94where
95 T: Send + Sync,
96 T: Transactional,
97 for<'a> T::Transaction<'a>: StorageMutate<EventsHistory, Error = StorageError>,
98{
99 fn insert_events(
100 &mut self,
101 da_height: &DaBlockHeight,
102 events: &[Event],
103 ) -> StorageResult<()> {
104 let before = self.latest_da_height().unwrap_or_default();
111
112 let mut db_tx = self.transaction();
113
114 for event in events {
115 if da_height != &event.da_height() {
116 return Err(anyhow::anyhow!("Invalid da height").into());
117 }
118 }
119
120 db_tx.storage::<EventsHistory>().insert(da_height, events)?;
121 db_tx.commit()?;
122
123 let after = self
128 .latest_da_height()
129 .expect("DA height must be set at this point");
130 if after < before {
131 StorageResult::Err(
132 anyhow::anyhow!("Block height must be monotonically increasing").into(),
133 )?
134 }
135
136 Ok(())
141 }
142
143 fn get_finalized_da_height(&self) -> Option<DaBlockHeight> {
144 self.latest_da_height()
145 }
146}
147
148impl<S> DatabaseTransaction for StorageTransaction<S>
149where
150 S: Modifiable,
151{
152 fn commit(self) -> StorageResult<()> {
153 self.commit()?;
154 Ok(())
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    // Instantiates the shared storage test suite for `EventsHistory`, using
    // the default key and a two-event value (one default message event and
    // one default transaction event).
    fuel_core_storage::basic_storage_tests!(
        EventsHistory,
        <EventsHistory as Mappable>::Key::default(),
        vec![
            Event::Message(Default::default()),
            Event::Transaction(Default::default()),
        ]
    );
}