1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
//! Defines storage layer for finalized blocks.
use anyhow::Context as _;
use std::{collections::VecDeque, fmt, sync::Arc};
use zksync_concurrency::{ctx, error::Wrap as _, scope, sync};
use zksync_consensus_roles::validator;

mod metrics;

/// State of the `BlockStore`: continuous range of blocks.
///
/// The range is described by the number of its first block and
/// the `CommitQC` of its last block. The range is empty iff `last` is `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockStoreState {
    /// Stored block with the lowest number.
    /// If `last` is `None`, this is the first block that should be fetched.
    pub first: validator::BlockNumber,
    /// `CommitQC` of the stored block with the highest number
    /// (the block number is available via `last.header().number`).
    /// `None` iff store is empty.
    pub last: Option<validator::CommitQC>,
}

impl BlockStoreState {
    /// Tells whether the block with the given number belongs to the stored range
    /// (`first..=last`). An empty store (`last` is `None`) contains no blocks.
    pub fn contains(&self, number: validator::BlockNumber) -> bool {
        match &self.last {
            None => false,
            Some(qc) => number >= self.first && qc.header().number >= number,
        }
    }

    /// Number of the block directly following the stored range,
    /// i.e. `last` + 1, or `first` if the store is empty.
    pub fn next(&self) -> validator::BlockNumber {
        self.last
            .as_ref()
            .map_or(self.first, |qc| qc.header().number.next())
    }

    /// Verifies `BlockStoreState`.
    ///
    /// Checks that:
    /// * `first` is not lower than the first block of the fork described by `genesis`,
    /// * if the store is non-empty, `first` is not greater than the number of the last block,
    /// * if the store is non-empty, `last` passes its own verification against `genesis`.
    ///
    /// Returns an error describing the first violated condition.
    pub fn verify(&self, genesis: &validator::Genesis) -> anyhow::Result<()> {
        anyhow::ensure!(
            self.first >= genesis.first_block,
            "first block ({}) doesn't belong to the fork (which starts at block {})",
            self.first,
            genesis.first_block
        );
        // Nothing more to check for an empty store.
        let Some(last) = &self.last else {
            return Ok(());
        };
        let last_number = last.header().number;
        anyhow::ensure!(
            last_number >= self.first,
            "first block {} has bigger number than the last block {}",
            self.first,
            last_number
        );
        last.verify(genesis).context("last.verify()")?;
        Ok(())
    }
}

/// Storage of a continuous range of L2 blocks.
///
/// Implementations **must** propagate context cancellation
/// (all async methods return `ctx::Result`, i.e. cancellation is reported as `ctx::Error::Canceled`).
#[async_trait::async_trait]
pub trait PersistentBlockStore: 'static + fmt::Debug + Send + Sync {
    /// Genesis matching the block store content.
    /// Consensus code calls this method only once.
    async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::Genesis>;

    /// Range of blocks persisted in storage.
    /// The returned receiver is used to observe the changes of the persisted range over time.
    fn persisted(&self) -> sync::watch::Receiver<BlockStoreState>;

    /// Gets a block by its number.
    /// All the blocks from `persisted()` range are expected to be available.
    /// Blocks that have been queued but haven't been persisted yet don't have to be available.
    /// Returns error if block is missing.
    async fn block(
        &self,
        ctx: &ctx::Ctx,
        number: validator::BlockNumber,
    ) -> ctx::Result<validator::FinalBlock>;

    /// Queue the block to be persisted in storage.
    /// `queue_next_block()` may return BEFORE the block is actually persisted,
    /// but if the call succeeded the block is expected to be persisted eventually.
    /// Implementations are only required to accept a block directly after the previous queued
    /// block, starting with `persisted().borrow().next()`.
    async fn queue_next_block(
        &self,
        ctx: &ctx::Ctx,
        block: validator::FinalBlock,
    ) -> ctx::Result<()>;
}

