// op_alloy_protocol/channel.rs
//! Channel Types
use alloc::vec::Vec;
use alloy_primitives::{map::HashMap, Bytes};
use crate::{block::BlockInfo, frame::Frame};
/// [CHANNEL_ID_LENGTH] is the length, in bytes, of a [ChannelId].
pub const CHANNEL_ID_LENGTH: usize = 16;
/// [ChannelId] is an opaque identifier for a channel: a fixed-size array of
/// [CHANNEL_ID_LENGTH] bytes.
pub type ChannelId = [u8; CHANNEL_ID_LENGTH];
/// [MAX_RLP_BYTES_PER_CHANNEL] is the maximum number of bytes (10,000,000) that will be read
/// from a channel. This limit is set when decoding the RLP.
pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000;
/// [FJORD_MAX_RLP_BYTES_PER_CHANNEL] is the maximum number of bytes (100,000,000) that will be
/// read from a channel when the Fjord Hardfork is activated. This limit is set when decoding
/// the RLP.
pub const FJORD_MAX_RLP_BYTES_PER_CHANNEL: u64 = 100_000_000;
/// The reasons a frame can be rejected when it is added to a channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChannelError {
    /// The frame's channel id differs from the id of the channel it was offered to.
    FrameIdMismatch,
    /// A closing frame was offered to a channel that is already closed.
    ChannelClosed,
    /// A frame with this frame number is already buffered in the channel.
    FrameNumberExists(usize),
    /// The frame number is not below the closing frame's number.
    FrameBeyondEndFrame(usize),
}

impl core::fmt::Display for ChannelError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // `ChannelError` is `Copy`, so matching by value keeps the payload bindings plain `usize`.
        match *self {
            Self::FrameIdMismatch => f.write_str("Frame id does not match channel id"),
            Self::ChannelClosed => f.write_str("Channel is closed"),
            Self::FrameNumberExists(number) => {
                write!(f, "Frame number {number} already exists")
            }
            Self::FrameBeyondEndFrame(number) => {
                write!(f, "Frame number {number} is beyond end frame")
            }
        }
    }
}
/// A Channel is a set of batches that are split into at least one, but possibly multiple frames.
///
/// Frames are allowed to be ingested out of order.
/// Each frame is ingested one by one. Once a frame with `is_last` set is added to the channel,
/// the channel is marked as closed; it becomes ready for reading once all intervening frames
/// have been added as well.
#[derive(Debug, Clone, Default)]
pub struct Channel {
/// The unique identifier for this channel.
id: ChannelId,
/// The block that the channel was opened at, as supplied to the constructor.
open_block: BlockInfo,
/// Estimated memory size, used to drop the channel if we have too much data.
/// Maintained from the `size()` of the frames that are buffered.
estimated_size: usize,
/// True once the last frame (the one with `is_last` set) has been buffered.
closed: bool,
/// The highest frame number that has been ingested.
highest_frame_number: u16,
/// The frame number of the frame where `is_last` is true.
/// No other frame number may be higher than this.
/// Only meaningful once `closed` is true.
last_frame_number: u16,
/// Map of frame number to frame, so frames can be looked up by number regardless of the
/// order in which they were ingested.
inputs: HashMap<u16, Frame>,
/// The highest L1 inclusion block that a frame was included in.
highest_l1_inclusion_block: BlockInfo,
}
impl Channel {
    /// Creates a new, empty [Channel] with the given [ChannelId] and opening [BlockInfo].
    ///
    /// Every other field starts at its [Default] value.
    pub fn new(id: ChannelId, open_block: BlockInfo) -> Self {
        Self { id, open_block, inputs: HashMap::default(), ..Default::default() }
    }

    /// Returns the [ChannelId] of the channel.
    pub const fn id(&self) -> ChannelId {
        self.id
    }

    /// Returns the number of frames currently buffered in the channel.
    pub fn len(&self) -> usize {
        self.inputs.len()
    }

    /// Returns `true` if no frames are buffered in the channel.
    pub fn is_empty(&self) -> bool {
        self.inputs.is_empty()
    }

