ckb_freezer/
freezer_files.rs

1use fail::fail_point;
2use lru::LruCache;
3use snap::raw::{Decoder as SnappyDecoder, Encoder as SnappyEncoder};
4use std::fs::{self, File};
5use std::io::{Error as IoError, ErrorKind as IoErrorKind};
6use std::io::{Read, Write};
7use std::io::{Seek, SeekFrom};
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
/// Default maximum size, in bytes, of one data file before a new head file is started (2 GB).
const MAX_FILE_SIZE: u64 = 2_000_000_000;
/// Default capacity of the cache of opened data files.
const OPEN_FILES_LIMIT: usize = 256;
/// Name of the index file inside the freezer directory.
const INDEX_FILE_NAME: &str = "INDEX";
/// Size, in bytes, of one encoded [`IndexEntry`]: a `FileId` followed by a `u64` offset.
pub(crate) const INDEX_ENTRY_SIZE: u64 =
    (std::mem::size_of::<FileId>() + std::mem::size_of::<u64>()) as u64;

/// Identifier of a data file; file `n` is stored on disk as `blk{n:06}`.
pub type FileId = u32;
19
20pub(crate) struct Head {
21    pub(crate) file: File,
22    // number of bytes written to the head file
23    pub(crate) bytes: u64,
24}
25
26impl Head {
27    pub fn new(file: File, bytes: u64) -> Self {
28        Head { file, bytes }
29    }
30
31    pub fn write(&mut self, data: &[u8]) -> Result<(), IoError> {
32        fail_point!("write-head");
33        self.file.write_all(data)?;
34        self.bytes += data.len() as u64;
35        Ok(())
36    }
37}
38
39/// FreezerFiles represents a single chained block data,
40/// it consists of a data file and an index file
41pub struct FreezerFiles {
42    // opened files
43    pub(crate) files: LruCache<FileId, File>,
44    // head file
45    pub(crate) head: Head,
46    // number of frozen
47    pub(crate) number: Arc<AtomicU64>,
48    // max size for data-files
49    max_size: u64,
50    // number of the earliest file
51    pub(crate) tail_id: FileId,
52    // number of the currently active head file
53    pub(crate) head_id: FileId,
54    // data file path
55    file_path: PathBuf,
56    // index for freezer files
57    pub(crate) index: File,
58    // enable compression
59    pub(crate) enable_compression: bool,
60}
61
62/// An instance of IndexEntry represents an entry inside of a index files
63#[derive(Default)]
64pub struct IndexEntry {
65    pub file_id: FileId,
66    pub offset: u64,
67}
68
69impl IndexEntry {
70    /// Encodes this entry into the provided byte buffer
71    pub fn encode(&self) -> Vec<u8> {
72        fail_point!("IndexEntry encode");
73        let mut bytes = Vec::with_capacity(INDEX_ENTRY_SIZE as usize);
74        bytes.extend_from_slice(&self.file_id.to_le_bytes());
75        bytes.extend_from_slice(&self.offset.to_le_bytes());
76        bytes
77    }
78
79    /// Decode entry from the provided bytes
80    pub fn decode(raw: &[u8]) -> Result<Self, IoError> {
81        fail_point!("IndexEntry decode");
82        debug_assert!(raw.len() == INDEX_ENTRY_SIZE as usize);
83        let (raw_file_id, raw_offset) = raw.split_at(::std::mem::size_of::<u32>());
84        let file_id = u32::from_le_bytes(
85            raw_file_id
86                .try_into()
87                .map_err(|e| IoError::new(IoErrorKind::Other, format!("decode file_id {e}")))?,
88        );
89        let offset = u64::from_le_bytes(
90            raw_offset
91                .try_into()
92                .map_err(|e| IoError::new(IoErrorKind::Other, format!("decode offset {e}")))?,
93        );
94        Ok(IndexEntry { offset, file_id })
95    }
96}
97
98impl FreezerFiles {
99    /// Opens freezer files at path.
100    pub fn open(file_path: PathBuf) -> Result<FreezerFiles, IoError> {
101        let mut files = FreezerFilesBuilder::new(file_path).build()?;
102        files.preopen()?;
103        Ok(files)
104    }
105
106    /// Return frozen item number
107    #[inline]
108    pub fn number(&self) -> u64 {
109        self.number.load(Ordering::SeqCst)
110    }
111
112    /// Append item into freezer files
113    pub fn append(&mut self, number: u64, input: &[u8]) -> Result<(), IoError> {
114        let expected = self.number.load(Ordering::SeqCst);
115        fail_point!("append-unexpected-number");
116        if expected != number {
117            return Err(IoError::new(
118                IoErrorKind::Other,
119                format!("appending unexpected block expected {expected} have {number}"),
120            ));
121        }
122
123        // https://github.com/rust-lang/rust/issues/49171
124        #[allow(unused_mut)]
125        let mut compressed_data;
126        let mut data = input;
127        if self.enable_compression {
128            compressed_data = SnappyEncoder::new()
129                .compress_vec(data)
130                .map_err(|e| IoError::new(IoErrorKind::Other, format!("compress error {e}")))?;
131            data = &compressed_data;
132        };
133
134        let data_size = data.len();
135        // open a new file
136        if self.head.bytes + data_size as u64 > self.max_size {
137            let head_id = self.head_id;
138            let next_id = head_id + 1;
139            let new_head_file = self.open_truncated(next_id)?;
140
141            // release old head, reopen with read only
142            self.release(head_id);
143            self.open_read_only(head_id)?;
144
145            self.head_id = next_id;
146            self.head = Head::new(new_head_file, 0);
147        }
148
149        self.head.write(data)?;
150        self.write_index(self.head_id, self.head.bytes)?;
151        self.number.fetch_add(1, Ordering::SeqCst);
152
153        if let Some(metrics) = ckb_metrics::handle() {
154            metrics
155                .ckb_freezer_size
156                .set(data_size as i64 + INDEX_ENTRY_SIZE as i64);
157        }
158        Ok(())
159    }
160
161    /// Attempts to sync all OS-internal metadata to disk.
162    pub fn sync_all(&self) -> Result<(), IoError> {
163        self.head.file.sync_all()?;
164        self.index.sync_all()?;
165        Ok(())
166    }
167
168    /// Retrieve frozen item by number
169    pub fn retrieve(&mut self, item: u64) -> Result<Option<Vec<u8>>, IoError> {
170        if item < 1 {
171            return Ok(None);
172        }
173        if self.number.load(Ordering::SeqCst) <= item {
174            return Ok(None);
175        }
176
177        let bounds = self.get_bounds(item)?;
178        if let Some((start_offset, end_offset, file_id)) = bounds {
179            let open_read_only;
180
181            let mut file = if let Some(file) = self.files.get(&file_id) {
182                file
183            } else {
184                open_read_only = self.open_read_only(file_id)?;
185                &open_read_only
186            };
187
188            let size = (end_offset - start_offset) as usize;
189            let mut data = vec![0u8; size];
190            file.seek(SeekFrom::Start(start_offset))?;
191            file.read_exact(&mut data)?;
192
193            if self.enable_compression {
194                data = SnappyDecoder::new().decompress_vec(&data).map_err(|e| {
195                    IoError::new(
196                        IoErrorKind::Other,
197                        format!(
198                            "decompress file-id-{file_id} offset-{start_offset} size-{size}: error {e}"
199                        ),
200                    )
201                })?;
202            }
203
204            if let Some(metrics) = ckb_metrics::handle() {
205                metrics
206                    .ckb_freezer_read
207                    .inc_by(size as u64 + 2 * INDEX_ENTRY_SIZE);
208            }
209            Ok(Some(data))
210        } else {
211            Ok(None)
212        }
213    }
214
215    fn get_bounds(&self, item: u64) -> Result<Option<(u64, u64, FileId)>, IoError> {
216        let mut buffer = [0; INDEX_ENTRY_SIZE as usize];
217        let mut index = &self.index;
218        if let Err(e) = index.seek(SeekFrom::Start(item * INDEX_ENTRY_SIZE)) {
219            ckb_logger::trace!("Freezer get_bounds seek {} {}", item * INDEX_ENTRY_SIZE, e);
220            return Ok(None);
221        }
222
223        if let Err(e) = index.read_exact(&mut buffer) {
224            ckb_logger::trace!("Freezer get_bounds read_exact {}", e);
225            return Ok(None);
226        }
227        let end_index = IndexEntry::decode(&buffer)?;
228        if item == 1 {
229            return Ok(Some((0, end_index.offset, end_index.file_id)));
230        }
231
232        if let Err(e) = index.seek(SeekFrom::Start((item - 1) * INDEX_ENTRY_SIZE)) {
233            ckb_logger::trace!(
234                "Freezer get_bounds seek {} {}",
235                (item - 1) * INDEX_ENTRY_SIZE,
236                e
237            );
238            return Ok(None);
239        }
240        if let Err(e) = index.read_exact(&mut buffer) {
241            ckb_logger::trace!("Freezer get_bounds read_exact {}", e);
242            return Ok(None);
243        }
244        let start_index = IndexEntry::decode(&buffer)?;
245        if start_index.file_id != end_index.file_id {
246            return Ok(Some((0, end_index.offset, end_index.file_id)));
247        }
248
249        Ok(Some((
250            start_index.offset,
251            end_index.offset,
252            end_index.file_id,
253        )))
254    }
255
256    /// keeping the provided threshold number item and dropping the rest.
257    pub fn truncate(&mut self, item: u64) -> Result<(), IoError> {
258        // out of bound, this has no effect.
259        if item < 1 || ((item + 1) >= self.number()) {
260            return Ok(());
261        }
262        ckb_logger::trace!("Freezer truncate items {}", item);
263
264        let mut buffer = [0; INDEX_ENTRY_SIZE as usize];
265        // truncate the index
266        helper::truncate_file(&mut self.index, (item + 1) * INDEX_ENTRY_SIZE)?;
267        self.index.seek(SeekFrom::Start(item * INDEX_ENTRY_SIZE))?;
268        self.index.read_exact(&mut buffer)?;
269        let new_index = IndexEntry::decode(&buffer)?;
270
271        // truncate files
272        if new_index.file_id != self.head_id {
273            self.release(new_index.file_id);
274            let (new_head_file, offset) = self.open_append(new_index.file_id)?;
275
276            self.delete_after(new_index.file_id)?;
277
278            self.head_id = new_index.file_id;
279            self.head = Head::new(new_head_file, offset);
280        }
281        helper::truncate_file(&mut self.head.file, new_index.offset)?;
282        self.head.bytes = new_index.offset;
283        self.number.store(item + 1, Ordering::SeqCst);
284        Ok(())
285    }
286
287    /// Attempts to open files, initialize fd map
288    pub fn preopen(&mut self) -> Result<(), IoError> {
289        self.release_all();
290
291        for id in self.tail_id..self.head_id {
292            self.open_read_only(id)?;
293        }
294        self.files.put(self.head_id, self.head.file.try_clone()?);
295        Ok(())
296    }
297
298    fn write_index(&mut self, file_id: FileId, offset: u64) -> Result<(), IoError> {
299        fail_point!("write-index");
300        let index = IndexEntry { file_id, offset };
301        self.index.seek(SeekFrom::End(0))?;
302        self.index.write_all(&index.encode())?;
303        Ok(())
304    }
305
306    fn release(&mut self, id: FileId) {
307        self.files.pop(&id);
308    }
309
310    fn release_all(&mut self) {
311        self.files.clear();
312    }
313
314    fn delete_after(&mut self, id: FileId) -> Result<(), IoError> {
315        let released: Vec<_> = self
316            .files
317            .iter()
318            .filter_map(|(k, _)| if k > &id { Some(k) } else { None })
319            .copied()
320            .collect();
321        for k in released.iter() {
322            self.files.pop(k);
323        }
324        self.delete_files_by_id(released.into_iter())
325    }
326
327    fn delete_files_by_id(&self, file_ids: impl Iterator<Item = FileId>) -> Result<(), IoError> {
328        for file_id in file_ids {
329            let path = self.file_path.join(helper::file_name(file_id));
330            fs::remove_file(path)?;
331        }
332        Ok(())
333    }
334
335    fn open_read_only(&mut self, id: FileId) -> Result<File, IoError> {
336        fail_point!("open_read_only");
337        let mut opt = fs::OpenOptions::new();
338        opt.read(true);
339        self.open_file(id, opt)
340    }
341
342    fn open_truncated(&mut self, id: FileId) -> Result<File, IoError> {
343        fail_point!("open_truncated");
344        let mut opt = fs::OpenOptions::new();
345        opt.create(true).read(true).write(true).truncate(true);
346        self.open_file(id, opt)
347    }
348
349    fn open_append(&mut self, id: FileId) -> Result<(File, u64), IoError> {
350        fail_point!("open_append");
351        let mut opt = fs::OpenOptions::new();
352        opt.create(true).read(true).write(true);
353        let mut file = self.open_file(id, opt)?;
354        let offset = file.seek(SeekFrom::End(0))?;
355        Ok((file, offset))
356    }
357
358    fn open_file(&mut self, id: FileId, opt: fs::OpenOptions) -> Result<File, IoError> {
359        let name = helper::file_name(id);
360        let file = opt.open(self.file_path.join(name))?;
361        self.files.put(id, file.try_clone()?);
362        Ok(file)
363    }
364}
365
/// Freezer factory: collects the options of a new freezer before it is opened with
/// [`FreezerFilesBuilder::build`].
pub struct FreezerFilesBuilder {
    /// Directory where the data files and the index file are stored.
    file_path: PathBuf,
    /// Maximum size, in bytes, of a single data file.
    max_file_size: u64,
    /// Whether item payloads are snappy-compressed before being written.
    enable_compression: bool,
    /// Capacity of the cache of opened data files.
    open_files_limit: usize,
}
373
374impl FreezerFilesBuilder {
375    /// Generates the base configuration for a new freezer instance
376    pub fn new(file_path: PathBuf) -> Self {
377        FreezerFilesBuilder {
378            file_path,
379            max_file_size: MAX_FILE_SIZE,
380            enable_compression: true,
381            open_files_limit: OPEN_FILES_LIMIT,
382        }
383    }
384
385    /// Sets the max size of the file (in bytes) for the new freezer.
386    #[allow(dead_code)]
387    pub fn max_file_size(mut self, size: u64) -> Self {
388        self.max_file_size = size;
389        self
390    }
391
392    /// Sets the limit of opened files for the new freezer.
393    ///
394    /// # Panics
395    ///
396    /// Panics when `limit <= 1`, meaning freezer must open at least 2 files.
397    #[allow(dead_code)]
398    pub fn open_files_limit(mut self, limit: usize) -> Self {
399        assert!(limit > 1);
400        self.open_files_limit = limit;
401        self
402    }
403
404    /// Sets the compression enable for the new freezer.
405    #[allow(dead_code)]
406    pub fn enable_compression(mut self, enable_compression: bool) -> Self {
407        self.enable_compression = enable_compression;
408        self
409    }
410
411    /// Creates the freezer with the options configured in this builder.
412    pub fn build(self) -> Result<FreezerFiles, IoError> {
413        fs::create_dir_all(&self.file_path)?;
414        let (mut index, mut index_size) = self.open_index()?;
415
416        let mut buffer = [0; INDEX_ENTRY_SIZE as usize];
417        index.rewind()?;
418        index.read_exact(&mut buffer)?;
419        let tail_index = IndexEntry::decode(&buffer)?;
420        let tail_id = tail_index.file_id;
421
422        index.seek(SeekFrom::Start(index_size - INDEX_ENTRY_SIZE))?;
423        index.read_exact(&mut buffer)?;
424
425        ckb_logger::debug!("Freezer index_size {} head {:?}", index_size, buffer);
426
427        let mut head_index = IndexEntry::decode(&buffer)?;
428        let head_file_name = helper::file_name(head_index.file_id);
429        let (mut head, mut head_size) = self.open_append(self.file_path.join(head_file_name))?;
430        let mut expect_head_size = head_index.offset;
431
432        // try repair cross checks the head and the index file and truncates them to
433        // be in sync with each other after a potential crash/data loss.
434        while expect_head_size != head_size {
435            // truncate the head file to the last offset
436            if expect_head_size < head_size {
437                ckb_logger::warn!(
438                    "Truncating dangling head {} {}",
439                    head_size,
440                    expect_head_size,
441                );
442                helper::truncate_file(&mut head, expect_head_size)?;
443                head_size = expect_head_size;
444            }
445
446            // truncate the index to matching the head file
447            if expect_head_size > head_size {
448                ckb_logger::warn!(
449                    "Truncating dangling indexes {} {}",
450                    head_size,
451                    expect_head_size,
452                );
453                helper::truncate_file(&mut index, index_size - INDEX_ENTRY_SIZE)?;
454                index_size -= INDEX_ENTRY_SIZE;
455
456                index.seek(SeekFrom::Start(index_size - INDEX_ENTRY_SIZE))?;
457                index.read_exact(&mut buffer)?;
458                let new_index = IndexEntry::decode(&buffer)?;
459
460                // slipped back into an earlier head-file
461                if new_index.file_id != head_index.file_id {
462                    let head_file_name = helper::file_name(head_index.file_id);
463                    let (new_head, size) = self.open_append(self.file_path.join(head_file_name))?;
464                    head = new_head;
465                    head_size = size;
466                }
467                expect_head_size = new_index.offset;
468                head_index = new_index;
469            }
470        }
471
472        // ensure flush to disk
473        head.sync_all()?;
474        index.sync_all()?;
475
476        let number = index_size / INDEX_ENTRY_SIZE;
477
478        Ok(FreezerFiles {
479            files: LruCache::new(self.open_files_limit),
480            head: Head::new(head, head_size),
481            tail_id,
482            number: Arc::new(AtomicU64::new(number)),
483            max_size: self.max_file_size,
484            head_id: head_index.file_id,
485            file_path: self.file_path,
486            index,
487            enable_compression: self.enable_compression,
488        })
489    }
490
491    // Open the file without append mode
492    // If a file is opened with both read and append access,
493    // after opening, and after every write,
494    // the position for reading may be set at the end of the file.
495    // it has differing behaviour on different OS
496    fn open_append<P: AsRef<Path>>(&self, path: P) -> Result<(File, u64), IoError> {
497        let mut file = fs::OpenOptions::new()
498            .create(true)
499            .truncate(false)
500            .read(true)
501            .write(true)
502            .open(path)?;
503        let offset = file.seek(SeekFrom::End(0))?;
504        Ok((file, offset))
505    }
506
507    fn open_index(&self) -> Result<(File, u64), IoError> {
508        let (mut index, mut size) = self.open_append(self.file_path.join(INDEX_FILE_NAME))?;
509        // fill a default entry within empty index
510        if size == 0 {
511            index.write_all(&IndexEntry::default().encode())?;
512            size += INDEX_ENTRY_SIZE;
513        }
514
515        // ensure the index is a multiple of INDEX_ENTRY_SIZE bytes
516        let tail = size % INDEX_ENTRY_SIZE;
517        if (tail != 0) && (size != 0) {
518            size -= tail;
519            helper::truncate_file(&mut index, size)?;
520        }
521        Ok((index, size))
522    }
523}
524
525pub(crate) mod helper {
526    use super::*;
527
528    pub(crate) fn truncate_file(file: &mut File, size: u64) -> Result<(), IoError> {
529        file.set_len(size)?;
530        file.seek(SeekFrom::End(0))?;
531        Ok(())
532    }
533
534    #[inline]
535    pub(crate) fn file_name(file_id: FileId) -> String {
536        format!("blk{file_id:06}")
537    }
538}