1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
//! The module provides definition and implementation of the relayer storage.

use crate::ports::{
    DatabaseTransaction,
    RelayerDb,
    Transactional,
};
use fuel_core_storage::{
    blueprint::plain::Plain,
    codec::{
        postcard::Postcard,
        primitive::Primitive,
    },
    kv_store::StorageColumn,
    structured_storage::TableWithBlueprint,
    transactional::{
        Modifiable,
        StorageTransaction,
    },
    Error as StorageError,
    Mappable,
    Result as StorageResult,
    StorageAsMut,
    StorageMutate,
};
use fuel_core_types::{
    blockchain::primitives::DaBlockHeight,
    services::relayer::Event,
};

/// Relayer database column ids, each identifying the column of the corresponding
/// [`fuel_core_storage::Mappable`] table.
///
/// The enum is `#[repr(u32)]`, so every variant's explicit discriminant is the
/// numeric id of the column.
#[repr(u32)]
#[derive(
    Copy,
    Clone,
    Debug,
    strum_macros::EnumCount,
    strum_macros::IntoStaticStr,
    PartialEq,
    Eq,
    enum_iterator::Sequence,
    Hash,
)]
pub enum Column {
    /// The column id of metadata about the relayer storage.
    Metadata = 0,
    /// The column of the table that stores history of the relayer.
    History = 1,
}

impl Column {
    /// The total count of variants in the enum.
    pub const COUNT: usize = <Self as strum::EnumCount>::COUNT;

    /// Returns the `u32` representation of the `Column`, i.e. the variant's
    /// discriminant.
    pub fn as_u32(&self) -> u32 {
        *self as u32
    }
}

impl StorageColumn for Column {
    fn name(&self) -> &'static str {
        self.into()
    }

    fn id(&self) -> u32 {
        self.as_u32()
    }
}

/// The table that stores the history of events on the DA.
pub struct EventsHistory;

impl Mappable for EventsHistory {
    /// The key is the height of the DA.
    type Key = Self::OwnedKey;
    type OwnedKey = DaBlockHeight;
    /// The value is the list of events that happened at the height.
    type Value = [Event];
    type OwnedValue = Vec<Event>;
}

impl TableWithBlueprint for EventsHistory {
    // NOTE(review): presumably `Primitive<8>` is the codec of the key and
    // `Postcard` is the codec of the value — confirm against the `Plain`
    // blueprint definition in `fuel_core_storage`.
    type Blueprint = Plain<Primitive<8>, Postcard>;
    type Column = Column;

    /// The table lives in the [`Column::History`] column.
    fn column() -> Column {
        Column::History
    }
}

impl<T> RelayerDb for T
where
    T: Send + Sync,
    T: Transactional,
    for<'a> T::Transaction<'a>: StorageMutate<EventsHistory, Error = StorageError>,
{
    /// Stores `events` in the [`EventsHistory`] table under the key `da_height`.
    ///
    /// # Errors
    ///
    /// - Returns an "Invalid da height" error, before any storage access, if
    ///   some event reports a DA height different from `da_height`.
    /// - Propagates any error of the insertion or of the transaction commit.
    /// - Returns a "Block height must be monotonically increasing" error if
    ///   the latest DA height read after the commit is lower than the one read
    ///   before it. The transaction is already committed at that point.
    ///
    /// # Panics
    ///
    /// Panics if the latest DA height is not available after the commit.
    fn insert_events(
        &mut self,
        da_height: &DaBlockHeight,
        events: &[Event],
    ) -> StorageResult<()> {
        // Reject inconsistent input up front: there is no reason to read from
        // the database or to open a transaction for events that are going to
        // be refused anyway.
        if events
            .iter()
            .any(|event| da_height != &event.da_height())
        {
            return Err(anyhow::anyhow!("Invalid da height").into());
        }

        // Get the current DA block height from the database. A missing height
        // is treated as the default one.
        let before = self.latest_da_height().unwrap_or_default();

        // The events are written through a transaction, so they become visible
        // only once it is committed.
        let mut db_tx = self.transaction();
        db_tx.storage::<EventsHistory>().insert(da_height, events)?;
        db_tx.commit()?;

        // Compare the new DA block height with the previous block height. Block
        // height must always be monotonically increasing. If the new block
        // height is less than the previous block height, the service is in
        // an error state and must be shut down.
        let after = self
            .latest_da_height()
            .expect("DA height must be set at this point");
        if after < before {
            return Err(
                anyhow::anyhow!("Block height must be monotonically increasing").into(),
            );
        }

        // TODO: Think later about how to clean up the history of the relayer.
        //  Since we don't have too much information on the relayer and it can be useful
        //  at any time, maybe we want to consider keeping it all the time instead of creating snapshots.
        //  https://github.com/FuelLabs/fuel-core/issues/1627
        Ok(())
    }

    /// Returns the latest DA height known to the storage, if any.
    fn get_finalized_da_height(&self) -> Option<DaBlockHeight> {
        self.latest_da_height()
    }
}

impl<S> DatabaseTransaction for StorageTransaction<S>
where
    S: Modifiable,
{
    fn commit(self) -> StorageResult<()> {
        self.commit()?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Instantiates the `basic_storage_tests!` suite provided by
    // `fuel_core_storage` for the `EventsHistory` table, using the default key
    // and a value made of one default message event and one default
    // transaction event.
    fuel_core_storage::basic_storage_tests!(
        EventsHistory,
        <EventsHistory as Mappable>::Key::default(),
        vec![
            Event::Message(Default::default()),
            Event::Transaction(Default::default())
        ]
    );
}