fuel_core_chain_config/config/state/
reader.rs

1use std::fmt::Debug;
2
3use fuel_core_storage::{
4    structured_storage::TableWithBlueprint,
5    Mappable,
6};
7use itertools::Itertools;
8
9use crate::{
10    config::table_entry::TableEntry,
11    AsTable,
12    ChainConfig,
13    LastBlockConfig,
14    StateConfig,
15    MAX_GROUP_SIZE,
16};
17
18pub struct Groups<T: Mappable> {
19    iter: GroupIter<T>,
20}
21
22impl<T> Groups<T>
23where
24    T: Mappable,
25{
26    pub fn len(&self) -> usize {
27        match &self.iter {
28            GroupIter::InMemory { groups } => groups.len(),
29            #[cfg(feature = "parquet")]
30            GroupIter::Parquet { decoder } => decoder.num_groups(),
31        }
32    }
33
34    pub fn is_empty(&self) -> bool {
35        self.len() == 0
36    }
37}
38
39impl<T> IntoIterator for Groups<T>
40where
41    T: Mappable,
42    GroupIter<T>: Iterator,
43{
44    type IntoIter = GroupIter<T>;
45    type Item = <Self::IntoIter as Iterator>::Item;
46
47    fn into_iter(self) -> Self::IntoIter {
48        self.iter
49    }
50}
51
52pub enum GroupIter<T>
53where
54    T: Mappable,
55{
56    InMemory {
57        groups: std::vec::IntoIter<anyhow::Result<Vec<TableEntry<T>>>>,
58    },
59    #[cfg(feature = "parquet")]
60    Parquet {
61        decoder: super::parquet::decode::Decoder<std::fs::File>,
62    },
63}
64
65#[cfg(feature = "parquet")]
66impl<T> Iterator for GroupIter<T>
67where
68    T: Mappable,
69    TableEntry<T>: serde::de::DeserializeOwned,
70{
71    type Item = anyhow::Result<Vec<TableEntry<T>>>;
72
73    fn next(&mut self) -> Option<Self::Item> {
74        match self {
75            GroupIter::InMemory { groups } => groups.next(),
76            GroupIter::Parquet { decoder } => {
77                let group = decoder.next()?.and_then(|byte_group| {
78                    byte_group
79                        .into_iter()
80                        .map(|group| {
81                            postcard::from_bytes(&group).map_err(|e| anyhow::anyhow!(e))
82                        })
83                        .collect()
84                });
85                Some(group)
86            }
87        }
88    }
89}
90
91#[cfg(not(feature = "parquet"))]
92impl<T> Iterator for GroupIter<T>
93where
94    T: Mappable,
95{
96    type Item = anyhow::Result<Vec<TableEntry<T>>>;
97
98    fn next(&mut self) -> Option<Self::Item> {
99        match self {
100            GroupIter::InMemory { groups } => groups.next(),
101        }
102    }
103}
104
105#[derive(Clone, Debug)]
106enum DataSource {
107    #[cfg(feature = "parquet")]
108    Parquet {
109        tables: std::collections::HashMap<String, std::path::PathBuf>,
110        latest_block_config: Option<LastBlockConfig>,
111    },
112    InMemory {
113        state: StateConfig,
114        group_size: usize,
115    },
116}
117
118#[derive(Clone, Debug)]
119pub struct SnapshotReader {
120    chain_config: ChainConfig,
121    data_source: DataSource,
122}
123
124impl SnapshotReader {
125    pub fn new_in_memory(chain_config: ChainConfig, state: StateConfig) -> Self {
126        Self {
127            chain_config,
128            data_source: DataSource::InMemory {
129                state,
130                group_size: MAX_GROUP_SIZE,
131            },
132        }
133    }
134
135    #[cfg(feature = "test-helpers")]
136    pub fn local_testnet() -> Self {
137        let state = StateConfig::local_testnet();
138        let chain_config = ChainConfig::local_testnet();
139        Self::new_in_memory(chain_config, state)
140    }
141
142    pub fn with_chain_config(self, chain_config: ChainConfig) -> Self {
143        Self {
144            chain_config,
145            ..self
146        }
147    }
148
149    pub fn with_state_config(self, state_config: StateConfig) -> Self {
150        Self {
151            data_source: DataSource::InMemory {
152                state: state_config,
153                group_size: MAX_GROUP_SIZE,
154            },
155            ..self
156        }
157    }
158
159    #[cfg(feature = "std")]
160    fn json(
161        state_file: impl AsRef<std::path::Path>,
162        chain_config: ChainConfig,
163        group_size: usize,
164    ) -> anyhow::Result<Self> {
165        use anyhow::Context;
166        use std::io::Read;
167        let state = {
168            let path = state_file.as_ref();
169            let mut json = String::new();
170            std::fs::File::open(path)
171                .with_context(|| format!("Could not open snapshot file: {path:?}"))?
172                .read_to_string(&mut json)?;
173            serde_json::from_str(json.as_str())?
174        };
175
176        Ok(Self {
177            data_source: DataSource::InMemory { state, group_size },
178            chain_config,
179        })
180    }
181
182    #[cfg(feature = "parquet")]
183    fn parquet(
184        tables: std::collections::HashMap<String, std::path::PathBuf>,
185        latest_block_config: std::path::PathBuf,
186        chain_config: ChainConfig,
187    ) -> anyhow::Result<Self> {
188        let latest_block_config = Self::read_config(&latest_block_config)?;
189        Ok(Self {
190            data_source: DataSource::Parquet {
191                tables,
192                latest_block_config,
193            },
194            chain_config,
195        })
196    }
197
198    #[cfg(feature = "parquet")]
199    fn read_config<Config>(path: &std::path::Path) -> anyhow::Result<Config>
200    where
201        Config: serde::de::DeserializeOwned,
202    {
203        use super::parquet::decode::Decoder;
204
205        let file = std::fs::File::open(path)?;
206        let group = Decoder::new(file)?
207            .next()
208            .ok_or_else(|| anyhow::anyhow!("No block height found"))??;
209        let config = group
210            .into_iter()
211            .next()
212            .ok_or_else(|| anyhow::anyhow!("No config found"))?;
213        postcard::from_bytes(&config).map_err(Into::into)
214    }
215
216    #[cfg(feature = "std")]
217    pub fn open(
218        snapshot_metadata: crate::config::SnapshotMetadata,
219    ) -> anyhow::Result<Self> {
220        Self::open_w_config(snapshot_metadata, MAX_GROUP_SIZE)
221    }
222
223    #[cfg(feature = "std")]
224    pub fn open_w_config(
225        snapshot_metadata: crate::config::SnapshotMetadata,
226        json_group_size: usize,
227    ) -> anyhow::Result<Self> {
228        use crate::TableEncoding;
229        let chain_config = ChainConfig::from_snapshot_metadata(&snapshot_metadata)?;
230
231        match snapshot_metadata.table_encoding {
232            TableEncoding::Json { filepath } => {
233                Self::json(filepath, chain_config, json_group_size)
234            }
235            #[cfg(feature = "parquet")]
236            TableEncoding::Parquet {
237                tables,
238                latest_block_config_path,
239                ..
240            } => Self::parquet(tables, latest_block_config_path, chain_config),
241        }
242    }
243
244    pub fn read<T>(&self) -> anyhow::Result<Groups<T>>
245    where
246        T: TableWithBlueprint,
247        StateConfig: AsTable<T>,
248        TableEntry<T>: serde::de::DeserializeOwned,
249    {
250        let iter = match &self.data_source {
251            #[cfg(feature = "parquet")]
252            DataSource::Parquet { tables, .. } => {
253                use anyhow::Context;
254                use fuel_core_storage::kv_store::StorageColumn;
255                let name = T::column().name();
256                let Some(path) = tables.get(name.as_str()) else {
257                    return Ok(Groups {
258                        iter: GroupIter::InMemory {
259                            groups: vec![].into_iter(),
260                        },
261                    });
262                };
263                let file = std::fs::File::open(path).with_context(|| {
264                    format!("Could not open {path:?} in order to read table '{name}'")
265                })?;
266
267                GroupIter::Parquet {
268                    decoder: super::parquet::decode::Decoder::new(file)?,
269                }
270            }
271            DataSource::InMemory { state, group_size } => {
272                let collection = state
273                    .as_table()
274                    .into_iter()
275                    .chunks(*group_size)
276                    .into_iter()
277                    .map(|vec_chunk| Ok(vec_chunk.collect()))
278                    .collect_vec();
279                GroupIter::InMemory {
280                    groups: collection.into_iter(),
281                }
282            }
283        };
284
285        Ok(Groups { iter })
286    }
287
288    pub fn chain_config(&self) -> &ChainConfig {
289        &self.chain_config
290    }
291
292    pub fn last_block_config(&self) -> Option<&LastBlockConfig> {
293        match &self.data_source {
294            DataSource::InMemory { state, .. } => state.last_block.as_ref(),
295            #[cfg(feature = "parquet")]
296            DataSource::Parquet {
297                latest_block_config: block,
298                ..
299            } => block.as_ref(),
300        }
301    }
302}