op_alloy_protocol/
channel.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
//! Channel Types

use alloc::vec::Vec;
use alloy_primitives::{map::HashMap, Bytes};

use crate::{block::BlockInfo, frame::Frame};

/// [CHANNEL_ID_LENGTH] is the length of the channel ID.
pub const CHANNEL_ID_LENGTH: usize = 16;

/// [ChannelId] is an opaque identifier for a channel.
pub type ChannelId = [u8; CHANNEL_ID_LENGTH];

/// [MAX_RLP_BYTES_PER_CHANNEL] is the maximum amount of bytes that will be read from
/// a channel. This limit is set when decoding the RLP.
pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000;

/// [FJORD_MAX_RLP_BYTES_PER_CHANNEL] is the maximum amount of bytes that will be read from
/// a channel when the Fjord Hardfork is activated. This limit is set when decoding the RLP.
pub const FJORD_MAX_RLP_BYTES_PER_CHANNEL: u64 = 100_000_000;

/// An error returned when adding a frame to a channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChannelError {
    /// The frame id does not match the channel id.
    FrameIdMismatch,
    /// The channel is closed.
    ChannelClosed,
    /// The frame number is already in the channel.
    FrameNumberExists(usize),
    /// The frame number is beyond the end frame.
    FrameBeyondEndFrame(usize),
}

impl core::fmt::Display for ChannelError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::FrameIdMismatch => write!(f, "Frame id does not match channel id"),
            Self::ChannelClosed => write!(f, "Channel is closed"),
            Self::FrameNumberExists(n) => write!(f, "Frame number {} already exists", n),
            Self::FrameBeyondEndFrame(n) => {
                write!(f, "Frame number {} is beyond end frame", n)
            }
        }
    }
}

/// A Channel is a set of batches that are split into at least one, but possibly multiple frames.
///
/// Frames are allowed to be ingested out of order.
/// Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the
/// channel may mark itself as ready for reading once all intervening frames have been added
#[derive(Debug, Clone, Default)]
pub struct Channel {
    /// The unique identifier for this channel
    id: ChannelId,
    /// The block that the channel is currently open at
    open_block: BlockInfo,
    /// Estimated memory size, used to drop the channel if we have too much data
    estimated_size: usize,
    /// True if the last frame has been buffered
    closed: bool,
    /// The highest frame number that has been ingested
    highest_frame_number: u16,
    /// The frame number of the frame where `is_last` is true
    /// No other frame number may be higher than this
    last_frame_number: u16,
    /// Store a map of frame number to frame for constant time ordering
    inputs: HashMap<u16, Frame>,
    /// The highest L1 inclusion block that a frame was included in
    highest_l1_inclusion_block: BlockInfo,
}

impl Channel {
    /// Create a new [Channel] with the given [ChannelId] and [BlockInfo].
    pub fn new(id: ChannelId, open_block: BlockInfo) -> Self {
        Self { id, open_block, inputs: HashMap::default(), ..Default::default() }
    }

    /// Returns the current [ChannelId] for the channel.
    pub const fn id(&self) -> ChannelId {
        self.id
    }

    /// Returns the number of frames ingested.
    pub fn len(&self) -> usize {
        self.inputs.len()
    }

    /// Returns if the channel is empty.
    pub fn is_empty(&self) -> bool {
        self.inputs.is_empty()
    }

    /// Add a frame to the channel.
    ///
    /// ## Takes
    /// - `frame`: The frame to add to the channel
    /// - `l1_inclusion_block`: The block that the frame was included in
    ///
    /// ## Returns
    /// - `Ok(()):` If the frame was successfully buffered
    /// - `Err(_):` If the frame was invalid
    pub fn add_frame(
        &mut self,
        frame: Frame,
        l1_inclusion_block: BlockInfo,
    ) -> Result<(), ChannelError> {
        // Ensure that the frame ID is equal to the channel ID.
        if frame.id != self.id {
            return Err(ChannelError::FrameIdMismatch);
        }
        if frame.is_last && self.closed {
            return Err(ChannelError::ChannelClosed);
        }
        if self.inputs.contains_key(&frame.number) {
            return Err(ChannelError::FrameNumberExists(frame.number as usize));
        }
        if self.closed && frame.number >= self.last_frame_number {
            return Err(ChannelError::FrameBeyondEndFrame(frame.number as usize));
        }

        // Guaranteed to succeed at this point. Update the channel state.
        if frame.is_last {
            self.last_frame_number = frame.number;
            self.closed = true;

            // Prune frames with a higher number than the last frame number when we receive a
            // closing frame.
            if self.last_frame_number < self.highest_frame_number {
                self.inputs.retain(|id, frame| {
                    self.estimated_size -= frame.size();
                    *id < self.last_frame_number
                });
                self.highest_frame_number = self.last_frame_number;
            }
        }

        // Update the highest frame number.
        if frame.number > self.highest_frame_number {
            self.highest_frame_number = frame.number;
        }

        if self.highest_l1_inclusion_block.number < l1_inclusion_block.number {
            self.highest_l1_inclusion_block = l1_inclusion_block;
        }

        self.estimated_size += frame.size();
        self.inputs.insert(frame.number, frame);
        Ok(())
    }

