1use fail::fail_point;
2use lru::LruCache;
3use snap::raw::{Decoder as SnappyDecoder, Encoder as SnappyEncoder};
4use std::fs::{self, File};
5use std::io::{Error as IoError, ErrorKind as IoErrorKind};
6use std::io::{Read, Write};
7use std::io::{Seek, SeekFrom};
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
/// Maximum size (in bytes) of a single data file before a new head file is started: 2 GB.
const MAX_FILE_SIZE: u64 = 2 * 1_000 * 1_000 * 1_000;
/// Maximum number of data-file handles kept open in the LRU cache at once.
const OPEN_FILES_LIMIT: usize = 256;
/// Name of the index file mapping item numbers to (file id, offset) pairs.
const INDEX_FILE_NAME: &str = "INDEX";
/// Size in bytes of one serialized `IndexEntry`: 4-byte LE `u32` file id + 8-byte LE `u64` offset.
pub(crate) const INDEX_ENTRY_SIZE: u64 = 12;

/// Identifier of a data file (`blkNNNNNN`) inside the freezer directory.
pub type FileId = u32;
19
/// The writable head (newest) data file together with its current size.
pub(crate) struct Head {
    // Open handle to the current head data file.
    pub(crate) file: File,
    // Number of bytes already written to `file`.
    pub(crate) bytes: u64,
}
25
26impl Head {
27 pub fn new(file: File, bytes: u64) -> Self {
28 Head { file, bytes }
29 }
30
31 pub fn write(&mut self, data: &[u8]) -> Result<(), IoError> {
32 fail_point!("write-head");
33 self.file.write_all(data)?;
34 self.bytes += data.len() as u64;
35 Ok(())
36 }
37}
38
/// Append-only, multi-file storage for frozen (ancient) block data.
pub struct FreezerFiles {
    // LRU cache of open data-file handles, keyed by file id.
    pub(crate) files: LruCache<FileId, File>,
    // The currently writable (newest) data file.
    pub(crate) head: Head,
    // Number of the next item `append` expects (shared, atomically updated).
    pub(crate) number: Arc<AtomicU64>,
    // Maximum size in bytes of a single data file before rolling to a new one.
    max_size: u64,
    // Id of the oldest data file.
    pub(crate) tail_id: FileId,
    // Id of the newest (head) data file.
    pub(crate) head_id: FileId,
    // Directory holding the data files and the INDEX file.
    file_path: PathBuf,
    // Handle to the INDEX file mapping item numbers to (file id, offset).
    pub(crate) index: File,
    // Whether payloads are Snappy-compressed before being written.
    pub(crate) enable_compression: bool,
}
61
/// One entry of the INDEX file: the data file holding an item and the item's
/// end offset within that file (see `write_index`, which records
/// `head.bytes` *after* the payload is written).
#[derive(Default)]
pub struct IndexEntry {
    // Id of the data file that holds the item.
    pub file_id: FileId,
    // Byte offset within that data file where the item ends.
    pub offset: u64,
}
68
69impl IndexEntry {
70 pub fn encode(&self) -> Vec<u8> {
72 fail_point!("IndexEntry encode");
73 let mut bytes = Vec::with_capacity(INDEX_ENTRY_SIZE as usize);
74 bytes.extend_from_slice(&self.file_id.to_le_bytes());
75 bytes.extend_from_slice(&self.offset.to_le_bytes());
76 bytes
77 }
78
79 pub fn decode(raw: &[u8]) -> Result<Self, IoError> {
81 fail_point!("IndexEntry decode");
82 debug_assert!(raw.len() == INDEX_ENTRY_SIZE as usize);
83 let (raw_file_id, raw_offset) = raw.split_at(::std::mem::size_of::<u32>());
84 let file_id = u32::from_le_bytes(
85 raw_file_id
86 .try_into()
87 .map_err(|e| IoError::new(IoErrorKind::Other, format!("decode file_id {e}")))?,
88 );
89 let offset = u64::from_le_bytes(
90 raw_offset
91 .try_into()
92 .map_err(|e| IoError::new(IoErrorKind::Other, format!("decode offset {e}")))?,
93 );
94 Ok(IndexEntry { offset, file_id })
95 }
96}
97
impl FreezerFiles {
    /// Opens (or creates) the freezer rooted at `file_path` and pre-opens
    /// its data files into the handle cache.
    pub fn open(file_path: PathBuf) -> Result<FreezerFiles, IoError> {
        let mut files = FreezerFilesBuilder::new(file_path).build()?;
        files.preopen()?;
        Ok(files)
    }

