// kona_derive/stages/attributes_queue.rs
//! Contains the logic for the `AttributesQueue` stage.

use crate::{
    errors::{PipelineError, ResetError},
    traits::{
        AttributesBuilder, AttributesProvider, NextAttributes, OriginAdvancer, OriginProvider,
        SignalReceiver,
    },
    types::{PipelineResult, Signal},
};
use alloc::{boxed::Box, sync::Arc};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::{BlockInfo, L2BlockInfo, SingleBatch};
use op_alloy_rpc_types_engine::{OpAttributesWithParent, OpPayloadAttributes};
use tracing::info;

/// [AttributesQueue] accepts batches from the [BatchQueue] stage
/// and transforms them into [OpPayloadAttributes].
///
/// The outputted payload attributes cannot be buffered because each batch->attributes
/// transformation pulls in data about the current L2 safe head.
///
/// What the stage does buffer is the batch it is currently working on: at most one
/// [SingleBatch] is held at a time, together with the "last in span" flag that the
/// previous stage reported when that batch was pulled. The batch stays buffered until
/// payload attributes have been built from it successfully, so a failed attempt is
/// retried against the same batch on the next call.
///
/// This stage can be reset by clearing its batch buffer.
/// This stage does not need to retain any references to L1 blocks.
///
/// [BatchQueue]: crate::stages::BatchQueue
#[derive(Debug)]
pub struct AttributesQueue<P, AB>
where
    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
    AB: AttributesBuilder + Debug,
{
    /// The rollup config. Its `block_time` is used to validate batch timestamps
    /// against the parent block.
    cfg: Arc<RollupConfig>,
    /// The previous stage of the derivation pipeline. Batches are pulled from it, and
    /// origin queries, origin advancement and signals are forwarded to it.
    prev: P,
    /// Whether the buffered batch is the last in its span, as reported by `prev` at the
    /// moment the batch was loaded. Copied into the produced [OpAttributesWithParent].
    is_last_in_span: bool,
    /// The current batch being processed. `None` means the next load must pull a fresh
    /// batch from `prev`.
    batch: Option<SingleBatch>,
    /// The attributes builder, used to prepare the base payload attributes that the
    /// batch transactions are appended to.
    builder: AB,
}

