fuel_core_chain_config/config/state/
reader.rs1use std::fmt::Debug;
2
3use fuel_core_storage::{
4 structured_storage::TableWithBlueprint,
5 Mappable,
6};
7use itertools::Itertools;
8
9use crate::{
10 config::table_entry::TableEntry,
11 AsTable,
12 ChainConfig,
13 LastBlockConfig,
14 StateConfig,
15 MAX_GROUP_SIZE,
16};
17
18pub struct Groups<T: Mappable> {
19 iter: GroupIter<T>,
20}
21
22impl<T> Groups<T>
23where
24 T: Mappable,
25{
26 pub fn len(&self) -> usize {
27 match &self.iter {
28 GroupIter::InMemory { groups } => groups.len(),
29 #[cfg(feature = "parquet")]
30 GroupIter::Parquet { decoder } => decoder.num_groups(),
31 }
32 }
33
34 pub fn is_empty(&self) -> bool {
35 self.len() == 0
36 }
37}
38
39impl<T> IntoIterator for Groups<T>
40where
41 T: Mappable,
42 GroupIter<T>: Iterator,
43{
44 type IntoIter = GroupIter<T>;
45 type Item = <Self::IntoIter as Iterator>::Item;
46
47 fn into_iter(self) -> Self::IntoIter {
48 self.iter
49 }
50}
51
52pub enum GroupIter<T>
53where
54 T: Mappable,
55{
56 InMemory {
57 groups: std::vec::IntoIter<anyhow::Result<Vec<TableEntry<T>>>>,
58 },
59 #[cfg(feature = "parquet")]
60 Parquet {
61 decoder: super::parquet::decode::Decoder<std::fs::File>,
62 },
63}
64
65#[cfg(feature = "parquet")]
66impl<T> Iterator for GroupIter<T>
67where
68 T: Mappable,
69 TableEntry<T>: serde::de::DeserializeOwned,
70{
71 type Item = anyhow::Result<Vec<TableEntry<T>>>;
72
73 fn next(&mut self) -> Option<Self::Item> {
74 match self {
75 GroupIter::InMemory { groups } => groups.next(),
76 GroupIter::Parquet { decoder } => {
77 let group = decoder.next()?.and_then(|byte_group| {
78 byte_group
79 .into_iter()
80 .map(|group| {
81 postcard::from_bytes(&group).map_err(|e| anyhow::anyhow!(e))
82 })
83 .collect()
84 });
85 Some(group)
86 }
87 }
88 }
89}
90
91#[cfg(not(feature = "parquet"))]
92impl<T> Iterator for GroupIter<T>
93where
94 T: Mappable,
95{
96 type Item = anyhow::Result<Vec<TableEntry<T>>>;
97
98 fn next(&mut self) -> Option<Self::Item> {
99 match self {
100 GroupIter::InMemory { groups } => groups.next(),
101 }
102 }
103}
104
105#[derive(Clone, Debug)]
106enum DataSource {
107 #[cfg(feature = "parquet")]
108 Parquet {
109 tables: std::collections::HashMap<String, std::path::PathBuf>,
110 latest_block_config: Option<LastBlockConfig>,
111 },
112 InMemory {
113 state: StateConfig,
114 group_size: usize,
115 },
116}
117
118#[derive(Clone, Debug)]
119pub struct SnapshotReader {
120 chain_config: ChainConfig,
121 data_source: DataSource,
122}
123
124impl SnapshotReader {
125 pub fn new_in_memory(chain_config: ChainConfig, state: StateConfig) -> Self {
126 Self {
127 chain_config,
128 data_source: DataSource::InMemory {
129 state,
130 group_size: MAX_GROUP_SIZE,
131 },
132 }
133 }
134
135 #[cfg(feature = "test-helpers")]
136 pub fn local_testnet() -> Self {
137 let state = StateConfig::local_testnet();
138 let chain_config = ChainConfig::local_testnet();
139 Self::new_in_memory(chain_config, state)
140 }
141
142 pub fn with_chain_config(self, chain_config: ChainConfig) -> Self {
143 Self {
144 chain_config,
145 ..self
146 }
147 }
148
149 pub fn with_state_config(self, state_config: StateConfig) -> Self {
150 Self {
151 data_source: DataSource::InMemory {
152 state: state_config,
153 group_size: MAX_GROUP_SIZE,
154 },
155 ..self
156 }
157 }
158
159 #[cfg(feature = "std")]
160 fn json(
161 state_file: impl AsRef<std::path::Path>,
162 chain_config: ChainConfig,
163 group_size: usize,
164 ) -> anyhow::Result<Self> {
165 use anyhow::Context;
166 use std::io::Read;
167 let state = {
168 let path = state_file.as_ref();
169 let mut json = String::new();
170 std::fs::File::open(path)
171 .with_context(|| format!("Could not open snapshot file: {path:?}"))?
172 .read_to_string(&mut json)?;
173 serde_json::from_str(json.as_str())?
174 };
175
176 Ok(Self {
177 data_source: DataSource::InMemory { state, group_size },
178 chain_config,
179 })
180 }
181
182 #[cfg(feature = "parquet")]
183 fn parquet(
184 tables: std::collections::HashMap<String, std::path::PathBuf>,
185 latest_block_config: std::path::PathBuf,
186 chain_config: ChainConfig,
187 ) -> anyhow::Result<Self> {
188 let latest_block_config = Self::read_config(&latest_block_config)?;
189 Ok(Self {
190 data_source: DataSource::Parquet {
191 tables,
192 latest_block_config,
193 },
194 chain_config,
195 })
196 }
197
198 #[cfg(feature = "parquet")]
199 fn read_config<Config>(path: &std::path::Path) -> anyhow::Result<Config>
200 where
201 Config: serde::de::DeserializeOwned,
202 {
203 use super::parquet::decode::Decoder;
204
205 let file = std::fs::File::open(path)?;
206 let group = Decoder::new(file)?
207 .next()
208 .ok_or_else(|| anyhow::anyhow!("No block height found"))??;
209 let config = group
210 .into_iter()
211 .next()
212 .ok_or_else(|| anyhow::anyhow!("No config found"))?;
213 postcard::from_bytes(&config).map_err(Into::into)
214 }
215
216 #[cfg(feature = "std")]
217 pub fn open(
218 snapshot_metadata: crate::config::SnapshotMetadata,
219 ) -> anyhow::Result<Self> {
220 Self::open_w_config(snapshot_metadata, MAX_GROUP_SIZE)
221 }
222
223 #[cfg(feature = "std")]
224 pub fn open_w_config(
225 snapshot_metadata: crate::config::SnapshotMetadata,
226 json_group_size: usize,
227 ) -> anyhow::Result<Self> {
228 use crate::TableEncoding;
229 let chain_config = ChainConfig::from_snapshot_metadata(&snapshot_metadata)?;
230
231 match snapshot_metadata.table_encoding {
232 TableEncoding::Json { filepath } => {
233 Self::json(filepath, chain_config, json_group_size)
234 }
235 #[cfg(feature = "parquet")]
236 TableEncoding::Parquet {
237 tables,
238 latest_block_config_path,
239 ..
240 } => Self::parquet(tables, latest_block_config_path, chain_config),
241 }
242 }
243
244 pub fn read<T>(&self) -> anyhow::Result<Groups<T>>
245 where
246 T: TableWithBlueprint,
247 StateConfig: AsTable<T>,
248 TableEntry<T>: serde::de::DeserializeOwned,
249 {
250 let iter = match &self.data_source {
251 #[cfg(feature = "parquet")]
252 DataSource::Parquet { tables, .. } => {
253 use anyhow::Context;
254 use fuel_core_storage::kv_store::StorageColumn;
255 let name = T::column().name();
256 let Some(path) = tables.get(name.as_str()) else {
257 return Ok(Groups {
258 iter: GroupIter::InMemory {
259 groups: vec![].into_iter(),
260 },
261 });
262 };
263 let file = std::fs::File::open(path).with_context(|| {
264 format!("Could not open {path:?} in order to read table '{name}'")
265 })?;
266
267 GroupIter::Parquet {
268 decoder: super::parquet::decode::Decoder::new(file)?,
269 }
270 }
271 DataSource::InMemory { state, group_size } => {
272 let collection = state
273 .as_table()
274 .into_iter()
275 .chunks(*group_size)
276 .into_iter()
277 .map(|vec_chunk| Ok(vec_chunk.collect()))
278 .collect_vec();
279 GroupIter::InMemory {
280 groups: collection.into_iter(),
281 }
282 }
283 };
284
285 Ok(Groups { iter })
286 }
287
288 pub fn chain_config(&self) -> &ChainConfig {
289 &self.chain_config
290 }
291
292 pub fn last_block_config(&self) -> Option<&LastBlockConfig> {
293 match &self.data_source {
294 DataSource::InMemory { state, .. } => state.last_block.as_ref(),
295 #[cfg(feature = "parquet")]
296 DataSource::Parquet {
297 latest_block_config: block,
298 ..
299 } => block.as_ref(),
300 }
301 }
302}