    /// Returns the number of the next item `append` expects.
    #[inline]
    pub fn number(&self) -> u64 {
        self.number.load(Ordering::SeqCst)
    }

    /// Appends item `number` with payload `input`.
    ///
    /// Items must arrive strictly in sequence: a `number` other than the
    /// expected next one is rejected. When the head file would grow past
    /// `max_size`, a fresh head file is started first.
    pub fn append(&mut self, number: u64, input: &[u8]) -> Result<(), IoError> {
        let expected = self.number.load(Ordering::SeqCst);
        fail_point!("append-unexpected-number");
        if expected != number {
            return Err(IoError::new(
                IoErrorKind::Other,
                format!("appending unexpected block expected {expected} have {number}"),
            ));
        }

        // Optionally Snappy-compress the payload; `data` then points at the
        // compressed buffer instead of the caller's input.
        #[allow(unused_mut)]
        let mut compressed_data;
        let mut data = input;
        if self.enable_compression {
            compressed_data = SnappyEncoder::new()
                .compress_vec(data)
                .map_err(|e| IoError::new(IoErrorKind::Other, format!("compress error {e}")))?;
            data = &compressed_data;
        };

        let data_size = data.len();
        // Roll over to a new head file when this write would exceed the cap.
        if self.head.bytes + data_size as u64 > self.max_size {
            let head_id = self.head_id;
            let next_id = head_id + 1;
            let new_head_file = self.open_truncated(next_id)?;

            // Drop the writable handle of the old head and re-open it
            // read-only so `retrieve` can still reach it via the cache.
            self.release(head_id);
            self.open_read_only(head_id)?;

            self.head_id = next_id;
            self.head = Head::new(new_head_file, 0);
        }

        self.head.write(data)?;
        // Record this item's end offset (head.bytes after the write) in INDEX.
        self.write_index(self.head_id, self.head.bytes)?;
        self.number.fetch_add(1, Ordering::SeqCst);

        if let Some(metrics) = ckb_metrics::handle() {
            // NOTE(review): this sets the gauge to this append's size
            // (payload + one index entry), not a running total — confirm the
            // intended gauge semantics against ckb_metrics.
            metrics
                .ckb_freezer_size
                .set(data_size as i64 + INDEX_ENTRY_SIZE as i64);
        }
        Ok(())
    }

    /// Flushes both the head data file and the INDEX file to disk.
    pub fn sync_all(&self) -> Result<(), IoError> {
        self.head.file.sync_all()?;
        self.index.sync_all()?;
        Ok(())
    }

    /// Reads back item `item`, decompressing it if compression is enabled.
    ///
    /// Returns `Ok(None)` for item 0 and for items not yet appended.
    pub fn retrieve(&mut self, item: u64) -> Result<Option<Vec<u8>>, IoError> {
        if item < 1 {
            return Ok(None);
        }
        if self.number.load(Ordering::SeqCst) <= item {
            return Ok(None);
        }

        let bounds = self.get_bounds(item)?;
        if let Some((start_offset, end_offset, file_id)) = bounds {
            // Holds a freshly opened handle alive when the cache misses.
            let open_read_only;

            let mut file = if let Some(file) = self.files.get(&file_id) {
                file
            } else {
                open_read_only = self.open_read_only(file_id)?;
                &open_read_only
            };

            let size = (end_offset - start_offset) as usize;
            let mut data = vec![0u8; size];
            file.seek(SeekFrom::Start(start_offset))?;
            file.read_exact(&mut data)?;

            if self.enable_compression {
                data = SnappyDecoder::new().decompress_vec(&data).map_err(|e| {
                    IoError::new(
                        IoErrorKind::Other,
                        format!(
                            "decompress file-id-{file_id} offset-{start_offset} size-{size}: error {e}"
                        ),
                    )
                })?;
            }

            if let Some(metrics) = ckb_metrics::handle() {
                // Counts payload bytes plus the two index entries consulted.
                metrics
                    .ckb_freezer_read
                    .inc_by(size as u64 + 2 * INDEX_ENTRY_SIZE);
            }
            Ok(Some(data))
        } else {
            Ok(None)
        }
    }

