fuel_core_chain_config/config/state/writer.rs

use crate::{
    config::table_entry::TableEntry,
    AddTable,
    ChainConfig,
    LastBlockConfig,
    SnapshotMetadata,
    StateConfigBuilder,
    TableEncoding,
};
use fuel_core_storage::structured_storage::TableWithBlueprint;
use std::path::PathBuf;

#[cfg(feature = "parquet")]
use super::parquet;

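/// The encoding backend used by [`SnapshotWriter`]: either an in-memory JSON
/// [`StateConfigBuilder`] or, with the `parquet` feature, a set of per-table
/// parquet encoders.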
enum EncoderType {
    Json {
        builder: StateConfigBuilder,
    },
    #[cfg(feature = "parquet")]
    Parquet {
        compression: ZstdCompressionLevel,
        table_encoders: TableEncoders,
    },
}

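/// Writes a snapshot of the chain state to `dir`, either as a single JSON
/// file or (with the `parquet` feature) as one zstd-compressed parquet file
/// per table.
///
/// A minimal usage sketch; `coins` and `chain_config` are placeholder values
/// supplied by the caller:
///
/// ```ignore
/// let mut writer = SnapshotWriter::json("./snapshot");
/// writer.write::<Coins>(coins)?;
/// let metadata = writer.close(None, &chain_config)?;
/// ```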
pub struct SnapshotWriter {
    dir: PathBuf,
    encoder: EncoderType,
}

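/// The zstd compression level applied to parquet files. The variants map
/// one-to-one onto zstd's accepted range, with `Uncompressed` as 0 and `Max`
/// as 22.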
#[allow(dead_code)]
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    serde::Serialize,
    serde::Deserialize,
)]
#[cfg(feature = "parquet")]
#[cfg_attr(test, derive(strum::EnumIter))]
pub enum ZstdCompressionLevel {
    Uncompressed,
    Level1,
    Level2,
    Level3,
    Level4,
    Level5,
    Level6,
    Level7,
    Level8,
    Level9,
    Level10,
    Level11,
    Level12,
    Level13,
    Level14,
    Level15,
    Level16,
    Level17,
    Level18,
    Level19,
    Level20,
    Level21,
    Max,
}

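// `ZstdCompressionLevel` round-trips through `u8` over the range 0..=22.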
#[cfg(feature = "parquet")]
impl TryFrom<u8> for ZstdCompressionLevel {
    type Error = anyhow::Error;
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(Self::Uncompressed),
            1 => Ok(Self::Level1),
            2 => Ok(Self::Level2),
            3 => Ok(Self::Level3),
            4 => Ok(Self::Level4),
            5 => Ok(Self::Level5),
            6 => Ok(Self::Level6),
            7 => Ok(Self::Level7),
            8 => Ok(Self::Level8),
            9 => Ok(Self::Level9),
            10 => Ok(Self::Level10),
            11 => Ok(Self::Level11),
            12 => Ok(Self::Level12),
            13 => Ok(Self::Level13),
            14 => Ok(Self::Level14),
            15 => Ok(Self::Level15),
            16 => Ok(Self::Level16),
            17 => Ok(Self::Level17),
            18 => Ok(Self::Level18),
            19 => Ok(Self::Level19),
            20 => Ok(Self::Level20),
            21 => Ok(Self::Level21),
            22 => Ok(Self::Max),
            _ => {
                anyhow::bail!("Compression level {value} outside of allowed range 0..=22")
            }
        }
    }
}

#[cfg(feature = "parquet")]
impl From<ZstdCompressionLevel> for u8 {
    fn from(value: ZstdCompressionLevel) -> Self {
        match value {
            ZstdCompressionLevel::Uncompressed => 0,
            ZstdCompressionLevel::Level1 => 1,
            ZstdCompressionLevel::Level2 => 2,
            ZstdCompressionLevel::Level3 => 3,
            ZstdCompressionLevel::Level4 => 4,
            ZstdCompressionLevel::Level5 => 5,
            ZstdCompressionLevel::Level6 => 6,
            ZstdCompressionLevel::Level7 => 7,
            ZstdCompressionLevel::Level8 => 8,
            ZstdCompressionLevel::Level9 => 9,
            ZstdCompressionLevel::Level10 => 10,
            ZstdCompressionLevel::Level11 => 11,
            ZstdCompressionLevel::Level12 => 12,
            ZstdCompressionLevel::Level13 => 13,
            ZstdCompressionLevel::Level14 => 14,
            ZstdCompressionLevel::Level15 => 15,
            ZstdCompressionLevel::Level16 => 16,
            ZstdCompressionLevel::Level17 => 17,
            ZstdCompressionLevel::Level18 => 18,
            ZstdCompressionLevel::Level19 => 19,
            ZstdCompressionLevel::Level20 => 20,
            ZstdCompressionLevel::Level21 => 21,
            ZstdCompressionLevel::Max => 22,
        }
    }
}

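// `Uncompressed` maps to parquet's `UNCOMPRESSED` codec; every other level
// becomes `ZSTD` at the matching level.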
#[cfg(feature = "parquet")]
impl From<ZstdCompressionLevel> for ::parquet::basic::Compression {
    fn from(value: ZstdCompressionLevel) -> Self {
        if let ZstdCompressionLevel::Uncompressed = value {
            Self::UNCOMPRESSED
        } else {
            let level = i32::from(u8::from(value));
            let level = ::parquet::basic::ZstdLevel::try_new(level)
                .expect("our range to mimic the parquet zstd range");
            Self::ZSTD(level)
        }
    }
}

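/// The encoder state carried by a [`SnapshotFragment`]. Fragments can only be
/// merged if their data uses the same encoding.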
#[derive(Debug, Clone)]
enum FragmentData {
    Json {
        builder: StateConfigBuilder,
    },
    #[cfg(feature = "parquet")]
    Parquet {
        tables: std::collections::HashMap<String, PathBuf>,
        compression: ZstdCompressionLevel,
    },
}

impl FragmentData {
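    /// Merges `data` into `self`: JSON fragments merge their builders, while
    /// parquet fragments pool their table files. Mixing encodings, or parquet
    /// fragments with different compression levels, is an error.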
    fn merge(mut self, data: FragmentData) -> anyhow::Result<Self> {
        match (&mut self, data) {
            (
                FragmentData::Json { builder, .. },
                FragmentData::Json {
                    builder: other_builder,
                    ..
                },
            ) => {
                builder.merge(other_builder);
            }
            #[cfg(feature = "parquet")]
            (
                FragmentData::Parquet {
                    tables,
                    compression,
                },
                FragmentData::Parquet {
                    tables: their_tables,
                    compression: their_compression,
                },
            ) => {
                tables.extend(their_tables);
                anyhow::ensure!(
                    *compression == their_compression,
                    "Fragments use different compressions."
                );
            }
            #[cfg(feature = "parquet")]
            (a, b) => anyhow::bail!(
                "Fragments don't have the same encoding and cannot be merged. Fragments: {a:?} and {b:?}"
            ),
        };

        Ok(self)
    }
}

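/// A partially written snapshot, produced by
/// [`SnapshotWriter::partial_close`]. Fragments can be merged together and
/// completed with [`Self::finalize`].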
#[derive(Debug, Clone)]
pub struct SnapshotFragment {
    dir: PathBuf,
    data: FragmentData,
}

impl SnapshotFragment {
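    /// Merges another fragment into this one, failing if the two fragments
    /// use different encodings.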
    pub fn merge(mut self, fragment: Self) -> anyhow::Result<Self> {
        self.data = self.data.merge(fragment.data)?;
        Ok(self)
    }

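    /// Completes the snapshot: writes the last block config (if any) together
    /// with the chain config and the snapshot metadata into the fragment's
    /// directory.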
    pub fn finalize(
        self,
        latest_block_config: Option<LastBlockConfig>,
        chain_config: &ChainConfig,
    ) -> anyhow::Result<SnapshotMetadata> {
        let table_encoding = match self.data {
            FragmentData::Json { builder } => {
                let state_config = builder.build(latest_block_config)?;
                std::fs::create_dir_all(&self.dir)?;
                let state_file_path = self.dir.join("state_config.json");
                let file = std::fs::File::create(&state_file_path)?;
                serde_json::to_writer_pretty(file, &state_config)?;

                TableEncoding::Json {
                    filepath: state_file_path,
                }
            }
            #[cfg(feature = "parquet")]
            FragmentData::Parquet {
                tables,
                compression,
            } => {
                let latest_block_config_path =
                    self.dir.join("latest_block_config.parquet");
                SnapshotWriter::write_single_el_parquet(
                    &latest_block_config_path,
                    latest_block_config,
                    compression,
                )?;

                TableEncoding::Parquet {
                    tables,
                    latest_block_config_path,
                }
            }
        };

        SnapshotWriter::write_chain_config_and_metadata(
            &self.dir,
            chain_config,
            table_encoding,
        )
    }
}

impl SnapshotWriter {
    const CHAIN_CONFIG_FILENAME: &'static str = "chain_config.json";
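    /// Creates a writer that buffers all tables in memory and serializes them
    /// into a single `state_config.json` when closed.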
    pub fn json(dir: impl Into<PathBuf>) -> Self {
        Self {
            encoder: EncoderType::Json {
                builder: StateConfigBuilder::default(),
            },
            dir: dir.into(),
        }
    }

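    /// Creates a writer that encodes each table into its own parquet file
    /// inside `dir`, compressed with zstd at `compression_level`.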
    #[cfg(feature = "parquet")]
    pub fn parquet(
        dir: impl Into<::std::path::PathBuf>,
        compression_level: ZstdCompressionLevel,
    ) -> anyhow::Result<Self> {
        let dir = dir.into();
        std::fs::create_dir_all(&dir)?;
        Ok(Self {
            encoder: EncoderType::Parquet {
                table_encoders: TableEncoders::new(dir.clone(), compression_level),
                compression: compression_level,
            },
            dir,
        })
    }

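    /// Test helper: writes every table of the given `StateConfig` and then
    /// closes the writer.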
    #[cfg(feature = "test-helpers")]
    pub fn write_state_config(
        mut self,
        state_config: crate::StateConfig,
        chain_config: &ChainConfig,
    ) -> anyhow::Result<SnapshotMetadata> {
        use fuel_core_storage::tables::{
            Coins,
            ContractsAssets,
            ContractsLatestUtxo,
            ContractsRawCode,
            ContractsState,
            Messages,
        };
        use fuel_core_types::fuel_vm::BlobData;

        use crate::AsTable;

        self.write::<Coins>(state_config.as_table())?;
        self.write::<Messages>(state_config.as_table())?;
        self.write::<BlobData>(state_config.as_table())?;
        self.write::<ContractsRawCode>(state_config.as_table())?;
        self.write::<ContractsLatestUtxo>(state_config.as_table())?;
        self.write::<ContractsState>(state_config.as_table())?;
        self.write::<ContractsAssets>(state_config.as_table())?;
        self.close(state_config.last_block, chain_config)
    }

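    /// Writes the entries of table `T`. JSON writers buffer the entries in
    /// the builder; parquet writers append them to the table's file.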
    pub fn write<T>(&mut self, elements: Vec<TableEntry<T>>) -> anyhow::Result<()>
    where
        T: TableWithBlueprint,
        TableEntry<T>: serde::Serialize,
        StateConfigBuilder: AddTable<T>,
    {
        match &mut self.encoder {
            EncoderType::Json { builder, .. } => {
                builder.add(elements);
                Ok(())
            }
            #[cfg(feature = "parquet")]
            EncoderType::Parquet { table_encoders, .. } => {
                table_encoders.encoder::<T>()?.write::<T>(elements)
            }
        }
    }

    #[cfg(feature = "parquet")]
    fn write_single_el_parquet(
        path: &std::path::Path,
        data: impl serde::Serialize,
        compression: ZstdCompressionLevel,
    ) -> anyhow::Result<()> {
        let mut encoder = parquet::encode::Encoder::new(
            std::fs::File::create(path)?,
            compression.into(),
        )?;
        encoder.write(vec![postcard::to_stdvec(&data)?])?;
        encoder.close()
    }

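    /// Finishes the snapshot, flushing all tables and writing the chain
    /// config and snapshot metadata. Equivalent to [`Self::partial_close`]
    /// followed by [`SnapshotFragment::finalize`].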
    pub fn close(
        self,
        latest_block_config: Option<LastBlockConfig>,
        chain_config: &ChainConfig,
    ) -> anyhow::Result<SnapshotMetadata> {
        self.partial_close()?
            .finalize(latest_block_config, chain_config)
    }

    fn write_chain_config_and_metadata(
        dir: &std::path::Path,
        chain_config: &ChainConfig,
        table_encoding: TableEncoding,
    ) -> anyhow::Result<SnapshotMetadata> {
        let chain_config_path = dir.join(Self::CHAIN_CONFIG_FILENAME);
        chain_config.write(&chain_config_path)?;

        let metadata = SnapshotMetadata {
            chain_config: chain_config_path,
            table_encoding,
        };
        metadata.clone().write(dir)?;
        Ok(metadata)
    }

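    /// Flushes the table encoders without writing the chain config or
    /// metadata, returning a [`SnapshotFragment`] that can be merged with
    /// other fragments and finalized later.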
    pub fn partial_close(self) -> anyhow::Result<SnapshotFragment> {
        let data = match self.encoder {
            EncoderType::Json { builder } => FragmentData::Json { builder },
            #[cfg(feature = "parquet")]
            EncoderType::Parquet {
                table_encoders,
                compression,
                ..
            } => {
                let tables = table_encoders.close()?;
                FragmentData::Parquet {
                    tables,
                    compression,
                }
            }
        };
        let snapshot_fragment = SnapshotFragment {
            dir: self.dir,
            data,
        };
        Ok(snapshot_fragment)
    }
}

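/// Serializes table entries with postcard and writes the resulting byte
/// blobs into a single parquet file.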
#[cfg(feature = "parquet")]
struct PostcardParquetEncoder {
    path: PathBuf,
    encoder: parquet::encode::Encoder<std::fs::File>,
}

#[cfg(feature = "parquet")]
impl PostcardParquetEncoder {
    pub fn new(path: PathBuf, encoder: parquet::encode::Encoder<std::fs::File>) -> Self {
        Self { path, encoder }
    }

    fn write<T>(&mut self, elements: Vec<TableEntry<T>>) -> anyhow::Result<()>
    where
        T: fuel_core_storage::Mappable,
        TableEntry<T>: serde::Serialize,
    {
        use itertools::Itertools;
        let encoded: Vec<_> = elements
            .into_iter()
            .map(|entry| postcard::to_stdvec(&entry))
            .try_collect()?;
        self.encoder.write(encoded)
    }
}

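/// Creates and caches one [`PostcardParquetEncoder`] per table, keyed by the
/// table's storage column name.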
#[cfg(feature = "parquet")]
struct TableEncoders {
    dir: PathBuf,
    compression: ZstdCompressionLevel,
    encoders: std::collections::HashMap<String, PostcardParquetEncoder>,
}

#[cfg(feature = "parquet")]
impl TableEncoders {
    fn new(dir: PathBuf, compression: ZstdCompressionLevel) -> Self {
        Self {
            dir,
            compression,
            encoders: Default::default(),
        }
    }

    fn encoder<T: fuel_core_storage::structured_storage::TableWithBlueprint>(
        &mut self,
    ) -> anyhow::Result<&mut PostcardParquetEncoder> {
        use fuel_core_storage::kv_store::StorageColumn;

        let name = StorageColumn::name(&T::column()).to_string();

        let encoder = match self.encoders.entry(name) {
            std::collections::hash_map::Entry::Occupied(encoder) => encoder.into_mut(),
            std::collections::hash_map::Entry::Vacant(vacant) => {
                let name = vacant.key();
                let file_path = self.dir.join(format!("{name}.parquet"));
                let file = std::fs::File::create(&file_path)?;
                let encoder = PostcardParquetEncoder::new(
                    file_path,
                    parquet::encode::Encoder::new(file, self.compression.into())?,
                );
                vacant.insert(encoder)
            }
        };

        Ok(encoder)
    }

    fn close(self) -> anyhow::Result<std::collections::HashMap<String, PathBuf>> {
        let mut files = std::collections::HashMap::new();
        for (file, encoder) in self.encoders {
            encoder.encoder.close()?;
            files.insert(file, encoder.path);
        }
        Ok(files)
    }
}

#[cfg(test)]
mod tests {
    use std::path::Path;

    use fuel_core_storage::{
        kv_store::StorageColumn,
        structured_storage::TableWithBlueprint,
        tables::{
            Coins,
            ContractsAssets,
            ContractsLatestUtxo,
            ContractsRawCode,
            ContractsState,
            Messages,
        },
    };
    use rand::{
        rngs::StdRng,
        SeedableRng,
    };

    use crate::StateConfig;

    use super::*;

    #[test]
    fn can_roundtrip_compression_level() {
        use strum::IntoEnumIterator;

        for level in crate::ZstdCompressionLevel::iter() {
            let u8_level = u8::from(level);
            let roundtrip = ZstdCompressionLevel::try_from(u8_level).unwrap();
            assert_eq!(level, roundtrip);
        }
    }

    #[test]
    fn parquet_encoder_encodes_tables_in_expected_files() {
        fn file_created_and_present_in_metadata<T>()
        where
            T: TableWithBlueprint,
            T::OwnedKey: serde::Serialize,
            T::OwnedValue: serde::Serialize,
            StateConfigBuilder: AddTable<T>,
        {
            // given
            use pretty_assertions::assert_eq;

            let dir = tempfile::tempdir().unwrap();
            let mut writer =
                SnapshotWriter::parquet(dir.path(), ZstdCompressionLevel::Uncompressed)
                    .unwrap();

            // when
            writer.write::<T>(vec![]).unwrap();
            let snapshot = writer.close(None, &ChainConfig::local_testnet()).unwrap();

            // then
            assert!(snapshot.chain_config.exists());
            let TableEncoding::Parquet { tables, .. } = snapshot.table_encoding else {
                panic!("Expected parquet encoding")
            };
            assert_eq!(tables.len(), 1, "Expected single table");
            let (table_name, path) = tables.into_iter().next().unwrap();

            assert_eq!(table_name, T::column().name());
            assert!(dir.path().join(path).exists());
        }

        file_created_and_present_in_metadata::<Coins>();
        file_created_and_present_in_metadata::<Messages>();
        file_created_and_present_in_metadata::<ContractsRawCode>();
        file_created_and_present_in_metadata::<ContractsLatestUtxo>();
        file_created_and_present_in_metadata::<ContractsState>();
        file_created_and_present_in_metadata::<ContractsAssets>();
    }

    #[test]
    fn all_compressions_are_valid() {
        use ::parquet::basic::Compression;
        use strum::IntoEnumIterator;
        for level in ZstdCompressionLevel::iter() {
            let _ = Compression::from(level);
        }
    }

    #[test]
    fn json_snapshot_is_human_readable() {
        // given
        use crate::Randomize;
        let dir = tempfile::tempdir().unwrap();
        let writer = SnapshotWriter::json(dir.path());
        let mut rng = StdRng::from_seed([0; 32]);
        let state = StateConfig::randomize(&mut rng);

        // when
        let snapshot = writer
            .write_state_config(state, &ChainConfig::local_testnet())
            .unwrap();

        // then
        let TableEncoding::Json { filepath } = snapshot.table_encoding else {
            panic!("Expected json encoding")
        };
        let encoded_json = std::fs::read_to_string(filepath).unwrap();

        insta::assert_snapshot!(encoded_json);
    }

    fn given_parquet_writer(path: &Path) -> SnapshotWriter {
        SnapshotWriter::parquet(path, ZstdCompressionLevel::Uncompressed).unwrap()
    }

    fn given_json_writer(path: &Path) -> SnapshotWriter {
        SnapshotWriter::json(path)
    }

    #[test_case::test_case(given_parquet_writer)]
    #[test_case::test_case(given_json_writer)]
    fn can_partially_close_without_chain_and_block_height(
        writer: impl Fn(&Path) -> SnapshotWriter + Copy,
    ) {
        // given
        let dir = tempfile::tempdir().unwrap();
        let writer = writer(dir.path());

        // when
        let result = writer.partial_close();

        // then
        assert!(result.is_ok());
    }

    #[test]
    fn merging_json_and_parquet_fragments_fails() {
        // given
        let dir = tempfile::tempdir().unwrap();
        let json_writer = super::SnapshotWriter::json(dir.path());
        let parquet_writer = super::SnapshotWriter::parquet(
            dir.path(),
            super::ZstdCompressionLevel::Uncompressed,
        );

        let json_fragment = json_writer.partial_close().unwrap();
        let parquet_fragment = parquet_writer.unwrap().partial_close().unwrap();

        {
            // when
            let result = json_fragment.clone().merge(parquet_fragment.clone());

            // then
            let err = result.unwrap_err();
            assert!(err.to_string().contains(
                "Fragments don't have the same encoding and cannot be merged."
            ));
        }
        {
            // when
            let result = parquet_fragment.merge(json_fragment);

            // then
            let err = result.unwrap_err();
            assert!(err.to_string().contains(
                "Fragments don't have the same encoding and cannot be merged."
            ));
        }
    }

    // It is enough for this test to compile
    #[test]
    #[ignore]
    fn fragment_must_be_send_sync() {
        fn _assert_send<T: Send + Sync>() {}
        _assert_send::<super::SnapshotFragment>();
    }
}