    /// Add a frame to the channel.
    ///
    /// If the frame is a closing frame (`is_last`), the channel is closed and every buffered
    /// frame whose number is not below the closing frame's number is pruned.
    ///
    /// ## Takes
    /// - `frame`: The frame to add to the channel
    /// - `l1_inclusion_block`: The block that the frame was included in
    ///
    /// ## Returns
    /// - `Ok(()):` If the frame was successfully buffered
    /// - `Err(ChannelError::FrameIdMismatch)`: the frame id differs from the channel id
    /// - `Err(ChannelError::ChannelClosed)`: a closing frame was offered to a closed channel
    /// - `Err(ChannelError::FrameNumberExists(_))`: a frame with that number is already buffered
    /// - `Err(ChannelError::FrameBeyondEndFrame(_))`: the channel is closed and the frame number
    ///   is not below the closing frame's number
    ///
    /// The channel is left untouched whenever an error is returned.
    pub fn add_frame(
        &mut self,
        frame: Frame,
        l1_inclusion_block: BlockInfo,
    ) -> Result<(), ChannelError> {
        // Ensure that the frame ID is equal to the channel ID.
        if frame.id != self.id {
            return Err(ChannelError::FrameIdMismatch);
        }
        if frame.is_last && self.closed {
            return Err(ChannelError::ChannelClosed);
        }
        if self.inputs.contains_key(&frame.number) {
            return Err(ChannelError::FrameNumberExists(usize::from(frame.number)));
        }
        if self.closed && frame.number >= self.last_frame_number {
            return Err(ChannelError::FrameBeyondEndFrame(usize::from(frame.number)));
        }

        // Guaranteed to succeed at this point. Update the channel state.
        if frame.is_last {
            self.last_frame_number = frame.number;
            self.closed = true;

            // Prune frames with a higher number than the last frame number when we receive a
            // closing frame.
            if self.last_frame_number < self.highest_frame_number {
                let last_frame_number = self.last_frame_number;
                let mut pruned_size = 0usize;
                self.inputs.retain(|number, buffered| {
                    let keep = *number < last_frame_number;
                    if !keep {
                        // Only frames that are actually dropped may reduce the size estimate;
                        // retained frames are still held in memory.
                        pruned_size += buffered.size();
                    }
                    keep
                });
                self.estimated_size -= pruned_size;
                self.highest_frame_number = self.last_frame_number;
            }
        }

        // Update the highest frame number.
        if frame.number > self.highest_frame_number {
            self.highest_frame_number = frame.number;
        }

        if self.highest_l1_inclusion_block.number < l1_inclusion_block.number {
            self.highest_l1_inclusion_block = l1_inclusion_block;
        }

        self.estimated_size += frame.size();
        self.inputs.insert(frame.number, frame);
        Ok(())
    }

    /// Returns the block number of the L1 block that contained the first [Frame] in this channel.
    pub const fn open_block_number(&self) -> u64 {
        self.open_block.number
    }

    /// Returns the estimated size of the channel including [Frame] overhead.
    pub const fn size(&self) -> usize {
        self.estimated_size
    }

    /// Returns `true` if the channel is ready to be read.
    ///
    /// A channel is ready once it has been closed and every frame number from `0` up to and
    /// including the closing frame's number is buffered.
    pub fn is_ready(&self) -> bool {
        // Must have buffered the last frame before the channel is ready.
        if !self.closed {
            return false;
        }

        // Must have the possibility of contiguous frames. Widen to `usize` before adding one so
        // a closing frame numbered `u16::MAX` cannot overflow the addition.
        if self.inputs.len() != usize::from(self.last_frame_number) + 1 {
            return false;
        }

        // Check for contiguous frames.
        (0..=self.last_frame_number).all(|number| self.inputs.contains_key(&number))
    }

