use crate::{
    config::table_entry::TableEntry,
    AddTable,
    ChainConfig,
    LastBlockConfig,
    SnapshotMetadata,
    StateConfigBuilder,
    TableEncoding,
};
use fuel_core_storage::structured_storage::TableWithBlueprint;
use std::path::PathBuf;

#[cfg(feature = "parquet")]
use super::parquet;

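/// Backend used by [`SnapshotWriter`]: a single in-memory JSON builder or a
/// set of per-table parquet encoders.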
enum EncoderType {
    Json {
        builder: StateConfigBuilder,
    },
    #[cfg(feature = "parquet")]
    Parquet {
        compression: ZstdCompressionLevel,
        table_encoders: TableEncoders,
    },
}

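/// Writes snapshot data into `dir`, either as one human-readable
/// `state_config.json` or as a parquet file per table, alongside the chain
/// config and a metadata file describing the chosen layout.
///
/// A minimal usage sketch (illustrative path and empty table data; error
/// handling elided):
///
/// ```ignore
/// use fuel_core_storage::tables::Coins;
///
/// let mut writer = SnapshotWriter::json("/tmp/snapshot");
/// writer.write::<Coins>(vec![])?;
/// let metadata = writer.close(None, &ChainConfig::local_testnet())?;
/// ```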
pub struct SnapshotWriter {
    dir: PathBuf,
    encoder: EncoderType,
}

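/// Zstd compression level for parquet output: `Uncompressed` corresponds to
/// level 0 and `Max` to level 22, mirroring the range zstd supports.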
#[cfg(feature = "parquet")]
#[allow(dead_code)]
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    serde::Serialize,
    serde::Deserialize,
)]
#[cfg_attr(test, derive(strum::EnumIter))]
pub enum ZstdCompressionLevel {
    Uncompressed,
    Level1,
    Level2,
    Level3,
    Level4,
    Level5,
    Level6,
    Level7,
    Level8,
    Level9,
    Level10,
    Level11,
    Level12,
    Level13,
    Level14,
    Level15,
    Level16,
    Level17,
    Level18,
    Level19,
    Level20,
    Level21,
    Max,
}

#[cfg(feature = "parquet")]
impl TryFrom<u8> for ZstdCompressionLevel {
    type Error = anyhow::Error;
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(Self::Uncompressed),
            1 => Ok(Self::Level1),
            2 => Ok(Self::Level2),
            3 => Ok(Self::Level3),
            4 => Ok(Self::Level4),
            5 => Ok(Self::Level5),
            6 => Ok(Self::Level6),
            7 => Ok(Self::Level7),
            8 => Ok(Self::Level8),
            9 => Ok(Self::Level9),
            10 => Ok(Self::Level10),
            11 => Ok(Self::Level11),
            12 => Ok(Self::Level12),
            13 => Ok(Self::Level13),
            14 => Ok(Self::Level14),
            15 => Ok(Self::Level15),
            16 => Ok(Self::Level16),
            17 => Ok(Self::Level17),
            18 => Ok(Self::Level18),
            19 => Ok(Self::Level19),
            20 => Ok(Self::Level20),
            21 => Ok(Self::Level21),
            22 => Ok(Self::Max),
            _ => {
                anyhow::bail!("Compression level {value} outside of allowed range 0..=22")
            }
        }
    }
}

#[cfg(feature = "parquet")]
impl From<ZstdCompressionLevel> for u8 {
    fn from(value: ZstdCompressionLevel) -> Self {
        match value {
            ZstdCompressionLevel::Uncompressed => 0,
            ZstdCompressionLevel::Level1 => 1,
            ZstdCompressionLevel::Level2 => 2,
            ZstdCompressionLevel::Level3 => 3,
            ZstdCompressionLevel::Level4 => 4,
            ZstdCompressionLevel::Level5 => 5,
            ZstdCompressionLevel::Level6 => 6,
            ZstdCompressionLevel::Level7 => 7,
            ZstdCompressionLevel::Level8 => 8,
            ZstdCompressionLevel::Level9 => 9,
            ZstdCompressionLevel::Level10 => 10,
            ZstdCompressionLevel::Level11 => 11,
            ZstdCompressionLevel::Level12 => 12,
            ZstdCompressionLevel::Level13 => 13,
            ZstdCompressionLevel::Level14 => 14,
            ZstdCompressionLevel::Level15 => 15,
            ZstdCompressionLevel::Level16 => 16,
            ZstdCompressionLevel::Level17 => 17,
            ZstdCompressionLevel::Level18 => 18,
            ZstdCompressionLevel::Level19 => 19,
            ZstdCompressionLevel::Level20 => 20,
            ZstdCompressionLevel::Level21 => 21,
            ZstdCompressionLevel::Max => 22,
        }
    }
}

#[cfg(feature = "parquet")]
impl From<ZstdCompressionLevel> for ::parquet::basic::Compression {
    fn from(value: ZstdCompressionLevel) -> Self {
        if let ZstdCompressionLevel::Uncompressed = value {
            Self::UNCOMPRESSED
        } else {
            let level = i32::from(u8::from(value));
            let level = ::parquet::basic::ZstdLevel::try_new(level)
                .expect("our range to mimic the parquet zstd range");
            Self::ZSTD(level)
        }
    }
}

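/// The encoder state carried by a [`SnapshotFragment`]: the in-memory JSON
/// builder, or the set of parquet files written so far.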
#[derive(Debug, Clone)]
enum FragmentData {
    Json {
        builder: StateConfigBuilder,
    },
    #[cfg(feature = "parquet")]
    Parquet {
        tables: std::collections::HashMap<String, PathBuf>,
        compression: ZstdCompressionLevel,
    },
}

impl FragmentData {
    fn merge(mut self, data: FragmentData) -> anyhow::Result<Self> {
        match (&mut self, data) {
            (
                FragmentData::Json { builder, .. },
                FragmentData::Json {
                    builder: other_builder,
                    ..
                },
            ) => {
                builder.merge(other_builder);
            }
            #[cfg(feature = "parquet")]
            (
                FragmentData::Parquet {
                    tables,
                    compression,
                },
                FragmentData::Parquet {
                    tables: their_tables,
                    compression: their_compression,
                },
            ) => {
                anyhow::ensure!(
                    *compression == their_compression,
                    "Fragments use different compressions."
                );
                tables.extend(their_tables);
            }
            #[cfg(feature = "parquet")]
            (a, b) => anyhow::bail!(
                "Fragments don't have the same encoding and cannot be merged. Fragments: {a:?} and {b:?}"
            ),
        };

        Ok(self)
    }
}

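/// A snapshot that has been written out only partially. Fragments produced by
/// separate writers can be merged and then finalized into a complete snapshot.
///
/// An illustrative sketch (writer names are placeholders; error handling
/// elided):
///
/// ```ignore
/// let fragment_a = writer_a.partial_close()?;
/// let fragment_b = writer_b.partial_close()?;
/// let metadata = fragment_a
///     .merge(fragment_b)?
///     .finalize(None, &chain_config)?;
/// ```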
#[derive(Debug, Clone)]
pub struct SnapshotFragment {
    dir: PathBuf,
    data: FragmentData,
}

impl SnapshotFragment {
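    /// Merges another fragment into this one. Fails if the fragments use
    /// different encodings or, for parquet, different compression levels.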
    pub fn merge(mut self, fragment: Self) -> anyhow::Result<Self> {
        self.data = self.data.merge(fragment.data)?;
        Ok(self)
    }

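    /// Consumes the fragment, writing the accumulated state together with the
    /// chain config and the snapshot metadata file.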
    pub fn finalize(
        self,
        latest_block_config: Option<LastBlockConfig>,
        chain_config: &ChainConfig,
    ) -> anyhow::Result<SnapshotMetadata> {
        let table_encoding = match self.data {
            FragmentData::Json { builder } => {
                let state_config = builder.build(latest_block_config)?;
                std::fs::create_dir_all(&self.dir)?;
                let state_file_path = self.dir.join("state_config.json");
                let file = std::fs::File::create(&state_file_path)?;
                serde_json::to_writer_pretty(file, &state_config)?;

                TableEncoding::Json {
                    filepath: state_file_path,
                }
            }
            #[cfg(feature = "parquet")]
            FragmentData::Parquet {
                tables,
                compression,
            } => {
                let latest_block_config_path =
                    self.dir.join("latest_block_config.parquet");
                SnapshotWriter::write_single_el_parquet(
                    &latest_block_config_path,
                    latest_block_config,
                    compression,
                )?;

                TableEncoding::Parquet {
                    tables,
                    latest_block_config_path,
                }
            }
        };

        SnapshotWriter::write_chain_config_and_metadata(
            &self.dir,
            chain_config,
            table_encoding,
        )
    }
}

impl SnapshotWriter {
    const CHAIN_CONFIG_FILENAME: &'static str = "chain_config.json";

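    /// Creates a writer that accumulates state in memory and emits a single
    /// pretty-printed `state_config.json` in `dir` on close.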
    pub fn json(dir: impl Into<PathBuf>) -> Self {
        Self {
            encoder: EncoderType::Json {
                builder: StateConfigBuilder::default(),
            },
            dir: dir.into(),
        }
    }

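    /// Creates a writer that encodes each table into its own parquet file in
    /// `dir`, compressed at the given zstd level.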
    #[cfg(feature = "parquet")]
    pub fn parquet(
        dir: impl Into<PathBuf>,
        compression_level: ZstdCompressionLevel,
    ) -> anyhow::Result<Self> {
        let dir = dir.into();
        std::fs::create_dir_all(&dir)?;
        Ok(Self {
            encoder: EncoderType::Parquet {
                table_encoders: TableEncoders::new(dir.clone(), compression_level),
                compression: compression_level,
            },
            dir,
        })
    }

    #[cfg(feature = "test-helpers")]
    pub fn write_state_config(
        mut self,
        state_config: crate::StateConfig,
        chain_config: &ChainConfig,
    ) -> anyhow::Result<SnapshotMetadata> {
        use fuel_core_storage::tables::{
            Coins,
            ContractsAssets,
            ContractsLatestUtxo,
            ContractsRawCode,
            ContractsState,
            Messages,
        };
        use fuel_core_types::fuel_vm::BlobData;

        use crate::AsTable;

        self.write::<Coins>(state_config.as_table())?;
        self.write::<Messages>(state_config.as_table())?;
        self.write::<BlobData>(state_config.as_table())?;
        self.write::<ContractsRawCode>(state_config.as_table())?;
        self.write::<ContractsLatestUtxo>(state_config.as_table())?;
        self.write::<ContractsState>(state_config.as_table())?;
        self.write::<ContractsAssets>(state_config.as_table())?;
        self.close(state_config.last_block, chain_config)
    }

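    /// Adds `elements` for table `T` to the snapshot: buffered in memory for
    /// JSON, written straight to the table's parquet file otherwise.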
    pub fn write<T>(&mut self, elements: Vec<TableEntry<T>>) -> anyhow::Result<()>
    where
        T: TableWithBlueprint,
        TableEntry<T>: serde::Serialize,
        StateConfigBuilder: AddTable<T>,
    {
        match &mut self.encoder {
            EncoderType::Json { builder, .. } => {
                builder.add(elements);
                Ok(())
            }
            #[cfg(feature = "parquet")]
            EncoderType::Parquet { table_encoders, .. } => {
                table_encoders.encoder::<T>()?.write::<T>(elements)
            }
        }
    }

    #[cfg(feature = "parquet")]
    fn write_single_el_parquet(
        path: &std::path::Path,
        data: impl serde::Serialize,
        compression: ZstdCompressionLevel,
    ) -> anyhow::Result<()> {
        let mut encoder = parquet::encode::Encoder::new(
            std::fs::File::create(path)?,
            compression.into(),
        )?;
        encoder.write(vec![postcard::to_stdvec(&data)?])?;
        encoder.close()
    }

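    /// Finishes the snapshot: flushes any pending table data and writes the
    /// chain config and snapshot metadata to disk.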
    pub fn close(
        self,
        latest_block_config: Option<LastBlockConfig>,
        chain_config: &ChainConfig,
    ) -> anyhow::Result<SnapshotMetadata> {
        self.partial_close()?
            .finalize(latest_block_config, chain_config)
    }

    fn write_chain_config_and_metadata(
        dir: &std::path::Path,
        chain_config: &ChainConfig,
        table_encoding: TableEncoding,
    ) -> anyhow::Result<SnapshotMetadata> {
        let chain_config_path = dir.join(Self::CHAIN_CONFIG_FILENAME);
        chain_config.write(&chain_config_path)?;

        let metadata = SnapshotMetadata {
            chain_config: chain_config_path,
            table_encoding,
        };
        metadata.clone().write(dir)?;
        Ok(metadata)
    }

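    /// Closes the table encoders without writing the chain config or
    /// metadata, yielding a [`SnapshotFragment`] that can be merged with
    /// others and finalized later.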
    pub fn partial_close(self) -> anyhow::Result<SnapshotFragment> {
        let data = match self.encoder {
            EncoderType::Json { builder } => FragmentData::Json { builder },
            #[cfg(feature = "parquet")]
            EncoderType::Parquet {
                table_encoders,
                compression,
                ..
            } => {
                let tables = table_encoders.close()?;
                FragmentData::Parquet {
                    tables,
                    compression,
                }
            }
        };
        let snapshot_fragment = SnapshotFragment {
            dir: self.dir,
            data,
        };
        Ok(snapshot_fragment)
    }
}

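/// Writes postcard-serialized table entries into a parquet file, keeping track
/// of the file's path so it can be reported in the snapshot metadata.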
#[cfg(feature = "parquet")]
struct PostcardParquetEncoder {
    path: PathBuf,
    encoder: parquet::encode::Encoder<std::fs::File>,
}

#[cfg(feature = "parquet")]
impl PostcardParquetEncoder {
    pub fn new(path: PathBuf, encoder: parquet::encode::Encoder<std::fs::File>) -> Self {
        Self { path, encoder }
    }

    fn write<T>(&mut self, elements: Vec<TableEntry<T>>) -> anyhow::Result<()>
    where
        T: fuel_core_storage::Mappable,
        TableEntry<T>: serde::Serialize,
    {
        use itertools::Itertools;
        let encoded: Vec<_> = elements
            .into_iter()
            .map(|entry| postcard::to_stdvec(&entry))
            .try_collect()?;
        self.encoder.write(encoded)
    }
}

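/// A [`PostcardParquetEncoder`] per table, created on first use and keyed by
/// the table's storage column name, which also names the parquet file.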
#[cfg(feature = "parquet")]
struct TableEncoders {
    dir: PathBuf,
    compression: ZstdCompressionLevel,
    encoders: std::collections::HashMap<String, PostcardParquetEncoder>,
}

#[cfg(feature = "parquet")]
impl TableEncoders {
    fn new(dir: PathBuf, compression: ZstdCompressionLevel) -> Self {
        Self {
            dir,
            compression,
            encoders: Default::default(),
        }
    }

    fn encoder<T: fuel_core_storage::structured_storage::TableWithBlueprint>(
        &mut self,
    ) -> anyhow::Result<&mut PostcardParquetEncoder> {
        use fuel_core_storage::kv_store::StorageColumn;

        let name = StorageColumn::name(&T::column()).to_string();

        let encoder = match self.encoders.entry(name) {
            std::collections::hash_map::Entry::Occupied(encoder) => encoder.into_mut(),
            std::collections::hash_map::Entry::Vacant(vacant) => {
                let name = vacant.key();
                let file_path = self.dir.join(format!("{name}.parquet"));
                let file = std::fs::File::create(&file_path)?;
                let encoder = PostcardParquetEncoder::new(
                    file_path,
                    parquet::encode::Encoder::new(file, self.compression.into())?,
                );
                vacant.insert(encoder)
            }
        };

        Ok(encoder)
    }

    fn close(self) -> anyhow::Result<std::collections::HashMap<String, PathBuf>> {
        let mut files = std::collections::HashMap::new();
        for (table_name, encoder) in self.encoders {
            encoder.encoder.close()?;
            files.insert(table_name, encoder.path);
        }
        Ok(files)
    }
}

#[cfg(test)]
mod tests {
    use std::path::Path;

    use fuel_core_storage::{
        kv_store::StorageColumn,
        structured_storage::TableWithBlueprint,
        tables::{
            Coins,
            ContractsAssets,
            ContractsLatestUtxo,
            ContractsRawCode,
            ContractsState,
            Messages,
        },
    };
    use rand::{
        rngs::StdRng,
        SeedableRng,
    };

    use crate::StateConfig;

    use super::*;

    #[test]
    fn can_roundtrip_compression_level() {
        use strum::IntoEnumIterator;

        for level in crate::ZstdCompressionLevel::iter() {
            let u8_level = u8::from(level);
            let roundtrip = ZstdCompressionLevel::try_from(u8_level).unwrap();
            assert_eq!(level, roundtrip);
        }
    }

    #[test]
    fn parquet_encoder_encodes_tables_in_expected_files() {
        fn file_created_and_present_in_metadata<T>()
        where
            T: TableWithBlueprint,
            T::OwnedKey: serde::Serialize,
            T::OwnedValue: serde::Serialize,
            StateConfigBuilder: AddTable<T>,
        {
            use pretty_assertions::assert_eq;

            let dir = tempfile::tempdir().unwrap();
            let mut writer =
                SnapshotWriter::parquet(dir.path(), ZstdCompressionLevel::Uncompressed)
                    .unwrap();

            writer.write::<T>(vec![]).unwrap();
            let snapshot = writer.close(None, &ChainConfig::local_testnet()).unwrap();

            assert!(snapshot.chain_config.exists());
            let TableEncoding::Parquet { tables, .. } = snapshot.table_encoding else {
                panic!("Expected parquet encoding")
            };
            assert_eq!(tables.len(), 1, "Expected single table");
            let (table_name, path) = tables.into_iter().next().unwrap();

            assert_eq!(table_name, T::column().name());
            assert!(dir.path().join(path).exists());
        }

        file_created_and_present_in_metadata::<Coins>();
        file_created_and_present_in_metadata::<Messages>();
        file_created_and_present_in_metadata::<ContractsRawCode>();
        file_created_and_present_in_metadata::<ContractsLatestUtxo>();
        file_created_and_present_in_metadata::<ContractsState>();
        file_created_and_present_in_metadata::<ContractsAssets>();
    }

    #[test]
    fn all_compressions_are_valid() {
        use ::parquet::basic::Compression;
        use strum::IntoEnumIterator;
        for level in ZstdCompressionLevel::iter() {
            let _ = Compression::from(level);
        }
    }

    #[test]
    fn json_snapshot_is_human_readable() {
        use crate::Randomize;

        let dir = tempfile::tempdir().unwrap();
        let writer = SnapshotWriter::json(dir.path());
        let mut rng = StdRng::from_seed([0; 32]);
        let state = StateConfig::randomize(&mut rng);

        let snapshot = writer
            .write_state_config(state, &ChainConfig::local_testnet())
            .unwrap();

        let TableEncoding::Json { filepath } = snapshot.table_encoding else {
            panic!("Expected json encoding")
        };
        let encoded_json = std::fs::read_to_string(filepath).unwrap();

        insta::assert_snapshot!(encoded_json);
    }

    fn given_parquet_writer(path: &Path) -> SnapshotWriter {
        SnapshotWriter::parquet(path, ZstdCompressionLevel::Uncompressed).unwrap()
    }

    fn given_json_writer(path: &Path) -> SnapshotWriter {
        SnapshotWriter::json(path)
    }

    #[test_case::test_case(given_parquet_writer)]
    #[test_case::test_case(given_json_writer)]
    fn can_partially_close_without_chain_and_block_height(
        writer: impl Fn(&Path) -> SnapshotWriter + Copy,
    ) {
        let dir = tempfile::tempdir().unwrap();
        let writer = writer(dir.path());

        let result = writer.partial_close();

        assert!(result.is_ok());
    }

    #[test]
    fn merging_json_and_parquet_fragments_fails() {
        let dir = tempfile::tempdir().unwrap();
        let json_writer = super::SnapshotWriter::json(dir.path());
        let parquet_writer = super::SnapshotWriter::parquet(
            dir.path(),
            super::ZstdCompressionLevel::Uncompressed,
        );

        let json_fragment = json_writer.partial_close().unwrap();
        let parquet_fragment = parquet_writer.unwrap().partial_close().unwrap();

        {
            let result = json_fragment.clone().merge(parquet_fragment.clone());

            let err = result.unwrap_err();
            assert!(err.to_string().contains(
                "Fragments don't have the same encoding and cannot be merged."
            ));
        }
        {
            let result = parquet_fragment.merge(json_fragment);

            let err = result.unwrap_err();
            assert!(err.to_string().contains(
                "Fragments don't have the same encoding and cannot be merged."
            ));
        }
    }

    #[test]
    #[ignore]
    fn fragment_must_be_send_sync() {
        fn _assert_send<T: Send + Sync>() {}
        _assert_send::<super::SnapshotFragment>();
    }
}