/// Mutable in-memory state of the `BlockStore`.
#[derive(Debug)]
struct Inner {
    /// Range of blocks accepted by the `BlockStore` so far
    /// (extended whenever a block is pushed to `cache`).
    queued: BlockStoreState,
    /// Most recently observed state of the persistent storage.
    persisted: BlockStoreState,
    /// In-memory cache of the most recent blocks,
    /// stored in increasing order of block number.
    cache: VecDeque<validator::FinalBlock>,
}

impl Inner {
    /// Minimal number of most recent blocks to keep in memory.
    /// It allows to serve the recent blocks to peers fast, even
    /// if persistent storage reads are slow (like in RocksDB).
    /// `BlockStore` may keep in memory more blocks in case
    /// blocks are queued faster than they are persisted.
    const CACHE_CAPACITY: usize = 100;

    /// Tries to push the next block to cache.
    /// Noop if provided block is not the expected one
    /// (i.e. its number is different than `self.queued.next()`).
    /// On success `queued.last` is set to the justification of the pushed block.
    /// Returns true iff cache has been modified.
    fn try_push(&mut self, block: validator::FinalBlock) -> bool {
        if self.queued.next() != block.number() {
            return false;
        }
        self.queued.last = Some(block.justification.clone());
        self.cache.push_back(block);
        self.truncate_cache();
        true
    }

    /// Updates `persisted` field.
    /// Returns an error if the end of the persisted range moved backwards.
    fn update_persisted(&mut self, persisted: BlockStoreState) -> anyhow::Result<()> {
        if persisted.next() < self.persisted.next() {
            anyhow::bail!("head block has been removed from storage, this is not supported");
        }
        self.persisted = persisted;
        // The queued range never starts before the persisted range.
        if self.queued.first < self.persisted.first {
            self.queued.first = self.persisted.first;
        }
        // If persisted blocks overtook the queue (blocks were fetched via some side-channel),
        // it means we need to reset the cache - otherwise we would have a gap.
        if self.queued.next() < self.persisted.next() {
            self.queued = self.persisted.clone();
            self.cache.clear();
        }
        self.truncate_cache();
        Ok(())
    }

    /// If cache size has been exceeded, remove entries which were already persisted.
    /// Blocks which are not persisted yet are never removed, so the cache
    /// may temporarily grow beyond `CACHE_CAPACITY`.
    fn truncate_cache(&mut self) {
        // Indexing is safe: the length check guarantees that the cache is non-empty.
        while self.cache.len() > Self::CACHE_CAPACITY
            && self.persisted.next() > self.cache[0].number()
        {
            self.cache.pop_front();
        }
    }

    /// Looks up the block with number `n` in the in-memory cache.
    /// Returns `None` if the cache is empty or if `n` is outside of the cached range
    /// (either before the oldest cached block, or after the newest one).
    fn block(&self, n: validator::BlockNumber) -> Option<validator::FinalBlock> {
        let first = self.cache.front()?;
        // Blocks in cache are stored in increasing order of block number,
        // so the distance from the first cached block is the index in the cache.
        // `n` may be lower than the number of the first cached block (for example
        // a persisted block which has already been evicted from the cache), so the
        // subtraction has to be checked: an unchecked one would overflow and panic
        // in builds with overflow checks enabled.
        let idx = n.0.checked_sub(first.number().0)?;
        self.cache.get(idx as usize).cloned()
    }
}

/// A wrapper around a PersistentBlockStore which adds caching blocks in-memory
/// and other useful utilities.
#[derive(Debug)]
pub struct BlockStore {
    /// In-memory state (queued/persisted ranges and the block cache),
    /// kept in a watch channel so that tasks can wait for its changes.
    inner: sync::watch::Sender<Inner>,
    /// Underlying persistent storage.
    persistent: Box<dyn PersistentBlockStore>,
    /// Genesis fetched from the persistent storage at construction time.
    genesis: validator::Genesis,
}

/// Runner of the BlockStore background tasks.
/// Wraps a shared reference to the `BlockStore` whose tasks it runs;
/// the tasks are executed by calling `run()`.
#[must_use]
#[derive(Debug, Clone)]
pub struct BlockStoreRunner(Arc<BlockStore>);