    /// Returns all of the channel's [Frame]s concatenated together, in frame-number order.
    ///
    /// This walks frame numbers `0..=last_frame_number` and does not itself check that the
    /// channel has been closed; use [Channel::is_ready] for that.
    ///
    /// ## Returns
    ///
    /// - `Some(Bytes)`: The concatenated frame data
    /// - `None`: If the channel is missing frames
    pub fn frame_data(&self) -> Option<Bytes> {
        // The size estimate includes frame overhead, so it only serves as a capacity hint.
        let mut data = Vec::with_capacity(self.size());
        for number in 0..=self.last_frame_number {
            let frame = self.inputs.get(&number)?;
            data.extend_from_slice(&frame.data);
        }
        Some(data.into())
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use alloc::{
        string::{String, ToString},
        vec,
    };

    /// One table-driven scenario for [Channel::add_frame].
    ///
    /// `frames[i]` is added to a fresh channel in order; `should_error[i]` states whether that
    /// insertion must fail and `sizes[i]` is the expected channel size right afterwards.
    struct FrameValidityTestCase {
        // Only read when the `std` feature enables the progress print below.
        #[allow(dead_code)]
        name: String,
        frames: Vec<Frame>,
        should_error: Vec<bool>,
        sizes: Vec<u64>,
    }

    impl FrameValidityTestCase {
        /// Bundles the parts of a scenario into a test case.
        fn new(name: &str, frames: Vec<Frame>, should_error: Vec<bool>, sizes: Vec<u64>) -> Self {
            Self { name: name.to_string(), frames, should_error, sizes }
        }
    }

    /// Replays a scenario against a fresh channel and checks each step's outcome and size.
    fn run_frame_validity_test(test_case: FrameValidityTestCase) {
        #[cfg(feature = "std")]
        println!("Running test: {}", test_case.name);

        let FrameValidityTestCase { frames, should_error, sizes, .. } = test_case;
        let steps = frames.len();
        if steps != should_error.len() || steps != sizes.len() {
            panic!("Test case length mismatch");
        }

        let block = BlockInfo::default();
        let mut channel = Channel::new([0xFF; 16], block);
        for ((frame, expect_error), expected_size) in
            frames.into_iter().zip(should_error).zip(sizes)
        {
            let outcome = channel.add_frame(frame, block);
            assert_eq!(outcome.is_err(), expect_error);
            assert_eq!(channel.size(), expected_size as usize);
        }
    }

    #[test]
    fn test_frame_validity() {
        let id = [0xFF; 16];
        let test_cases = [
            FrameValidityTestCase::new(
                "wrong channel",
                vec![Frame { id: [0xEE; 16], ..Default::default() }],
                vec![true],
                vec![0],
            ),
            FrameValidityTestCase::new(
                "double close",
                vec![
                    Frame { id, is_last: true, number: 2, data: b"four".to_vec() },
                    Frame { id, is_last: true, number: 1, ..Default::default() },
                ],
                vec![false, true],
                vec![204, 204],
            ),
            FrameValidityTestCase::new(
                "duplicate frame",
                vec![
                    Frame { id, number: 2, data: b"four".to_vec(), ..Default::default() },
                    Frame { id, number: 2, data: b"seven".to_vec(), ..Default::default() },
                ],
                vec![false, true],
                vec![204, 204],
            ),
            FrameValidityTestCase::new(
                "duplicate closing frames",
                vec![
                    Frame { id, number: 2, is_last: true, data: b"four".to_vec() },
                    Frame { id, number: 2, is_last: true, data: b"seven".to_vec() },
                ],
                vec![false, true],
                vec![204, 204],
            ),
            FrameValidityTestCase::new(
                "frame past closing",
                vec![
                    Frame { id, number: 2, is_last: true, data: b"four".to_vec() },
                    Frame { id, number: 10, data: b"seven".to_vec(), ..Default::default() },
                ],
                vec![false, true],
                vec![204, 204],
            ),
            FrameValidityTestCase::new(
                "prune after close frame",
                vec![
                    Frame { id, number: 10, is_last: false, data: b"seven".to_vec() },
                    Frame { id, number: 2, is_last: true, data: b"four".to_vec() },
                ],
                vec![false, false],
                vec![205, 204],
            ),
            FrameValidityTestCase::new(
                "multiple valid frames",
                vec![
                    Frame { id, number: 10, data: b"seven__".to_vec(), ..Default::default() },
                    Frame { id, number: 2, data: b"four".to_vec(), ..Default::default() },
                ],
                vec![false, false],
                vec![207, 411],
            ),
        ];

        for test_case in test_cases {
            run_frame_validity_test(test_case);
        }
    }
}