// kona_derive_alloy/pipeline.rs

//! Helper to construct a [DerivationPipeline] using online types.

use kona_derive::{
    attributes::StatefulAttributesBuilder,
    pipeline::{DerivationPipeline, PipelineBuilder},
    sources::EthereumDataSource,
    stages::{
        AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
        L1Retrieval, L1Traversal,
    },
};
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::BlockInfo;
use std::sync::Arc;

use crate::{
    AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProviderWithFallback,
};

/// An online derivation pipeline.
///
/// Concretely, a [DerivationPipeline] whose first type argument is an
/// [OnlineAttributesQueue] parameterised over [OnlineDataProvider], and whose second type
/// argument is [AlloyL2ChainProvider].
pub type OnlinePipeline =
    DerivationPipeline<OnlineAttributesQueue<OnlineDataProvider>, AlloyL2ChainProvider>;

/// An `online` Ethereum data source.
///
/// This is [EthereumDataSource] specialised with [AlloyChainProvider] as its first type
/// argument and [OnlineBlobProviderWithFallback] as its second. Both type arguments of the
/// blob provider are [OnlineBeaconClient].
// NOTE(review): presumably the two `OnlineBeaconClient` slots are the primary and the fallback
// blob source respectively — confirm against `OnlineBlobProviderWithFallback`.
pub type OnlineDataProvider = EthereumDataSource<
    AlloyChainProvider,
    OnlineBlobProviderWithFallback<OnlineBeaconClient, OnlineBeaconClient>,
>;

/// An `online` payload attributes builder for the `AttributesQueue` stage of the derivation
/// pipeline.
///
/// This is [StatefulAttributesBuilder] specialised with [AlloyChainProvider] and
/// [AlloyL2ChainProvider], in that order, as its type arguments.
pub type OnlineAttributesBuilder =
    StatefulAttributesBuilder<AlloyChainProvider, AlloyL2ChainProvider>;

/// An `online` attributes queue for the derivation pipeline.
///
/// The alias spells out the full nesting of stage types, from outermost to innermost:
/// [AttributesQueue] → [BatchProvider] → [BatchStream] → [ChannelReader] → [ChannelProvider] →
/// [FrameQueue] → [L1Retrieval] → [L1Traversal].
///
/// `DAP` is passed through unchanged as the first type argument of [L1Retrieval]; the
/// innermost [L1Traversal] is parameterised over [AlloyChainProvider].
pub type OnlineAttributesQueue<DAP> = AttributesQueue<
    BatchProvider<
        BatchStream<
            ChannelReader<
                ChannelProvider<FrameQueue<L1Retrieval<DAP, L1Traversal<AlloyChainProvider>>>>,
            >,
            // Second type argument of `BatchStream`.
            AlloyL2ChainProvider,
        >,
        // Second type argument of `BatchProvider`.
        AlloyL2ChainProvider,
    >,
    // Second type argument of `AttributesQueue`: the attributes builder.
    OnlineAttributesBuilder,
>;

/// Assembles an [OnlinePipeline] from already-constructed components.
///
/// Each argument is handed to the identically named setter on a fresh [PipelineBuilder];
/// the fully configured builder is then finalized with `build()`.
///
/// # Arguments
///
/// * `rollup_config` - shared rollup configuration.
/// * `chain_provider` - the [AlloyChainProvider] given to the builder's `chain_provider` setter.
/// * `dap_source` - the [OnlineDataProvider] given to the builder's `dap_source` setter.
/// * `l2_chain_provider` - the [AlloyL2ChainProvider] given to the builder's
///   `l2_chain_provider` setter.
/// * `builder` - the [OnlineAttributesBuilder] given to the builder's `builder` setter.
/// * `origin` - the [BlockInfo] given to the builder's `origin` setter.
pub fn new_online_pipeline(
    rollup_config: Arc<RollupConfig>,
    chain_provider: AlloyChainProvider,
    dap_source: OnlineDataProvider,
    l2_chain_provider: AlloyL2ChainProvider,
    builder: OnlineAttributesBuilder,
    origin: BlockInfo,
) -> OnlinePipeline {
    // Configure first, finalize second; the setter order matches the original chain.
    let configured = PipelineBuilder::new()
        .rollup_config(rollup_config)
        .dap_source(dap_source)
        .l2_chain_provider(l2_chain_provider)
        .chain_provider(chain_provider)
        .builder(builder)
        .origin(origin);

    configured.build()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::OnlineBlobProvider;
    use kona_derive::prelude::OriginProvider;

    /// Builds a pipeline from default/local-endpoint components and checks that the rollup
    /// config and origin handed to [new_online_pipeline] are what the pipeline reports back.
    #[test]
    fn test_new_online_pipeline() {
        let cfg = Arc::new(RollupConfig::default());

        // L1 and L2 providers pointed at local HTTP endpoints.
        let l1_provider =
            AlloyChainProvider::new_http("http://127.0.0.1:8545".try_into().unwrap());
        let l2_provider = AlloyL2ChainProvider::new_http(
            "http://127.0.0.1:9545".try_into().unwrap(),
            Arc::clone(&cfg),
        );

        // Blob provider with no fallback configured, wrapped into the data source.
        let beacon = OnlineBeaconClient::new_http("http://127.0.0.1:5555".into());
        let blobs = OnlineBlobProviderWithFallback::new(
            OnlineBlobProvider::new(beacon, None, None),
            None,
        );
        let data_source = EthereumDataSource::new_from_parts(l1_provider.clone(), blobs, &cfg);

        let attributes_builder = StatefulAttributesBuilder::new(
            Arc::clone(&cfg),
            l2_provider.clone(),
            l1_provider.clone(),
        );
        let start = BlockInfo::default();

        let pipeline = new_online_pipeline(
            Arc::clone(&cfg),
            l1_provider,
            data_source,
            l2_provider,
            attributes_builder,
            start,
        );

        assert_eq!(pipeline.rollup_config, cfg);
        assert_eq!(pipeline.origin(), Some(start));
    }
}