kona_derive/sources/
calldata.rsuse crate::{
errors::PipelineError,
traits::{ChainProvider, DataAvailabilityProvider},
types::PipelineResult,
};
use alloc::{boxed::Box, collections::VecDeque};
use alloy_consensus::{Transaction, TxEnvelope};
use alloy_primitives::{Address, Bytes};
use async_trait::async_trait;
use op_alloy_protocol::BlockInfo;
#[derive(Debug, Clone)]
pub struct CalldataSource<CP>
where
CP: ChainProvider + Send,
{
pub chain_provider: CP,
pub batch_inbox_address: Address,
pub signer: Address,
pub calldata: VecDeque<Bytes>,
pub open: bool,
}
impl<CP: ChainProvider + Send> CalldataSource<CP> {
pub const fn new(chain_provider: CP, batch_inbox_address: Address, signer: Address) -> Self {
Self { chain_provider, batch_inbox_address, signer, calldata: VecDeque::new(), open: false }
}
async fn load_calldata(&mut self, block_ref: &BlockInfo) -> Result<(), CP::Error> {
if self.open {
return Ok(());
}
let (_, txs) =
self.chain_provider.block_info_and_transactions_by_hash(block_ref.hash).await?;
self.calldata = txs
.iter()
.filter_map(|tx| {
let (tx_kind, data) = match tx {
TxEnvelope::Legacy(tx) => (tx.tx().to(), tx.tx().input()),
TxEnvelope::Eip2930(tx) => (tx.tx().to(), tx.tx().input()),
TxEnvelope::Eip1559(tx) => (tx.tx().to(), tx.tx().input()),
_ => return None,
};
let to = tx_kind?;
if to != self.batch_inbox_address {
return None;
}
if tx.recover_signer().ok()? != self.signer {
return None;
}
Some(data.to_vec().into())
})
.collect::<VecDeque<_>>();
self.open = true;
Ok(())
}
}
#[async_trait]
impl<CP: ChainProvider + Send> DataAvailabilityProvider for CalldataSource<CP> {
type Item = Bytes;
async fn next(&mut self, block_ref: &BlockInfo) -> PipelineResult<Self::Item> {
self.load_calldata(block_ref).await.map_err(Into::into)?;
self.calldata.pop_front().ok_or(PipelineError::Eof.temp())
}
fn clear(&mut self) {
self.calldata.clear();
self.open = false;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{errors::PipelineErrorKind, test_utils::TestChainProvider};
use alloc::{vec, vec::Vec};
use alloy_consensus::{Signed, TxEip2930, TxEip4844, TxEip4844Variant, TxLegacy};
use alloy_primitives::{address, Address, PrimitiveSignature as Signature, TxKind};
pub(crate) fn test_legacy_tx(to: Address) -> TxEnvelope {
let sig = Signature::test_signature();
TxEnvelope::Legacy(Signed::new_unchecked(
TxLegacy { to: TxKind::Call(to), ..Default::default() },
sig,
Default::default(),
))
}
pub(crate) fn test_eip2930_tx(to: Address) -> TxEnvelope {
let sig = Signature::test_signature();
TxEnvelope::Eip2930(Signed::new_unchecked(
TxEip2930 { to: TxKind::Call(to), ..Default::default() },
sig,
Default::default(),
))
}
pub(crate) fn test_blob_tx(to: Address) -> TxEnvelope {
let sig = Signature::test_signature();
TxEnvelope::Eip4844(Signed::new_unchecked(
TxEip4844Variant::TxEip4844(TxEip4844 { to, ..Default::default() }),
sig,
Default::default(),
))
}
pub(crate) fn default_test_calldata_source() -> CalldataSource<TestChainProvider> {
CalldataSource::new(TestChainProvider::default(), Default::default(), Default::default())
}
#[tokio::test]
async fn test_clear_calldata() {
let mut source = default_test_calldata_source();
source.open = true;
source.calldata.push_back(Bytes::default());
source.clear();
assert!(source.calldata.is_empty());
assert!(!source.open);
}
#[tokio::test]
async fn test_load_calldata_open() {
let mut source = default_test_calldata_source();
source.open = true;
assert!(source.load_calldata(&BlockInfo::default()).await.is_ok());
}
#[tokio::test]
async fn test_load_calldata_provider_err() {
let mut source = default_test_calldata_source();
assert!(source.load_calldata(&BlockInfo::default()).await.is_err());
}
#[tokio::test]
async fn test_load_calldata_chain_provider_empty_txs() {
let mut source = default_test_calldata_source();
let block_info = BlockInfo::default();
source.chain_provider.insert_block_with_transactions(0, block_info, Vec::new());
assert!(!source.open); assert!(source.load_calldata(&BlockInfo::default()).await.is_ok());
assert!(source.calldata.is_empty());
assert!(source.open);
}
#[tokio::test]
async fn test_load_calldata_wrong_batch_inbox_address() {
let batch_inbox_address = address!("0123456789012345678901234567890123456789");
let mut source = default_test_calldata_source();
let block_info = BlockInfo::default();
let tx = test_legacy_tx(batch_inbox_address);
source.chain_provider.insert_block_with_transactions(0, block_info, vec![tx]);
assert!(!source.open); assert!(source.load_calldata(&BlockInfo::default()).await.is_ok());
assert!(source.calldata.is_empty());
assert!(source.open);
}
#[tokio::test]
async fn test_load_calldata_wrong_signer() {
let batch_inbox_address = address!("0123456789012345678901234567890123456789");
let mut source = default_test_calldata_source();
source.batch_inbox_address = batch_inbox_address;
let block_info = BlockInfo::default();
let tx = test_legacy_tx(batch_inbox_address);
source.chain_provider.insert_block_with_transactions(0, block_info, vec![tx]);
assert!(!source.open); assert!(source.load_calldata(&BlockInfo::default()).await.is_ok());
assert!(source.calldata.is_empty());
assert!(source.open);
}
#[tokio::test]
async fn test_load_calldata_valid_legacy_tx() {
let batch_inbox_address = address!("0123456789012345678901234567890123456789");
let mut source = default_test_calldata_source();
source.batch_inbox_address = batch_inbox_address;
let tx = test_legacy_tx(batch_inbox_address);
source.signer = tx.recover_signer().unwrap();
let block_info = BlockInfo::default();
source.chain_provider.insert_block_with_transactions(0, block_info, vec![tx]);
assert!(!source.open); assert!(source.load_calldata(&BlockInfo::default()).await.is_ok());
assert!(!source.calldata.is_empty()); assert!(source.open);
}
#[tokio::test]
async fn test_load_calldata_valid_eip2930_tx() {
let batch_inbox_address = address!("0123456789012345678901234567890123456789");
let mut source = default_test_calldata_source();
source.batch_inbox_address = batch_inbox_address;
let tx = test_eip2930_tx(batch_inbox_address);
source.signer = tx.recover_signer().unwrap();
let block_info = BlockInfo::default();
source.chain_provider.insert_block_with_transactions(0, block_info, vec![tx]);
assert!(!source.open); assert!(source.load_calldata(&BlockInfo::default()).await.is_ok());
assert!(!source.calldata.is_empty()); assert!(source.open);
}
#[tokio::test]
async fn test_load_calldata_blob_tx_ignored() {
let batch_inbox_address = address!("0123456789012345678901234567890123456789");
let mut source = default_test_calldata_source();
source.batch_inbox_address = batch_inbox_address;
let tx = test_blob_tx(batch_inbox_address);
source.signer = tx.recover_signer().unwrap();
let block_info = BlockInfo::default();
source.chain_provider.insert_block_with_transactions(0, block_info, vec![tx]);
assert!(!source.open); assert!(source.load_calldata(&BlockInfo::default()).await.is_ok());
assert!(source.calldata.is_empty());
assert!(source.open);
}
#[tokio::test]
async fn test_next_err_loading_calldata() {
let mut source = default_test_calldata_source();
assert!(matches!(
source.next(&BlockInfo::default()).await,
Err(PipelineErrorKind::Temporary(_))
));
}
}