fuel_core_relayer/
storage.rs

1//! The module provides definition and implementation of the relayer storage.
2
3use crate::ports::{
4    DatabaseTransaction,
5    RelayerDb,
6    Transactional,
7};
8use fuel_core_storage::{
9    blueprint::plain::Plain,
10    codec::{
11        postcard::Postcard,
12        primitive::Primitive,
13    },
14    kv_store::StorageColumn,
15    structured_storage::TableWithBlueprint,
16    transactional::{
17        Modifiable,
18        StorageTransaction,
19    },
20    Error as StorageError,
21    Mappable,
22    Result as StorageResult,
23    StorageAsMut,
24    StorageMutate,
25};
26use fuel_core_types::{
27    blockchain::primitives::DaBlockHeight,
28    services::relayer::Event,
29};
30
31/// GraphQL database tables column ids to the corresponding [`fuel_core_storage::Mappable`] table.
32#[repr(u32)]
33#[derive(
34    Copy,
35    Clone,
36    Debug,
37    strum_macros::EnumCount,
38    strum_macros::IntoStaticStr,
39    PartialEq,
40    Eq,
41    enum_iterator::Sequence,
42    Hash,
43)]
44pub enum Column {
45    /// The column id of metadata about the relayer storage.
46    Metadata = 0,
47    /// The column of the table that stores history of the relayer.
48    History = 1,
49}
50
51impl Column {
52    /// The total count of variants in the enum.
53    pub const COUNT: usize = <Self as strum::EnumCount>::COUNT;
54
55    /// Returns the `usize` representation of the `Column`.
56    pub fn as_u32(&self) -> u32 {
57        *self as u32
58    }
59}
60
61impl StorageColumn for Column {
62    fn name(&self) -> String {
63        let str: &str = self.into();
64        str.to_string()
65    }
66
67    fn id(&self) -> u32 {
68        self.as_u32()
69    }
70}
71
72/// The table contains history of events on the DA.
73pub struct EventsHistory;
74
75impl Mappable for EventsHistory {
76    /// The key is the height of the DA.
77    type Key = Self::OwnedKey;
78    type OwnedKey = DaBlockHeight;
79    /// The value is an events happened at the height.
80    type Value = [Event];
81    type OwnedValue = Vec<Event>;
82}
83
84impl TableWithBlueprint for EventsHistory {
85    type Blueprint = Plain<Primitive<8>, Postcard>;
86    type Column = Column;
87
88    fn column() -> Column {
89        Column::History
90    }
91}
92
93impl<T> RelayerDb for T
94where
95    T: Send + Sync,
96    T: Transactional,
97    for<'a> T::Transaction<'a>: StorageMutate<EventsHistory, Error = StorageError>,
98{
99    fn insert_events(
100        &mut self,
101        da_height: &DaBlockHeight,
102        events: &[Event],
103    ) -> StorageResult<()> {
104        // A transaction is required to ensure that the height is
105        // set atomically with the insertion based on the current
106        // height. Also so that the messages are inserted atomically
107        // with the height.
108
109        // Get the current DA block height from the database.
110        let before = self.latest_da_height().unwrap_or_default();
111
112        let mut db_tx = self.transaction();
113
114        for event in events {
115            if da_height != &event.da_height() {
116                return Err(anyhow::anyhow!("Invalid da height").into());
117            }
118        }
119
120        db_tx.storage::<EventsHistory>().insert(da_height, events)?;
121        db_tx.commit()?;
122
123        // Compare the new DA block height with previous the block height. Block
124        // height must always be monotonically increasing. If the new block
125        // height is less than the previous block height, the service is in
126        // an error state and must be shut down.
127        let after = self
128            .latest_da_height()
129            .expect("DA height must be set at this point");
130        if after < before {
131            StorageResult::Err(
132                anyhow::anyhow!("Block height must be monotonically increasing").into(),
133            )?
134        }
135
136        // TODO: Think later about how to clean up the history of the relayer.
137        //  Since we don't have too much information on the relayer and it can be useful
138        //  at any time, maybe we want to consider keeping it all the time instead of creating snapshots.
139        //  https://github.com/FuelLabs/fuel-core/issues/1627
140        Ok(())
141    }
142
143    fn get_finalized_da_height(&self) -> Option<DaBlockHeight> {
144        self.latest_da_height()
145    }
146}
147
148impl<S> DatabaseTransaction for StorageTransaction<S>
149where
150    S: Modifiable,
151{
152    fn commit(self) -> StorageResult<()> {
153        self.commit()?;
154        Ok(())
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161
162    fuel_core_storage::basic_storage_tests!(
163        EventsHistory,
164        <EventsHistory as Mappable>::Key::default(),
165        vec![
166            Event::Message(Default::default()),
167            Event::Transaction(Default::default())
168        ]
169    );
170}