    /// Looks up the (start_offset, end_offset, file_id) span of `item` from
    /// the INDEX file. Seek/read failures on the index are logged and mapped
    /// to `Ok(None)` rather than errors.
    fn get_bounds(&self, item: u64) -> Result<Option<(u64, u64, FileId)>, IoError> {
        let mut buffer = [0; INDEX_ENTRY_SIZE as usize];
        let mut index = &self.index;
        // Entry `item` holds the item's end offset.
        if let Err(e) = index.seek(SeekFrom::Start(item * INDEX_ENTRY_SIZE)) {
            ckb_logger::trace!("Freezer get_bounds seek {} {}", item * INDEX_ENTRY_SIZE, e);
            return Ok(None);
        }

        if let Err(e) = index.read_exact(&mut buffer) {
            ckb_logger::trace!("Freezer get_bounds read_exact {}", e);
            return Ok(None);
        }
        let end_index = IndexEntry::decode(&buffer)?;
        // The first item starts at offset 0 of its file.
        if item == 1 {
            return Ok(Some((0, end_index.offset, end_index.file_id)));
        }

        // Entry `item - 1` holds the previous item's end, i.e. our start.
        if let Err(e) = index.seek(SeekFrom::Start((item - 1) * INDEX_ENTRY_SIZE)) {
            ckb_logger::trace!(
                "Freezer get_bounds seek {} {}",
                (item - 1) * INDEX_ENTRY_SIZE,
                e
            );
            return Ok(None);
        }
        if let Err(e) = index.read_exact(&mut buffer) {
            ckb_logger::trace!("Freezer get_bounds read_exact {}", e);
            return Ok(None);
        }
        let start_index = IndexEntry::decode(&buffer)?;
        // The previous item ended in an earlier file, so this item starts at
        // offset 0 of its own file.
        if start_index.file_id != end_index.file_id {
            return Ok(Some((0, end_index.offset, end_index.file_id)));
        }

        Ok(Some((
            start_index.offset,
            end_index.offset,
            end_index.file_id,
        )))
    }

    /// Discards all items above `item`, shrinking the index and the head
    /// file (and deleting now-empty later data files). No-op when `item < 1`
    /// or nothing would be removed.
    pub fn truncate(&mut self, item: u64) -> Result<(), IoError> {
        if item < 1 || ((item + 1) >= self.number()) {
            return Ok(());
        }
        ckb_logger::trace!("Freezer truncate items {}", item);

        let mut buffer = [0; INDEX_ENTRY_SIZE as usize];
        // Keep index entries 0..=item, then re-read the new last entry.
        helper::truncate_file(&mut self.index, (item + 1) * INDEX_ENTRY_SIZE)?;
        self.index.seek(SeekFrom::Start(item * INDEX_ENTRY_SIZE))?;
        self.index.read_exact(&mut buffer)?;
        let new_index = IndexEntry::decode(&buffer)?;

        // The new last item lives in an earlier file: make that file the head
        // and delete every data file after it.
        if new_index.file_id != self.head_id {
            self.release(new_index.file_id);
            let (new_head_file, offset) = self.open_append(new_index.file_id)?;

            self.delete_after(new_index.file_id)?;

            self.head_id = new_index.file_id;
            self.head = Head::new(new_head_file, offset);
        }
        helper::truncate_file(&mut self.head.file, new_index.offset)?;
        self.head.bytes = new_index.offset;
        self.number.store(item + 1, Ordering::SeqCst);
        Ok(())
    }

    /// Resets the handle cache: opens every data file from tail to head
    /// read-only and caches a clone of the head's writable handle.
    pub fn preopen(&mut self) -> Result<(), IoError> {
        self.release_all();

        for id in self.tail_id..self.head_id {
            self.open_read_only(id)?;
        }
        self.files.put(self.head_id, self.head.file.try_clone()?);
        Ok(())
    }

