use crate::{
errors::PipelineError,
stages::NextFrameProvider,
traits::{OriginAdvancer, OriginProvider, SignalReceiver},
types::{PipelineResult, Signal},
};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use alloy_primitives::Bytes;
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::{BlockInfo, Frame};
use tracing::{debug, error, trace};
/// Source of raw data for the [`FrameQueue`] stage.
///
/// The frame queue calls [`FrameQueueProvider::next_data`] whenever its internal queue is empty
/// and parses the returned item into frames.
#[async_trait]
pub trait FrameQueueProvider {
/// The item yielded by the provider. It must be convertible into [`Bytes`], because the frame
/// queue converts it with `.into()` before parsing.
type Item: Into<Bytes>;
/// Asynchronously retrieves the next item of data, or returns a pipeline error when none can
/// be produced.
async fn next_data(&mut self) -> PipelineResult<Self::Item>;
}
/// Pipeline stage that buffers [`Frame`]s parsed from the data handed out by the previous stage.
///
/// Frames are appended to the back of the queue when data is loaded and are returned from the
/// front, one at a time, by [`NextFrameProvider::next_frame`].
#[derive(Debug)]
pub struct FrameQueue<P>
where
P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
{
/// The previous stage: supplies raw data and the current origin, and receives forwarded
/// origin-advance calls and signals.
pub prev: P,
/// Frames that have been parsed but not yet returned to the caller.
queue: VecDeque<Frame>,
/// Rollup configuration, consulted to decide whether Holocene is active.
rollup_config: Arc<RollupConfig>,
}
impl<P> FrameQueue<P>
where
    P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
{
    /// Creates a new [`FrameQueue`] stage that pulls its data from `prev`.
    ///
    /// The internal queue starts out empty. `cfg` is retained so the stage can ask whether
    /// Holocene is active.
    pub const fn new(prev: P, cfg: Arc<RollupConfig>) -> Self {
        Self { prev, queue: VecDeque::new(), rollup_config: cfg }
    }

    /// Returns `true` if the rollup config reports Holocene as active at `origin.timestamp`.
    pub fn is_holocene_active(&self, origin: BlockInfo) -> bool {
        self.rollup_config.is_holocene_active(origin.timestamp)
    }

    /// Prunes the queued frames. This is a no-op unless Holocene is active at `origin`.
    ///
    /// Adjacent frames `(prev, next)` are compared starting from the front of the queue:
    ///
    /// - Same channel id and `next.number != prev.number + 1`: `next` is removed.
    /// - Same channel id and `prev.is_last`: `next` is removed.
    /// - Different channel id and `next.number != 0`: `next` is removed.
    /// - Different channel id, `prev` is not a last frame and `next.number == 0`: the range from
    ///   the first queued frame carrying `prev`'s channel id through `prev` (inclusive) is
    ///   removed, and the scan steps back so `next` is re-checked against its new predecessor.
    ///
    /// Queues holding fewer than two frames (including an empty queue) are left untouched.
    pub fn prune(&mut self, origin: BlockInfo) {
        if !self.is_holocene_active(origin) {
            return;
        }

        let mut i = 0;
        // Written as `i + 1 < len` rather than `i < len - 1`: this method is public and may be
        // called with an empty queue, in which case `len - 1` would underflow.
        while i + 1 < self.queue.len() {
            let prev_frame = &self.queue[i];
            let next_frame = &self.queue[i + 1];
            let extends_channel = prev_frame.id == next_frame.id;

            // Frames of the same channel must be contiguous.
            // NOTE(review): `number + 1` would overflow if `number` is at its type's maximum;
            // confirm the valid range of frame numbers.
            if extends_channel && prev_frame.number + 1 != next_frame.number {
                self.queue.remove(i + 1);
                continue;
            }

            // Nothing may follow the last frame of a channel within that same channel.
            if extends_channel && prev_frame.is_last {
                self.queue.remove(i + 1);
                continue;
            }

            // A different channel must begin with frame number 0.
            if !extends_channel && next_frame.number != 0 {
                self.queue.remove(i + 1);
                continue;
            }

            // A new channel starts while the previous one was never closed: drop the unclosed one.
            if !extends_channel && !prev_frame.is_last && next_frame.number == 0 {
                // `prev_frame` itself is in the queue, so a match always exists.
                let first_frame =
                    self.queue.iter().position(|f| f.id == prev_frame.id).expect("infallible");
                let drained = self.queue.drain(first_frame..=i);
                // Step back by the number of removed frames, clamping at the front of the queue.
                i = i.saturating_sub(drained.len());
                continue;
            }

            i += 1;
        }
    }

    /// Loads frames from the previous stage if, and only if, the queue is currently empty.
    ///
    /// One item is requested from `prev` and parsed into frames; the frames are appended to the
    /// queue and then pruned against the current origin (see [`Self::prune`]).
    ///
    /// Data that fails to parse is not treated as an error: the failure is logged, the data is
    /// discarded, and `Ok(())` is returned with the queue still empty.
    ///
    /// # Errors
    ///
    /// - Any error returned by `prev.next_data()` is logged at debug level and propagated.
    /// - `PipelineError::MissingOrigin.crit()` is returned when no origin is available. The
    ///   parsed frames have already been appended to the queue at that point.
    pub async fn load_frames(&mut self) -> PipelineResult<()> {
        // Frames are still buffered; nothing to fetch.
        if !self.queue.is_empty() {
            return Ok(());
        }

        let data = match self.prev.next_data().await {
            Ok(data) => data,
            Err(e) => {
                debug!(target: "frame-queue", "Failed to retrieve data: {:?}", e);
                return Err(e);
            }
        };

        let Ok(frames) = Frame::parse_frames(&data.into()) else {
            // Deliberately best-effort: unparsable data is skipped rather than surfaced.
            error!(target: "frame-queue", "Failed to parse frames from data.");
            return Ok(());
        };

        self.queue.extend(frames);

        // Build the error lazily; it is only needed when the origin is missing.
        let origin = self.origin().ok_or_else(|| PipelineError::MissingOrigin.crit())?;
        self.prune(origin);
        Ok(())
    }
}
/// Origin advancement is delegated to the previous stage; the frame queue holds no origin
/// state of its own.
#[async_trait]
impl<P> OriginAdvancer for FrameQueue<P>
where
P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
{
/// Forwards the call to `prev` and returns its result unchanged. The buffered frames are not
/// touched.
async fn advance_origin(&mut self) -> PipelineResult<()> {
self.prev.advance_origin().await
}
}
#[async_trait]
impl<P> NextFrameProvider for FrameQueue<P>
where
    P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
{
    /// Returns the next buffered frame, loading more data from the previous stage first if the
    /// queue is empty.
    ///
    /// # Errors
    ///
    /// - Propagates any error from `load_frames`.
    /// - Returns `PipelineError::NotEnoughData.temp()` when the queue is still empty after
    ///   loading, so the caller can retry.
    async fn next_frame(&mut self) -> PipelineResult<Frame> {
        self.load_frames().await?;

        // Pop directly instead of checking emptiness and then unwrapping: one lookup, and no
        // panic path.
        match self.queue.pop_front() {
            Some(frame) => Ok(frame),
            None => {
                trace!(target: "frame-queue", "Queue is empty after fetching data. Retrying next_frame.");
                Err(PipelineError::NotEnoughData.temp())
            }
        }
    }
}
/// The frame queue does not track an origin itself; it reports the previous stage's origin.
impl<P> OriginProvider for FrameQueue<P>
where
P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
{
/// Returns whatever origin `prev` currently reports, or `None` if it has none.
fn origin(&self) -> Option<BlockInfo> {
self.prev.origin()
}
}
/// Signals are forwarded to the previous stage and then clear this stage's buffered frames.
#[async_trait]
impl<P> SignalReceiver for FrameQueue<P>
where
P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
{
/// Handles `signal` by passing it to `prev` first.
///
/// If `prev` returns an error, that error is propagated and the buffered frames are left as
/// they were. On success, the queue is replaced with a fresh, empty one. Every signal is
/// treated the same way here; the signal's variant is not inspected.
async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
self.prev.signal(signal).await?;
// Only reached when the previous stage accepted the signal.
self.queue = VecDeque::default();
Ok(())
}
}
// Unit tests for the frame queue stage.
//
// The `FrameQueueBuilder` / `TestFrameQueueProvider` helpers live in `crate::test_utils` and are
// not visible here; comments about what their methods check are based on how they are used
// below and should be confirmed against those helpers.
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::{test_utils::TestFrameQueueProvider, types::ResetSignal};
use alloc::vec;
// A reset signal must be forwarded to the previous stage (observed through the mock's `reset`
// flag flipping from false to true) and must leave the frame queue empty.
#[tokio::test]
async fn test_frame_queue_reset() {
let mock = TestFrameQueueProvider::new(vec![]);
let mut frame_queue = FrameQueue::new(mock, Default::default());
assert!(!frame_queue.prev.reset);
frame_queue.signal(ResetSignal::default().signal()).await.unwrap();
assert_eq!(frame_queue.queue.len(), 0);
assert!(frame_queue.prev.reset);
}
// The provider yields the single byte 0x00, from which no frame ends up in the queue;
// `next_frame` must therefore report `NotEnoughData` as a temporary error.
#[tokio::test]
async fn test_frame_queue_empty_bytes() {
let data = vec![Ok(Bytes::from(vec![0x00]))];
let mut mock = TestFrameQueueProvider::new(data);
mock.set_origin(BlockInfo::default());
let mut frame_queue = FrameQueue::new(mock, Default::default());
assert!(!frame_queue.is_holocene_active(BlockInfo::default()));
let err = frame_queue.next_frame().await.unwrap_err();
assert_eq!(err, PipelineError::NotEnoughData.temp());
}
// The first `next_frame` call must end in `NotEnoughData` (temporary).
// NOTE(review): the asserted error is not the `Eof` entry, so the mock presumably serves the
// empty `Bytes` entry first -- confirm the order in which `TestFrameQueueProvider` yields data.
#[tokio::test]
async fn test_frame_queue_no_frames_decoded() {
let data = vec![Err(PipelineError::Eof.temp()), Ok(Bytes::default())];
let mut mock = TestFrameQueueProvider::new(data);
mock.set_origin(BlockInfo::default());
let mut frame_queue = FrameQueue::new(mock, Default::default());
assert!(!frame_queue.is_holocene_active(BlockInfo::default()));
let err = frame_queue.next_frame().await.unwrap_err();
assert_eq!(err, PipelineError::NotEnoughData.temp());
}
// Raw data consisting only of the byte 0x01 is expected to yield no frames, surfacing as
// `NotEnoughData` (temporary).
#[tokio::test]
async fn test_frame_queue_wrong_derivation_version() {
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_origin(BlockInfo::default())
.with_raw_frames(Bytes::from(vec![0x01]))
.with_expected_err(PipelineError::NotEnoughData.temp())
.build();
assert.holocene_active(false);
assert.next_frames().await;
}
// Two bytes of raw data (0x00, 0x01) are expected to yield no frames, surfacing as
// `NotEnoughData` (temporary).
#[tokio::test]
async fn test_frame_queue_frame_too_short() {
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_origin(BlockInfo::default())
.with_raw_frames(Bytes::from(vec![0x00, 0x01]))
.with_expected_err(PipelineError::NotEnoughData.temp())
.build();
assert.holocene_active(false);
assert.next_frames().await;
}
// Pre-Holocene: a single closed frame is expected to pass through unchanged.
#[tokio::test]
async fn test_frame_queue_single_frame() {
let frames = [crate::frame!(0xFF, 0, vec![0xDD; 50], true)];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_expected_frames(&frames)
.with_origin(BlockInfo::default())
.with_frames(&frames)
.build();
assert.holocene_active(false);
assert.next_frames().await;
}
// Pre-Holocene: three consecutive frames of one channel are expected to pass through unchanged.
#[tokio::test]
async fn test_frame_queue_multiple_frames() {
let frames = [
crate::frame!(0xFF, 0, vec![0xDD; 50], false),
crate::frame!(0xFF, 1, vec![0xDD; 50], false),
crate::frame!(0xFF, 2, vec![0xDD; 50], true),
];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_expected_frames(&frames)
.with_origin(BlockInfo::default())
.with_frames(&frames)
.build();
assert.holocene_active(false);
assert.next_frames().await;
}
// No origin is configured on the builder here; `missing_origin()` presumably asserts that
// loading frames fails because the origin is unavailable -- confirm against the test helper.
#[tokio::test]
async fn test_frame_queue_missing_origin() {
let frames = [crate::frame!(0xFF, 0, vec![0xDD; 50], true)];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_expected_frames(&frames)
.with_frames(&frames)
.build();
assert.holocene_active(false);
assert.missing_origin().await;
}
// Holocene active (activation time 0): a well-formed, closed channel is expected to be kept
// in full.
#[tokio::test]
async fn test_holocene_valid_frames() {
let frames = [
crate::frame!(0xFF, 0, vec![0xDD; 50], false),
crate::frame!(0xFF, 1, vec![0xDD; 50], false),
crate::frame!(0xFF, 2, vec![0xDD; 50], true),
];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_rollup_config(&RollupConfig { holocene_time: Some(0), ..Default::default() })
.with_origin(BlockInfo::default())
.with_expected_frames(&frames)
.with_frames(&frames)
.build();
assert.holocene_active(true);
assert.next_frames().await;
}
// Holocene active: a lone frame is expected to be kept even though its number is 1, because
// pruning only compares adjacent pairs of frames.
#[tokio::test]
async fn test_holocene_single_frame() {
let frames = [crate::frame!(0xFF, 1, vec![0xDD; 50], true)];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_rollup_config(&RollupConfig { holocene_time: Some(0), ..Default::default() })
.with_origin(BlockInfo::default())
.with_expected_frames(&frames)
.with_frames(&frames)
.build();
assert.holocene_active(true);
assert.next_frames().await;
}
// Holocene active: a frame that follows the closing frame of its own channel is expected to
// be dropped; the subsequent channel is kept.
#[tokio::test]
async fn test_holocene_unordered_frames() {
let frames = [
crate::frame!(0xEE, 0, vec![0xDD; 50], false),
crate::frame!(0xEE, 1, vec![0xDD; 50], false),
crate::frame!(0xEE, 2, vec![0xDD; 50], true),
// Next line holds two frames: 0xEE #3 (expected to be dropped, since 0xEE #2 already
// closed the channel) followed by the first frame of channel 0xFF (kept).
crate::frame!(0xEE, 3, vec![0xDD; 50], false), crate::frame!(0xFF, 0, vec![0xDD; 50], false),
crate::frame!(0xFF, 1, vec![0xDD; 50], true),
];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_rollup_config(&RollupConfig { holocene_time: Some(0), ..Default::default() })
.with_origin(BlockInfo::default())
.with_expected_frames(&[&frames[0..3], &frames[4..]].concat())
.with_frames(&frames)
.build();
assert.holocene_active(true);
assert.next_frames().await;
}
// Holocene active: frames whose number does not directly follow the previously kept frame of
// the same channel are expected to be dropped.
#[tokio::test]
async fn test_holocene_non_sequential_frames() {
let frames = [
crate::frame!(0xEE, 0, vec![0xDD; 50], false),
crate::frame!(0xEE, 1, vec![0xDD; 50], false),
// Next line holds two frames, both expected to be dropped: 0xEE #3 skips #2, and 0xEE #4
// then does not follow the kept #1 either.
crate::frame!(0xEE, 3, vec![0xDD; 50], true), crate::frame!(0xEE, 4, vec![0xDD; 50], false), ];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_rollup_config(&RollupConfig { holocene_time: Some(0), ..Default::default() })
.with_origin(BlockInfo::default())
.with_expected_frames(&frames[0..2])
.with_frames(&frames)
.build();
assert.holocene_active(true);
assert.next_frames().await;
}
// Holocene active: channel 0xEE is never closed before channel 0xFF starts with frame 0, so
// all 0xEE frames are expected to be dropped and only channel 0xFF remains.
#[tokio::test]
async fn test_holocene_unclosed_channel() {
let frames = [
crate::frame!(0xEE, 0, vec![0xDD; 50], false),
crate::frame!(0xEE, 1, vec![0xDD; 50], false),
crate::frame!(0xEE, 2, vec![0xDD; 50], false),
crate::frame!(0xEE, 3, vec![0xDD; 50], false),
crate::frame!(0xFF, 0, vec![0xDD; 50], false),
crate::frame!(0xFF, 1, vec![0xDD; 50], true),
];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_rollup_config(&RollupConfig { holocene_time: Some(0), ..Default::default() })
.with_origin(BlockInfo::default())
.with_expected_frames(&frames[4..])
.with_frames(&frames)
.build();
assert.holocene_active(true);
assert.next_frames().await;
}
// Holocene active: channel 0xEE shows up without a frame number 0, so its frames are
// expected to be dropped; the surrounding channels 0xDD and 0xFF are kept.
#[tokio::test]
async fn test_holocene_unstarted_channel() {
let frames = [
crate::frame!(0xDD, 0, vec![0xDD; 50], false),
crate::frame!(0xDD, 1, vec![0xDD; 50], false),
crate::frame!(0xDD, 2, vec![0xDD; 50], false),
crate::frame!(0xDD, 3, vec![0xDD; 50], true),
// Next line holds three frames: 0xEE #1 and 0xEE #2 (both expected to be dropped, since the
// channel never started with frame 0) followed by the first frame of channel 0xFF (kept).
crate::frame!(0xEE, 1, vec![0xDD; 50], false), crate::frame!(0xEE, 2, vec![0xDD; 50], true), crate::frame!(0xFF, 0, vec![0xDD; 50], false),
crate::frame!(0xFF, 1, vec![0xDD; 50], true),
];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_rollup_config(&RollupConfig { holocene_time: Some(0), ..Default::default() })
.with_origin(BlockInfo::default())
.with_expected_frames(&[&frames[0..4], &frames[6..]].concat())
.with_frames(&frames)
.build();
assert.holocene_active(true);
assert.next_frames().await;
}
// Holocene active: channel 0xEE is unclosed, but the following channel 0xFF does not start
// with frame 0. The 0xFF frames are expected to be dropped and the 0xEE frames kept.
#[tokio::test]
async fn test_holocene_unclosed_channel_with_invalid_start() {
let frames = [
crate::frame!(0xEE, 0, vec![0xDD; 50], false),
crate::frame!(0xEE, 1, vec![0xDD; 50], false),
crate::frame!(0xEE, 2, vec![0xDD; 50], false),
crate::frame!(0xEE, 3, vec![0xDD; 50], false),
// Next line holds two frames, both expected to be dropped: channel 0xFF begins at frame
// number 1 instead of 0.
crate::frame!(0xFF, 1, vec![0xDD; 50], false), crate::frame!(0xFF, 2, vec![0xDD; 50], true), ];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_rollup_config(&RollupConfig { holocene_time: Some(0), ..Default::default() })
.with_origin(BlockInfo::default())
.with_expected_frames(&frames[0..4])
.with_frames(&frames)
.build();
assert.holocene_active(true);
assert.next_frames().await;
}
// Holocene active: the unclosed channel 0xEE is replaced by channel 0xFF, which starts with
// frame 0. The closed channel 0xDD before it is kept, 0xEE is expected to be dropped.
#[tokio::test]
async fn test_holocene_replace_channel() {
let frames = [
crate::frame!(0xDD, 0, vec![0xDD; 50], false),
crate::frame!(0xDD, 1, vec![0xDD; 50], true),
crate::frame!(0xEE, 0, vec![0xDD; 50], false),
crate::frame!(0xEE, 1, vec![0xDD; 50], false),
crate::frame!(0xFF, 0, vec![0xDD; 50], false),
crate::frame!(0xFF, 1, vec![0xDD; 50], true),
];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_rollup_config(&RollupConfig { holocene_time: Some(0), ..Default::default() })
.with_origin(BlockInfo::default())
.with_expected_frames(&[&frames[0..2], &frames[4..]].concat())
.with_frames(&frames)
.build();
assert.holocene_active(true);
assert.next_frames().await;
}
// Holocene active: channels 0x01 and 0x02 are interleaved and 0x02 is never closed. Both are
// expected to be dropped entirely, leaving only channel 0xFF.
#[tokio::test]
async fn test_holocene_interleaved_invalid_channel() {
let frames = [
crate::frame!(0x01, 0, vec![0xDD; 50], false),
crate::frame!(0x02, 0, vec![0xDD; 50], false),
crate::frame!(0x01, 1, vec![0xDD; 50], true),
crate::frame!(0x02, 1, vec![0xDD; 50], false),
crate::frame!(0xFF, 0, vec![0xDD; 50], false),
crate::frame!(0xFF, 1, vec![0xDD; 50], true),
];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_rollup_config(&RollupConfig { holocene_time: Some(0), ..Default::default() })
.with_origin(BlockInfo::default())
.with_expected_frames(&frames[4..])
.with_frames(&frames)
.build();
assert.holocene_active(true);
assert.next_frames().await;
}
// Holocene active: channels 0x01 and 0x02 are interleaved and 0x02 is closed. Channel 0x01 is
// expected to be dropped (it was interrupted by the start of 0x02), while both frames of 0x02
// and channel 0xFF are kept.
#[tokio::test]
async fn test_holocene_interleaved_valid_channel() {
let frames = [
crate::frame!(0x01, 0, vec![0xDD; 50], false),
crate::frame!(0x02, 0, vec![0xDD; 50], false),
crate::frame!(0x01, 1, vec![0xDD; 50], true),
crate::frame!(0x02, 1, vec![0xDD; 50], true),
crate::frame!(0xFF, 0, vec![0xDD; 50], false),
crate::frame!(0xFF, 1, vec![0xDD; 50], true),
];
let assert = crate::test_utils::FrameQueueBuilder::new()
.with_rollup_config(&RollupConfig { holocene_time: Some(0), ..Default::default() })
.with_origin(BlockInfo::default())
.with_expected_frames(&[&frames[1..2], &frames[3..]].concat())
.with_frames(&frames)
.build();
assert.holocene_active(true);
assert.next_frames().await;
}
}