impl<P, AB> AttributesQueue<P, AB>
where
    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
    AB: AttributesBuilder + Debug,
{
    /// Create a new [AttributesQueue] stage.
    ///
    /// The stage starts with no buffered batch and with the "last in span" flag cleared.
    pub const fn new(cfg: Arc<RollupConfig>, prev: P, builder: AB) -> Self {
        Self { cfg, prev, is_last_in_span: false, batch: None, builder }
    }

    /// Loads a [SingleBatch] from the [AttributesProvider] if needed.
    ///
    /// If a batch is already buffered, a clone of it is returned and the previous stage is
    /// not consulted. Otherwise the next batch is pulled from the previous stage, buffered,
    /// and the "last in span" flag is refreshed from the previous stage.
    ///
    /// The buffered batch is never consumed here; it stays in place until
    /// `next_attributes` succeeds or a signal clears it.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the previous stage's `next_batch`. In that case
    /// nothing is buffered and the flag is left untouched.
    pub async fn load_batch(&mut self, parent: L2BlockInfo) -> PipelineResult<SingleBatch> {
        // Fast path: keep serving the batch we are already working on.
        if let Some(buffered) = self.batch.as_ref() {
            return Ok(buffered.clone());
        }

        let fetched = self.prev.next_batch(parent).await?;
        self.batch = Some(fetched.clone());
        self.is_last_in_span = self.prev.is_last_in_span();
        Ok(fetched)
    }

    /// Returns the next [OpAttributesWithParent] from the current batch.
    ///
    /// Loads a batch (see `load_batch`), builds payload attributes from it on top of
    /// `parent`, and wraps them together with `parent` and the "last in span" flag.
    /// Only once that has succeeded are the buffered batch and the flag cleared.
    ///
    /// # Errors
    ///
    /// Propagates errors from `load_batch` and `create_next_attributes`. When building
    /// the attributes fails, the batch remains buffered, so the next call retries with
    /// the same batch.
    pub async fn next_attributes(
        &mut self,
        parent: L2BlockInfo,
    ) -> PipelineResult<OpAttributesWithParent> {
        let batch = self.load_batch(parent).await?;

        // Construct the payload attributes from the loaded batch.
        let attributes = self.create_next_attributes(batch, parent).await?;
        let populated_attributes =
            OpAttributesWithParent { attributes, parent, is_last_in_span: self.is_last_in_span };

        // Clear out the local state once payload attributes are prepared.
        self.batch = None;
        self.is_last_in_span = false;
        Ok(populated_attributes)
    }

    /// Creates the next attributes, transforming a [SingleBatch] into [OpPayloadAttributes].
    /// This sets `no_tx_pool` and appends the batched txs to the attributes tx list.
    ///
    /// The batch transactions are appended after any transactions the builder already
    /// supplied. If the builder supplied none and the batch is empty, the transaction list
    /// is left as `None`.
    ///
    /// # Errors
    ///
    /// - [ResetError::BadParentHash] if the batch's parent hash is not the hash of `parent`.
    /// - [ResetError::BadTimestamp] if the batch's timestamp is not the parent timestamp
    ///   plus the configured block time.
    /// - Any error returned by the attributes builder.
    pub async fn create_next_attributes(
        &mut self,
        batch: SingleBatch,
        parent: L2BlockInfo,
    ) -> PipelineResult<OpPayloadAttributes> {
        // Sanity check parent hash
        if batch.parent_hash != parent.block_info.hash {
            return Err(ResetError::BadParentHash(batch.parent_hash, parent.block_info.hash).into());
        }

        // Sanity check timestamp
        let actual = parent.block_info.timestamp + self.cfg.block_time;
        if actual != batch.timestamp {
            return Err(ResetError::BadTimestamp(batch.timestamp, actual).into());
        }

        // Prepare the payload attributes
        // The count is captured up front because the transactions are moved out below.
        let tx_count = batch.transactions.len();
        let mut attributes = self.builder.prepare_payload_attributes(parent, batch.epoch()).await?;
        attributes.no_tx_pool = Some(true);
        match attributes.transactions {
            Some(ref mut txs) => txs.extend(batch.transactions),
            None => {
                // Keep `None` rather than `Some(vec![])` when there is nothing to add.
                if !batch.transactions.is_empty() {
                    attributes.transactions = Some(batch.transactions);
                }
            }
        }

        info!(
            target: "attributes-queue",
            "generated attributes in payload queue: txs={}, timestamp={}",
            tx_count, batch.timestamp
        );

        Ok(attributes)
    }
}

// The attributes queue tracks no L1 origin of its own; advancing the origin is
// forwarded unchanged to the previous stage.
#[async_trait]
impl<P, AB> OriginAdvancer for AttributesQueue<P, AB>
where
    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send,
    AB: AttributesBuilder + Debug + Send,
{
    /// Advances the origin of the previous stage and returns its result as-is.
    async fn advance_origin(&mut self) -> PipelineResult<()> {
        self.prev.advance_origin().await
    }
}

// Exposes the inherent `next_attributes` logic through the `NextAttributes` trait.
#[async_trait]
impl<P, AB> NextAttributes for AttributesQueue<P, AB>
where
    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send,
    AB: AttributesBuilder + Debug + Send,
{
    /// Returns the next payload attributes built on top of `parent`.
    async fn next_attributes(
        &mut self,
        parent: L2BlockInfo,
    ) -> PipelineResult<OpAttributesWithParent> {
        // Not a recursive call: method resolution prefers the inherent
        // `AttributesQueue::next_attributes` over this trait method of the same name.
        self.next_attributes(parent).await
    }
}

// The origin reported by this stage is always whatever the previous stage reports.
impl<P, AB> OriginProvider for AttributesQueue<P, AB>
where
    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
    AB: AttributesBuilder + Debug,
{
    /// Returns the previous stage's origin, or `None` if it has none.
    fn origin(&self) -> Option<BlockInfo> {
        self.prev.origin()
    }
}