    /// Appends one `IndexEntry` (file id + end offset) to the INDEX file.
    fn write_index(&mut self, file_id: FileId, offset: u64) -> Result<(), IoError> {
        fail_point!("write-index");
        let index = IndexEntry { file_id, offset };
        self.index.seek(SeekFrom::End(0))?;
        self.index.write_all(&index.encode())?;
        Ok(())
    }

    /// Evicts the cached handle for `id`, if any.
    fn release(&mut self, id: FileId) {
        self.files.pop(&id);
    }

    /// Evicts every cached handle.
    fn release_all(&mut self) {
        self.files.clear();
    }

    /// Evicts and deletes every data file with id greater than `id`.
    fn delete_after(&mut self, id: FileId) -> Result<(), IoError> {
        let released: Vec<_> = self
            .files
            .iter()
            .filter_map(|(k, _)| if k > &id { Some(k) } else { None })
            .copied()
            .collect();
        for k in released.iter() {
            self.files.pop(k);
        }
        self.delete_files_by_id(released.into_iter())
    }

    /// Removes the on-disk data files for the given ids.
    fn delete_files_by_id(&self, file_ids: impl Iterator<Item = FileId>) -> Result<(), IoError> {
        for file_id in file_ids {
            let path = self.file_path.join(helper::file_name(file_id));
            fs::remove_file(path)?;
        }
        Ok(())
    }

    /// Opens data file `id` read-only and caches the handle.
    fn open_read_only(&mut self, id: FileId) -> Result<File, IoError> {
        fail_point!("open_read_only");
        let mut opt = fs::OpenOptions::new();
        opt.read(true);
        self.open_file(id, opt)
    }

    /// Creates/truncates data file `id` for read-write and caches the handle.
    fn open_truncated(&mut self, id: FileId) -> Result<File, IoError> {
        fail_point!("open_truncated");
        let mut opt = fs::OpenOptions::new();
        opt.create(true).read(true).write(true).truncate(true);
        self.open_file(id, opt)
    }

    /// Opens data file `id` read-write without truncation, returning the
    /// handle positioned at EOF together with its length.
    fn open_append(&mut self, id: FileId) -> Result<(File, u64), IoError> {
        fail_point!("open_append");
        let mut opt = fs::OpenOptions::new();
        opt.create(true).read(true).write(true);
        let mut file = self.open_file(id, opt)?;
        let offset = file.seek(SeekFrom::End(0))?;
        Ok((file, offset))
    }

