ckb_freezer/
freezer.rs

1use crate::freezer_files::FreezerFiles;
2use crate::internal_error;
3use ckb_error::Error;
4use ckb_types::{
5    core::{BlockNumber, BlockView, HeaderView},
6    packed,
7    prelude::*,
8};
9use ckb_util::Mutex;
10use fs2::FileExt;
11use std::collections::BTreeMap;
12use std::fs::{File, OpenOptions};
13use std::path::{Path, PathBuf};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16
/// Name of the lock file created inside the freezer directory; an exclusive
/// lock on it prevents the same freezer from being opened twice.
const LOCKNAME: &str = "FLOCK";
18
19/// freeze result represent blkhash -> (blknum, txsnum) btree-map
20/// sorted blkhash for making ranges for compaction
21type FreezeResult = BTreeMap<packed::Byte32, (BlockNumber, u32)>;
22
23struct Inner {
24    pub(crate) files: FreezerFiles,
25    pub(crate) tip: Option<HeaderView>,
26}
27
28/// Freezer is an memory mapped append-only database to store immutable chain data into flat files
29#[derive(Clone)]
30pub struct Freezer {
31    inner: Arc<Mutex<Inner>>,
32    number: Arc<AtomicU64>,
33    /// stop flag
34    pub stopped: Arc<AtomicBool>,
35    /// file lock to prevent double opens
36    pub(crate) _lock: Arc<File>,
37}
38
39impl Freezer {
40    /// Creates a freezer at specified path
41    pub fn open(path: PathBuf) -> Result<Freezer, Error> {
42        let lock_path = path.join(LOCKNAME);
43        let lock = OpenOptions::new()
44            .write(true)
45            .create(true)
46            .truncate(false)
47            .open(lock_path)
48            .map_err(internal_error)?;
49        lock.try_lock_exclusive().map_err(internal_error)?;
50        let mut files = FreezerFiles::open(path).map_err(internal_error)?;
51        let freezer_number = files.number();
52
53        let mut tip = None;
54        if freezer_number > 1 {
55            let raw_block = files
56                .retrieve(freezer_number - 1)
57                .map_err(internal_error)?
58                .ok_or_else(|| internal_error("freezer inconsistent"))?;
59            let block = packed::BlockReader::from_compatible_slice(&raw_block)
60                .map_err(internal_error)?
61                .to_entity();
62            if block.count_extra_fields() > 1 {
63                return Err(internal_error("block has more than one extra fields"));
64            }
65            tip = Some(block.header().into_view());
66        }
67
68        let inner = Inner { files, tip };
69        Ok(Freezer {
70            number: Arc::clone(&inner.files.number),
71            inner: Arc::new(Mutex::new(inner)),
72            stopped: Arc::new(AtomicBool::new(false)),
73            _lock: Arc::new(lock),
74        })
75    }
76
77    /// Creates a freezer at temporary path
78    pub fn open_in<P: AsRef<Path>>(path: P) -> Result<Freezer, Error> {
79        Self::open(path.as_ref().to_path_buf())
80    }
81
82    /// Freeze background process that periodically checks the chain data for any
83    /// import progress and moves ancient data from the kv-db into the freezer.
84    pub fn freeze<F>(
85        &self,
86        threshold: BlockNumber,
87        get_block_by_number: F,
88    ) -> Result<FreezeResult, Error>
89    where
90        F: Fn(BlockNumber) -> Option<BlockView>,
91    {
92        let number = self.number();
93        let mut guard = self.inner.lock();
94        let mut ret = BTreeMap::new();
95        ckb_logger::trace!(
96            "Freezer process initiated, starting from {}, threshold {}",
97            number,
98            threshold
99        );
100
101        for number in number..threshold {
102            if self.stopped.load(Ordering::SeqCst) {
103                guard.files.sync_all().map_err(internal_error)?;
104                return Ok(ret);
105            }
106
107            if let Some(block) = get_block_by_number(number) {
108                if let Some(ref header) = guard.tip {
109                    if header.hash() != block.header().parent_hash() {
110                        return Err(internal_error(format!(
111                            "appending unexpected block expected parent_hash {} have {}",
112                            header.hash(),
113                            block.header().parent_hash()
114                        )));
115                    }
116                }
117                let raw_block = block.data();
118                guard
119                    .files
120                    .append(number, raw_block.as_slice())
121                    .map_err(internal_error)?;
122
123                ret.insert(
124                    block.header().hash(),
125                    (number, block.transactions().len() as u32),
126                );
127                guard.tip = Some(block.header());
128                ckb_logger::trace!("Freezer block append {}", number);
129
130                if let Some(metrics) = ckb_metrics::handle() {
131                    metrics.ckb_freezer_number.set(number as i64);
132                }
133            } else {
134                ckb_logger::error!("Freezer block missing {}", number);
135                break;
136            }
137        }
138        guard.files.sync_all().map_err(internal_error)?;
139        Ok(ret)
140    }
141
142    /// Retrieve an item with the given number
143    pub fn retrieve(&self, number: BlockNumber) -> Result<Option<Vec<u8>>, Error> {
144        self.inner
145            .lock()
146            .files
147            .retrieve(number)
148            .map_err(internal_error)
149    }
150
151    /// Return total item number in the freezer
152    pub fn number(&self) -> BlockNumber {
153        self.number.load(Ordering::SeqCst)
154    }
155
156    /// Truncate discards any recent data above the provided threshold number.
157    pub fn truncate(&self, item: u64) -> Result<(), Error> {
158        if item > 0 && ((item + 1) < self.number()) {
159            let mut inner = self.inner.lock();
160            inner.files.truncate(item).map_err(internal_error)?;
161
162            let raw_block = inner
163                .files
164                .retrieve(item)
165                .map_err(internal_error)?
166                .expect("frozen number sync with files");
167            let block = packed::BlockReader::from_compatible_slice(&raw_block)
168                .map_err(internal_error)?
169                .to_entity();
170            if block.count_extra_fields() > 1 {
171                return Err(internal_error("block has more than one extra fields"));
172            }
173            inner.tip = Some(block.header().into_view());
174        }
175        Ok(())
176    }
177}