// Signals are forwarded to the previous stage and discard the batch buffered here.
#[async_trait]
impl<P, AB> SignalReceiver for AttributesQueue<P, AB>
where
    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
    AB: AttributesBuilder + Send + Debug,
{
    /// Handles a pipeline [Signal].
    ///
    /// Every signal drops the buffered batch together with its "last in span" flag and is
    /// forwarded to the previous stage. The two kinds differ only in ordering:
    ///
    /// - Reset / activation: the previous stage is signalled first, and local state is
    ///   cleared only if that succeeds.
    /// - Channel flush: local state is cleared first, so the buffered batch is dropped
    ///   even if the previous stage then fails to handle the signal.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the previous stage's `signal`.
    async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
        match signal {
            s @ Signal::Reset(_) | s @ Signal::Activation(_) => {
                self.prev.signal(s).await?;
                self.batch = None;
                self.is_last_in_span = false;
            }
            s @ Signal::FlushChannel => {
                self.batch = None;
                // The flag describes the buffered batch; with the batch gone it must not
                // linger. It is overwritten on the next load, but is cleared here so that
                // "no batch" always implies "flag unset", as in the other arm.
                self.is_last_in_span = false;
                self.prev.signal(s).await?;
            }
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        errors::{BuilderError, PipelineErrorKind},
        test_utils::{new_test_attributes_provider, TestAttributesBuilder, TestAttributesProvider},
        types::ResetSignal,
    };
    use alloc::{sync::Arc, vec, vec::Vec};
    use alloy_primitives::{b256, Address, Bytes, B256};
    use alloy_rpc_types_engine::PayloadAttributes;

    /// Baseline payload attributes handed to the mock builder: everything zeroed or unset,
    /// with `no_tx_pool` deliberately `Some(false)` so tests can observe it being flipped.
    fn default_optimism_payload_attributes() -> OpPayloadAttributes {
        OpPayloadAttributes {
            payload_attributes: PayloadAttributes {
                timestamp: 0,
                suggested_fee_recipient: Address::default(),
                prev_randao: B256::default(),
                withdrawals: None,
                parent_beacon_block_root: None,
            },
            no_tx_pool: Some(false),
            transactions: None,
            gas_limit: None,
            eip_1559_params: None,
        }
    }

    /// Builds an [AttributesQueue] over a mock provider serving `batches` and a mock
    /// builder with no attributes queued. A missing `cfg` falls back to the default config.
    fn new_attributes_queue(
        cfg: Option<RollupConfig>,
        origin: Option<BlockInfo>,
        batches: Vec<PipelineResult<SingleBatch>>,
    ) -> AttributesQueue<TestAttributesProvider, TestAttributesBuilder> {
        let cfg = cfg.unwrap_or_default();
        let mock_batch_queue = new_test_attributes_provider(origin, batches);
        let mock_attributes_builder = TestAttributesBuilder::default();
        AttributesQueue::new(Arc::new(cfg), mock_batch_queue, mock_attributes_builder)
    }

    /// A channel flush drops the buffered batch and reaches the previous stage.
    #[tokio::test]
    async fn test_attributes_queue_flush() {
        let mut attributes_queue = new_attributes_queue(None, None, vec![]);
        attributes_queue.batch = Some(SingleBatch::default());
        assert!(!attributes_queue.prev.flushed);
        attributes_queue.signal(Signal::FlushChannel).await.unwrap();
        assert!(attributes_queue.prev.flushed);
        assert!(attributes_queue.batch.is_none());
    }

    /// A reset drops the buffered batch and its "last in span" flag, and reaches the
    /// previous stage.
    #[tokio::test]
    async fn test_attributes_queue_reset() {
        let cfg = RollupConfig::default();
        let mock = new_test_attributes_provider(None, vec![]);
        let mock_builder = TestAttributesBuilder::default();
        let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder);
        aq.batch = Some(SingleBatch::default());
        aq.is_last_in_span = true;
        assert!(!aq.prev.reset);
        aq.signal(ResetSignal::default().signal()).await.unwrap();
        assert!(aq.batch.is_none());
        assert!(!aq.is_last_in_span);
        assert!(aq.prev.reset);
    }

    /// With no batches available, loading surfaces the provider's temporary EOF and
    /// buffers nothing.
    #[tokio::test]
    async fn test_load_batch_eof() {
        let mut attributes_queue = new_attributes_queue(None, None, vec![]);
        let parent = L2BlockInfo::default();
        let result = attributes_queue.load_batch(parent).await.unwrap_err();
        assert_eq!(result, PipelineError::Eof.temp());
        assert!(attributes_queue.batch.is_none());
    }

    /// Loading a batch also picks up the "last in span" flag from the previous stage.
    #[tokio::test]
    async fn test_load_batch_last_in_span() {
        let mut attributes_queue = new_attributes_queue(None, None, vec![Ok(Default::default())]);
        let parent = L2BlockInfo::default();
        let result = attributes_queue.load_batch(parent).await.unwrap();
        assert_eq!(result, Default::default());
        assert!(attributes_queue.is_last_in_span);
    }

    /// A batch whose parent hash differs from the parent block's hash triggers a reset.
    #[tokio::test]
    async fn test_create_next_attributes_bad_parent_hash() {
        let mut attributes_queue = new_attributes_queue(None, None, vec![]);
        let bad_hash = b256!("6666666666666666666666666666666666666666666666666666666666666666");
        let parent = L2BlockInfo {
            block_info: BlockInfo { hash: bad_hash, ..Default::default() },
            ..Default::default()
        };
        let batch = SingleBatch::default();
        let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
        assert_eq!(
            result,
            PipelineErrorKind::Reset(ResetError::BadParentHash(Default::default(), bad_hash))
        );
    }

    /// Batch timestamp ahead of `parent.timestamp + block_time` (0 + 0) triggers a reset.
    #[tokio::test]
    async fn test_create_next_attributes_bad_timestamp() {
        let mut attributes_queue = new_attributes_queue(None, None, vec![]);
        let parent = L2BlockInfo::default();
        let batch = SingleBatch { timestamp: 1, ..Default::default() };
        let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
        assert_eq!(result, PipelineErrorKind::Reset(ResetError::BadTimestamp(1, 0)));
    }

    /// Batch timestamp behind the parent's timestamp triggers a reset.
    #[tokio::test]
    async fn test_create_next_attributes_bad_parent_timestamp() {
        let mut attributes_queue = new_attributes_queue(None, None, vec![]);
        let parent = L2BlockInfo {
            block_info: BlockInfo { timestamp: 2, ..Default::default() },
            ..Default::default()
        };
        let batch = SingleBatch { timestamp: 1, ..Default::default() };
        let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
        assert_eq!(result, PipelineErrorKind::Reset(ResetError::BadTimestamp(1, 2)));
    }

    /// The configured block time is added to the parent timestamp before comparing.
    #[tokio::test]
    async fn test_create_next_attributes_bad_config_timestamp() {
        let cfg = RollupConfig { block_time: 1, ..Default::default() };
        let mut attributes_queue = new_attributes_queue(Some(cfg), None, vec![]);
        let parent = L2BlockInfo {
            block_info: BlockInfo { timestamp: 1, ..Default::default() },
            ..Default::default()
        };
        let batch = SingleBatch { timestamp: 1, ..Default::default() };
        let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
        assert_eq!(result, PipelineErrorKind::Reset(ResetError::BadTimestamp(1, 2)));
    }

    /// A builder with nothing queued makes attribute preparation fail critically.
    #[tokio::test]
    async fn test_create_next_attributes_preparation_fails() {
        let mut attributes_queue = new_attributes_queue(None, None, vec![]);
        let parent = L2BlockInfo::default();
        let batch = SingleBatch::default();
        let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
        assert_eq!(
            result,
            PipelineError::AttributesBuilder(BuilderError::AttributesUnavailable).crit()
        );
    }

    /// When the builder supplies no transactions, the batch transactions become the list
    /// and `no_tx_pool` is forced to `Some(true)`.
    #[tokio::test]
    async fn test_create_next_attributes_success() {
        let cfg = RollupConfig::default();
        let mock = new_test_attributes_provider(None, vec![]);
        let mut payload_attributes = default_optimism_payload_attributes();
        let mock_builder =
            TestAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] };
        let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder);
        let parent = L2BlockInfo::default();
        let txs = vec![Bytes::default(), Bytes::default()];
        let batch = SingleBatch { transactions: txs.clone(), ..Default::default() };
        let attributes = aq.create_next_attributes(batch, parent).await.unwrap();
        // update the expected attributes
        payload_attributes.no_tx_pool = Some(true);
        payload_attributes.transactions = Some(txs);
        assert_eq!(attributes, payload_attributes);
    }

    /// When the builder already supplies transactions, the batch transactions are appended
    /// after them rather than replacing them.
    #[tokio::test]
    async fn test_create_next_attributes_appends_to_builder_transactions() {
        let builder_txs = vec![Bytes::from(vec![0x01u8])];
        let batch_txs = vec![Bytes::from(vec![0x02u8]), Bytes::from(vec![0x03u8])];
        let mut payload_attributes = default_optimism_payload_attributes();
        payload_attributes.transactions = Some(builder_txs.clone());

        let cfg = RollupConfig::default();
        let mock = new_test_attributes_provider(None, vec![]);
        let mock_builder =
            TestAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] };
        let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder);
        let batch = SingleBatch { transactions: batch_txs.clone(), ..Default::default() };
        let attributes = aq.create_next_attributes(batch, L2BlockInfo::default()).await.unwrap();

        let mut expected_txs = builder_txs;
        expected_txs.extend(batch_txs);
        payload_attributes.no_tx_pool = Some(true);
        payload_attributes.transactions = Some(expected_txs);
        assert_eq!(attributes, payload_attributes);
    }

    /// EOF from the provider is surfaced unchanged by `next_attributes`.
    #[tokio::test]
    async fn test_next_attributes_load_batch_eof() {
        let mut attributes_queue = new_attributes_queue(None, None, vec![]);
        let parent = L2BlockInfo::default();
        let result = attributes_queue.next_attributes(parent).await.unwrap_err();
        assert_eq!(result, PipelineError::Eof.temp());
    }

    /// If building attributes fails, the loaded batch stays buffered for a retry.
    #[tokio::test]
    async fn test_next_attributes_retains_batch_on_builder_failure() {
        // One batch is available, but the builder has no attributes queued.
        let mut attributes_queue = new_attributes_queue(None, None, vec![Ok(Default::default())]);
        let result = attributes_queue.next_attributes(L2BlockInfo::default()).await.unwrap_err();
        assert_eq!(
            result,
            PipelineError::AttributesBuilder(BuilderError::AttributesUnavailable).crit()
        );
        assert!(attributes_queue.batch.is_some());
    }

    /// A pre-loaded batch is reused by `next_attributes`, which then clears local state.
    #[tokio::test]
    async fn test_next_attributes_load_batch_last_in_span() {
        let cfg = RollupConfig::default();
        let mock = new_test_attributes_provider(None, vec![Ok(Default::default())]);
        let mut pa = default_optimism_payload_attributes();
        let mock_builder = TestAttributesBuilder { attributes: vec![Ok(pa.clone())] };
        let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder);
        // If we load the batch, we should get the last in span.
        // But it won't take it so it will be available in the next_attributes call.
        let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap();
        assert!(aq.is_last_in_span);
        assert!(aq.batch.is_some());
        // This should successfully construct the next payload attributes.
        // It should also reset the last in span flag and clear the batch.
        let attributes = aq.next_attributes(L2BlockInfo::default()).await.unwrap();
        pa.no_tx_pool = Some(true);
        let populated_attributes = OpAttributesWithParent {
            attributes: pa,
            parent: L2BlockInfo::default(),
            is_last_in_span: true,
        };
        assert_eq!(attributes, populated_attributes);
        assert!(!aq.is_last_in_span);
        assert!(aq.batch.is_none());
    }
}