    /// Opens data file `id` with `opt` and caches a cloned handle.
    fn open_file(&mut self, id: FileId, opt: fs::OpenOptions) -> Result<File, IoError> {
        let name = helper::file_name(id);
        let file = opt.open(self.file_path.join(name))?;
        self.files.put(id, file.try_clone()?);
        Ok(file)
    }
}
365
/// Builder that opens/repairs the on-disk state and assembles a `FreezerFiles`.
pub struct FreezerFilesBuilder {
    // Directory holding the data files and the INDEX file.
    file_path: PathBuf,
    // Maximum size in bytes of a single data file.
    max_file_size: u64,
    // Whether payloads are Snappy-compressed.
    enable_compression: bool,
    // Capacity of the open-file LRU cache.
    open_files_limit: usize,
}
373
374impl FreezerFilesBuilder {
375 pub fn new(file_path: PathBuf) -> Self {
377 FreezerFilesBuilder {
378 file_path,
379 max_file_size: MAX_FILE_SIZE,
380 enable_compression: true,
381 open_files_limit: OPEN_FILES_LIMIT,
382 }
383 }
384
385 #[allow(dead_code)]
387 pub fn max_file_size(mut self, size: u64) -> Self {
388 self.max_file_size = size;
389 self
390 }
391
392 #[allow(dead_code)]
398 pub fn open_files_limit(mut self, limit: usize) -> Self {
399 assert!(limit > 1);
400 self.open_files_limit = limit;
401 self
402 }
403
404 #[allow(dead_code)]
406 pub fn enable_compression(mut self, enable_compression: bool) -> Self {
407 self.enable_compression = enable_compression;
408 self
409 }
410
411 pub fn build(self) -> Result<FreezerFiles, IoError> {
413 fs::create_dir_all(&self.file_path)?;
414 let (mut index, mut index_size) = self.open_index()?;
415
416 let mut buffer = [0; INDEX_ENTRY_SIZE as usize];
417 index.rewind()?;
418 index.read_exact(&mut buffer)?;
419 let tail_index = IndexEntry::decode(&buffer)?;
420 let tail_id = tail_index.file_id;
421
422 index.seek(SeekFrom::Start(index_size - INDEX_ENTRY_SIZE))?;
423 index.read_exact(&mut buffer)?;
424
425 ckb_logger::debug!("Freezer index_size {} head {:?}", index_size, buffer);
426
427 let mut head_index = IndexEntry::decode(&buffer)?;
428 let head_file_name = helper::file_name(head_index.file_id);
429 let (mut head, mut head_size) = self.open_append(self.file_path.join(head_file_name))?;
430 let mut expect_head_size = head_index.offset;
431
432 while expect_head_size != head_size {
435 if expect_head_size < head_size {
437 ckb_logger::warn!(
438 "Truncating dangling head {} {}",
439 head_size,
440 expect_head_size,
441 );
442 helper::truncate_file(&mut head, expect_head_size)?;
443 head_size = expect_head_size;
444 }
445
446 if expect_head_size > head_size {
448 ckb_logger::warn!(
449 "Truncating dangling indexes {} {}",
450 head_size,
451 expect_head_size,
452 );
453 helper::truncate_file(&mut index, index_size - INDEX_ENTRY_SIZE)?;
454 index_size -= INDEX_ENTRY_SIZE;
455
456 index.seek(SeekFrom::Start(index_size - INDEX_ENTRY_SIZE))?;
457 index.read_exact(&mut buffer)?;
458 let new_index = IndexEntry::decode(&buffer)?;
459
460 if new_index.file_id != head_index.file_id {
462 let head_file_name = helper::file_name(head_index.file_id);
463 let (new_head, size) = self.open_append(self.file_path.join(head_file_name))?;
464 head = new_head;
465 head_size = size;
466 }
467 expect_head_size = new_index.offset;
468 head_index = new_index;
469 }
470 }
471
472 head.sync_all()?;
474 index.sync_all()?;
475
476 let number = index_size / INDEX_ENTRY_SIZE;
477
478 Ok(FreezerFiles {
479 files: LruCache::new(self.open_files_limit),
480 head: Head::new(head, head_size),
481 tail_id,
482 number: Arc::new(AtomicU64::new(number)),
483 max_size: self.max_file_size,
484 head_id: head_index.file_id,
485 file_path: self.file_path,
486 index,
487 enable_compression: self.enable_compression,
488 })
489 }
490
491 fn open_append<P: AsRef<Path>>(&self, path: P) -> Result<(File, u64), IoError> {
497 let mut file = fs::OpenOptions::new()
498 .create(true)
499 .truncate(false)
500 .read(true)
501 .write(true)
502 .open(path)?;
503 let offset = file.seek(SeekFrom::End(0))?;
504 Ok((file, offset))
505 }
506
507 fn open_index(&self) -> Result<(File, u64), IoError> {
508 let (mut index, mut size) = self.open_append(self.file_path.join(INDEX_FILE_NAME))?;
509 if size == 0 {
511 index.write_all(&IndexEntry::default().encode())?;
512 size += INDEX_ENTRY_SIZE;
513 }
514
515 let tail = size % INDEX_ENTRY_SIZE;
517 if (tail != 0) && (size != 0) {
518 size -= tail;
519 helper::truncate_file(&mut index, size)?;
520 }
521 Ok((index, size))
522 }
523}
524
525pub(crate) mod helper {
526 use super::*;
527
528 pub(crate) fn truncate_file(file: &mut File, size: u64) -> Result<(), IoError> {
529 file.set_len(size)?;
530 file.seek(SeekFrom::End(0))?;
531 Ok(())
532 }
533
534 #[inline]
535 pub(crate) fn file_name(file_id: FileId) -> String {
536 format!("blk{file_id:06}")
537 }
538}