impl BlockStoreRunner {
    /// Runs the background tasks of the BlockStore.
    ///
    /// Two tasks are executed concurrently:
    /// * a task propagating the changes of the persisted state
    ///   (as reported by `PersistentBlockStore::persisted()`) to the in-memory state,
    /// * a task passing the cached blocks, in order of block numbers,
    ///   to `PersistentBlockStore::queue_next_block()`.
    ///
    /// Returns `Ok(())` if the tasks were terminated by context cancellation.
    /// Internal errors of the tasks are returned to the caller.
    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
        #[vise::register]
        static COLLECTOR: vise::Collector<Option<metrics::BlockStore>> = vise::Collector::new();
        // The scrape callback holds only a weak reference to the store,
        // so it doesn't keep the store alive and yields `None` once the store is dropped.
        let store_ref = Arc::downgrade(&self.0);
        // The value returned by `before_scrape()` is deliberately ignored.
        let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics()));

        let res = scope::run!(ctx, |ctx, s| async {
            s.spawn::<()>(async {
                // Task watching the persisted state.
                let mut persisted = self.0.persistent.persisted();
                // Mark the current value as unseen, so that the first iteration
                // of the loop processes the current persisted state right away.
                persisted.mark_changed();
                // The loop terminates only with an error (cancellation or internal).
                loop {
                    let new = sync::changed(ctx, &mut persisted).await?.clone();
                    sync::try_send_modify(&self.0.inner, |inner| inner.update_persisted(new))?;
                }
            });
            // Task queueing blocks to be persisted.
            let inner = &mut self.0.inner.subscribe();
            // Number of the next block to pass to the persistent storage.
            let mut queue_next = validator::BlockNumber(0);
            loop {
                // Wait until the next block to persist is available in the cache.
                // Blocks which have been already persisted are skipped.
                let block = sync::wait_for_some(ctx, inner, |inner| {
                    inner.block(queue_next.max(inner.persisted.next()))
                })
                .await?;
                queue_next = block.number().next();
                // TODO: monitor errors as well.
                // (currently the latency is observed only if the call succeeds).
                let t = metrics::PERSISTENT_BLOCK_STORE
                    .queue_next_block_latency
                    .start();
                self.0.persistent.queue_next_block(ctx, block).await?;
                t.observe();
            }
        })
        .await;
        // Cancellation is an expected way of terminating the background tasks.
        match res {
            Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
            Err(ctx::Error::Internal(err)) => Err(err),
        }
    }
}

impl BlockStore {
    /// Builds a `BlockStore` on top of the given persistent storage.
    ///
    /// `BlockStore` takes ownership of the passed `PersistentBlockStore`,
    /// i.e. the caller should modify the underlying persistent storage
    /// ONLY through the constructed `BlockStore`.
    ///
    /// Fetches the genesis from the storage and verifies the currently persisted
    /// state against it. Returns the store together with the runner of its
    /// background tasks.
    pub async fn new(
        ctx: &ctx::Ctx,
        persistent: Box<dyn PersistentBlockStore>,
    ) -> ctx::Result<(Arc<Self>, BlockStoreRunner)> {
        let timer = metrics::PERSISTENT_BLOCK_STORE.genesis_latency.start();
        let genesis = persistent.genesis(ctx).await.wrap("persistent.genesis()")?;
        timer.observe();
        let persisted = persistent.persisted().borrow().clone();
        persisted.verify(&genesis).context("state.verify()")?;
        // Initially nothing is queued beyond what is already persisted.
        let initial = Inner {
            queued: persisted.clone(),
            persisted,
            cache: VecDeque::new(),
        };
        let inner = sync::watch::channel(initial).0;
        let store = Arc::new(Self {
            inner,
            persistent,
            genesis,
        });
        Ok((store.clone(), BlockStoreRunner(store)))
    }

    /// Genesis specification for this block store.
    pub fn genesis(&self) -> &validator::Genesis {
        &self.genesis
    }