    /// Returns the block number of the L1 block that contained the first [Frame] in this channel.
    pub const fn open_block_number(&self) -> u64 {
        self.open_block.number
    }

    /// Returns the estimated size of the channel including [Frame] overhead.
    pub const fn size(&self) -> usize {
        self.estimated_size
    }

    /// Returns `true` if the channel is ready to be read.
    pub fn is_ready(&self) -> bool {
        // Must have buffered the last frame before the channel is ready.
        if !self.closed {
            return false;
        }

        // Must have the possibility of contiguous frames.
        if self.inputs.len() != (self.last_frame_number + 1) as usize {
            return false;
        }

        // Check for contiguous frames.
        for i in 0..=self.last_frame_number {
            if !self.inputs.contains_key(&i) {
                return false;
            }
        }

        true
    }

    /// Returns all of the channel's [Frame]s concatenated together.
    ///
    /// ## Returns
    ///
    /// - `Some(Bytes)`: The concatenated frame data
    /// - `None`: If the channel is missing frames
    pub fn frame_data(&self) -> Option<Bytes> {
        let mut data = Vec::with_capacity(self.size());
        (0..=self.last_frame_number).try_for_each(|i| {
            let frame = self.inputs.get(&i)?;
            data.extend_from_slice(&frame.data);
            Some(())
        })?;
        Some(data.into())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use alloc::{
        string::{String, ToString},
        vec,
    };

    struct FrameValidityTestCase {
        #[allow(dead_code)]
        name: String,
        frames: Vec<Frame>,
        should_error: Vec<bool>,
        sizes: Vec<u64>,
    }

    fn run_frame_validity_test(test_case: FrameValidityTestCase) {
        #[cfg(feature = "std")]
        println!("Running test: {}", test_case.name);

        let id = [0xFF; 16];
        let block = BlockInfo::default();
        let mut channel = Channel::new(id, block);

        if test_case.frames.len() != test_case.should_error.len()
            || test_case.frames.len() != test_case.sizes.len()
        {
            panic!("Test case length mismatch");
        }

        for (i, frame) in test_case.frames.iter().enumerate() {
            let result = channel.add_frame(frame.clone(), block);
            if test_case.should_error[i] {
                assert!(result.is_err());
            } else {
                assert!(result.is_ok());
            }
            assert_eq!(channel.size(), test_case.sizes[i] as usize);
        }
    }

    #[test]
    fn test_frame_validity() {
        let id = [0xFF; 16];
        let test_cases = [
            FrameValidityTestCase {
                name: "wrong channel".to_string(),
                frames: vec![Frame { id: [0xEE; 16], ..Default::default() }],
                should_error: vec![true],
                sizes: vec![0],
            },
            FrameValidityTestCase {
                name: "double close".to_string(),
                frames: vec![
                    Frame { id, is_last: true, number: 2, data: b"four".to_vec() },
                    Frame { id, is_last: true, number: 1, ..Default::default() },
                ],
                should_error: vec![false, true],
                sizes: vec![204, 204],
            },
            FrameValidityTestCase {
                name: "duplicate frame".to_string(),
                frames: vec![
                    Frame { id, number: 2, data: b"four".to_vec(), ..Default::default() },
                    Frame { id, number: 2, data: b"seven".to_vec(), ..Default::default() },
                ],
                should_error: vec![false, true],
                sizes: vec![204, 204],
            },
            FrameValidityTestCase {
                name: "duplicate closing frames".to_string(),
                frames: vec![
                    Frame { id, number: 2, is_last: true, data: b"four".to_vec() },
                    Frame { id, number: 2, is_last: true, data: b"seven".to_vec() },
                ],
                should_error: vec![false, true],
                sizes: vec![204, 204],
            },
            FrameValidityTestCase {
                name: "frame past closing".to_string(),
                frames: vec![
                    Frame { id, number: 2, is_last: true, data: b"four".to_vec() },
                    Frame { id, number: 10, data: b"seven".to_vec(), ..Default::default() },
                ],
                should_error: vec![false, true],
                sizes: vec![204, 204],
            },
            FrameValidityTestCase {
                name: "prune after close frame".to_string(),
                frames: vec![
                    Frame { id, number: 10, is_last: false, data: b"seven".to_vec() },
                    Frame { id, number: 2, is_last: true, data: b"four".to_vec() },
                ],
                should_error: vec![false, false],
                sizes: vec![205, 204],
            },
            FrameValidityTestCase {
                name: "multiple valid frames".to_string(),
                frames: vec![
                    Frame { id, number: 10, data: b"seven__".to_vec(), ..Default::default() },
                    Frame { id, number: 2, data: b"four".to_vec(), ..Default::default() },
                ],
                should_error: vec![false, false],
                sizes: vec![207, 411],
            },
        ];

        test_cases.into_iter().for_each(run_frame_validity_test);
    }
}