1use crate::freezer_files::FreezerFiles;
2use crate::internal_error;
3use ckb_error::Error;
4use ckb_types::{
5 core::{BlockNumber, BlockView, HeaderView},
6 packed,
7 prelude::*,
8};
9use ckb_util::Mutex;
10use fs2::FileExt;
11use std::collections::BTreeMap;
12use std::fs::{File, OpenOptions};
13use std::path::{Path, PathBuf};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16
17const LOCKNAME: &str = "FLOCK";
18
19type FreezeResult = BTreeMap<packed::Byte32, (BlockNumber, u32)>;
22
23struct Inner {
24 pub(crate) files: FreezerFiles,
25 pub(crate) tip: Option<HeaderView>,
26}
27
28#[derive(Clone)]
30pub struct Freezer {
31 inner: Arc<Mutex<Inner>>,
32 number: Arc<AtomicU64>,
33 pub stopped: Arc<AtomicBool>,
35 pub(crate) _lock: Arc<File>,
37}
38
39impl Freezer {
40 pub fn open(path: PathBuf) -> Result<Freezer, Error> {
42 let lock_path = path.join(LOCKNAME);
43 let lock = OpenOptions::new()
44 .write(true)
45 .create(true)
46 .truncate(false)
47 .open(lock_path)
48 .map_err(internal_error)?;
49 lock.try_lock_exclusive().map_err(internal_error)?;
50 let mut files = FreezerFiles::open(path).map_err(internal_error)?;
51 let freezer_number = files.number();
52
53 let mut tip = None;
54 if freezer_number > 1 {
55 let raw_block = files
56 .retrieve(freezer_number - 1)
57 .map_err(internal_error)?
58 .ok_or_else(|| internal_error("freezer inconsistent"))?;
59 let block = packed::BlockReader::from_compatible_slice(&raw_block)
60 .map_err(internal_error)?
61 .to_entity();
62 if block.count_extra_fields() > 1 {
63 return Err(internal_error("block has more than one extra fields"));
64 }
65 tip = Some(block.header().into_view());
66 }
67
68 let inner = Inner { files, tip };
69 Ok(Freezer {
70 number: Arc::clone(&inner.files.number),
71 inner: Arc::new(Mutex::new(inner)),
72 stopped: Arc::new(AtomicBool::new(false)),
73 _lock: Arc::new(lock),
74 })
75 }
76
77 pub fn open_in<P: AsRef<Path>>(path: P) -> Result<Freezer, Error> {
79 Self::open(path.as_ref().to_path_buf())
80 }
81
82 pub fn freeze<F>(
85 &self,
86 threshold: BlockNumber,
87 get_block_by_number: F,
88 ) -> Result<FreezeResult, Error>
89 where
90 F: Fn(BlockNumber) -> Option<BlockView>,
91 {
92 let number = self.number();
93 let mut guard = self.inner.lock();
94 let mut ret = BTreeMap::new();
95 ckb_logger::trace!(
96 "Freezer process initiated, starting from {}, threshold {}",
97 number,
98 threshold
99 );
100
101 for number in number..threshold {
102 if self.stopped.load(Ordering::SeqCst) {
103 guard.files.sync_all().map_err(internal_error)?;
104 return Ok(ret);
105 }
106
107 if let Some(block) = get_block_by_number(number) {
108 if let Some(ref header) = guard.tip {
109 if header.hash() != block.header().parent_hash() {
110 return Err(internal_error(format!(
111 "appending unexpected block expected parent_hash {} have {}",
112 header.hash(),
113 block.header().parent_hash()
114 )));
115 }
116 }
117 let raw_block = block.data();
118 guard
119 .files
120 .append(number, raw_block.as_slice())
121 .map_err(internal_error)?;
122
123 ret.insert(
124 block.header().hash(),
125 (number, block.transactions().len() as u32),
126 );
127 guard.tip = Some(block.header());
128 ckb_logger::trace!("Freezer block append {}", number);
129
130 if let Some(metrics) = ckb_metrics::handle() {
131 metrics.ckb_freezer_number.set(number as i64);
132 }
133 } else {
134 ckb_logger::error!("Freezer block missing {}", number);
135 break;
136 }
137 }
138 guard.files.sync_all().map_err(internal_error)?;
139 Ok(ret)
140 }
141
142 pub fn retrieve(&self, number: BlockNumber) -> Result<Option<Vec<u8>>, Error> {
144 self.inner
145 .lock()
146 .files
147 .retrieve(number)
148 .map_err(internal_error)
149 }
150
151 pub fn number(&self) -> BlockNumber {
153 self.number.load(Ordering::SeqCst)
154 }
155
156 pub fn truncate(&self, item: u64) -> Result<(), Error> {
158 if item > 0 && ((item + 1) < self.number()) {
159 let mut inner = self.inner.lock();
160 inner.files.truncate(item).map_err(internal_error)?;
161
162 let raw_block = inner
163 .files
164 .retrieve(item)
165 .map_err(internal_error)?
166 .expect("frozen number sync with files");
167 let block = packed::BlockReader::from_compatible_slice(&raw_block)
168 .map_err(internal_error)?
169 .to_entity();
170 if block.count_extra_fields() > 1 {
171 return Err(internal_error("block has more than one extra fields"));
172 }
173 inner.tip = Some(block.header().into_view());
174 }
175 Ok(())
176 }
177}