    /// Range of available blocks (in memory & persisted).
    pub fn queued(&self) -> BlockStoreState {
        let inner = self.inner.borrow();
        inner.queued.clone()
    }

    /// Fetches a block (from the in-memory cache or from the persistent storage).
    /// Returns `None` if the block is outside of the queued range.
    pub async fn block(
        &self,
        ctx: &ctx::Ctx,
        number: validator::BlockNumber,
    ) -> ctx::Result<Option<validator::FinalBlock>> {
        // Try the in-memory state first; the borrow is released before
        // falling back to the persistent storage.
        let cached = {
            let inner = self.inner.borrow();
            if !inner.queued.contains(number) {
                return Ok(None);
            }
            inner.block(number)
        };
        if let Some(block) = cached {
            return Ok(Some(block));
        }
        let timer = metrics::PERSISTENT_BLOCK_STORE.block_latency.start();
        let fetched = self
            .persistent
            .block(ctx, number)
            .await
            .wrap("persistent.block()")?;
        timer.observe();
        Ok(Some(fetched))
    }

    /// Appends a block to the queue of blocks to be persisted eventually.
    ///
    /// Since persisting a block may take a significant amount of time,
    /// `BlockStore` contains a queue of blocks waiting to be persisted.
    /// `queue_block()` verifies the block and adds it to the queue as soon as
    /// all intermediate blocks are queued as well. It is a noop if the block
    /// has already been queued. Queue is unbounded, so it is caller's
    /// responsibility to manage the queue size.
    pub async fn queue_block(
        &self,
        ctx: &ctx::Ctx,
        block: validator::FinalBlock,
    ) -> ctx::Result<()> {
        block.verify(&self.genesis).context("block.verify()")?;
        // Wait until all the preceding blocks are queued.
        sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| {
            block.number() <= inner.queued.next()
        })
        .await?;
        self.inner.send_if_modified(|inner| inner.try_push(block));
        Ok(())
    }

    /// Waits until the queued blocks range is different than `old`.
    /// Returns the new range.
    pub async fn wait_for_queued_change(
        &self,
        ctx: &ctx::Ctx,
        old: &BlockStoreState,
    ) -> ctx::OrCanceled<BlockStoreState> {
        sync::wait_for_some(ctx, &mut self.inner.subscribe(), |inner| {
            if &inner.queued != old {
                Some(inner.queued.clone())
            } else {
                None
            }
        })
        .await
    }

    /// Waits until the given block is queued (in memory, or persisted).
    /// Returns the queued range observed at that point.
    /// Note that it doesn't mean that the block is actually available, as old blocks might get pruned.
    pub async fn wait_until_queued(
        &self,
        ctx: &ctx::Ctx,
        number: validator::BlockNumber,
    ) -> ctx::OrCanceled<BlockStoreState> {
        let mut sub = self.inner.subscribe();
        let inner = sync::wait_for(ctx, &mut sub, |inner| inner.queued.next() > number).await?;
        Ok(inner.queued.clone())
    }

    /// Waits until the given block is stored persistently.
    /// Returns the persisted range observed at that point.
    /// Note that it doesn't mean that the block is actually available, as old blocks might get pruned.
    pub async fn wait_until_persisted(
        &self,
        ctx: &ctx::Ctx,
        number: validator::BlockNumber,
    ) -> ctx::OrCanceled<BlockStoreState> {
        let mut sub = self.persistent.persisted();
        let state = sync::wait_for(ctx, &mut sub, |persisted| persisted.next() > number).await?;
        Ok(state.clone())
    }

    /// Collects the current values of the `BlockStore` metrics:
    /// numbers of the next block to be queued and the next block to be persisted.
    fn scrape_metrics(&self) -> metrics::BlockStore {
        let inner = self.inner.borrow();
        let m = metrics::BlockStore::default();
        m.next_queued_block.set(inner.queued.next().0);
        m.next_persisted_block.set(inner.persisted.next().0);
        m
    }
}