1use std::time::Duration;
2use std::{cmp, ops};
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_core::db::DatabaseTransaction;
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::epoch::ConsensusItem;
8use fedimint_core::module::registry::ModuleDecoderRegistry;
9use fedimint_core::module::{ApiVersion, ModuleCommon};
10use fedimint_core::session_outcome::{AcceptedItem, SessionStatus};
11use fedimint_core::task::{MaybeSend, MaybeSync, ShuttingDownError, TaskGroup};
12use fedimint_core::transaction::Transaction;
13use fedimint_core::{apply, async_trait_maybe_send, OutPoint};
14use fedimint_logging::LOG_CLIENT_RECOVERY;
15use futures::{Stream, StreamExt as _};
16use rand::{thread_rng, Rng as _};
17use serde::{Deserialize, Serialize};
18use tracing::{debug, info, trace, warn};
19
20use super::{ClientModuleInit, ClientModuleRecoverArgs};
21use crate::module::recovery::RecoveryProgress;
22use crate::module::{ClientContext, ClientModule};
23
24#[derive(Debug, Clone, Eq, PartialEq, Encodable, Decodable, Serialize, Deserialize)]
25pub struct RecoveryFromHistoryCommon {
27 start_session: u64,
28 next_session: u64,
29 end_session: u64,
30}
31
32impl RecoveryFromHistoryCommon {
33 pub fn new(start_session: u64, next_session: u64, end_session: u64) -> Self {
34 Self {
35 start_session,
36 next_session,
37 end_session,
38 }
39 }
40}
41
42#[apply(async_trait_maybe_send!)]
46pub trait RecoveryFromHistory: std::fmt::Debug + MaybeSend + MaybeSync + Clone {
47 type Init: ClientModuleInit;
49
50 async fn new(
52 init: &Self::Init,
53 args: &ClientModuleRecoverArgs<Self::Init>,
54 snapshot: Option<&<<Self::Init as ClientModuleInit>::Module as ClientModule>::Backup>,
55 ) -> anyhow::Result<(Self, u64)>;
56
57 async fn load_dbtx(
63 init: &Self::Init,
64 dbtx: &mut DatabaseTransaction<'_>,
65 args: &ClientModuleRecoverArgs<Self::Init>,
66 ) -> anyhow::Result<Option<(Self, RecoveryFromHistoryCommon)>>;
67
68 async fn store_dbtx(
72 &self,
73 dbtx: &mut DatabaseTransaction<'_>,
74 common: &RecoveryFromHistoryCommon,
75 );
76
77 async fn delete_dbtx(&self, dbtx: &mut DatabaseTransaction<'_>);
81
82 async fn load_finalized(dbtx: &mut DatabaseTransaction<'_>) -> Option<bool>;
86
87 async fn store_finalized(dbtx: &mut DatabaseTransaction<'_>, state: bool);
91
92 async fn handle_session(
102 &mut self,
103 client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
104 session_idx: u64,
105 session_items: &Vec<AcceptedItem>,
106 ) -> anyhow::Result<()> {
107 for accepted_item in session_items {
108 if let ConsensusItem::Transaction(ref transaction) = accepted_item.item {
109 self.handle_transaction(client_ctx, transaction, session_idx)
110 .await?;
111 }
112 }
113 Ok(())
114 }
115
116 async fn handle_transaction(
127 &mut self,
128 client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
129 transaction: &Transaction,
130 session_idx: u64,
131 ) -> anyhow::Result<()> {
132 trace!(
133 target: LOG_CLIENT_RECOVERY,
134 tx_hash = %transaction.tx_hash(),
135 input_num = transaction.inputs.len(),
136 output_num = transaction.outputs.len(),
137 "processing transaction"
138 );
139
140 for (idx, input) in transaction.inputs.iter().enumerate() {
141 trace!(
142 target: LOG_CLIENT_RECOVERY,
143 tx_hash = %transaction.tx_hash(),
144 idx,
145 module_id = input.module_instance_id(),
146 "found transaction input"
147 );
148
149 if let Some(own_input) = client_ctx.input_from_dyn(input) {
150 self.handle_input(client_ctx, idx, own_input, session_idx)
151 .await?;
152 }
153 }
154
155 for (out_idx, output) in transaction.outputs.iter().enumerate() {
156 trace!(
157 target: LOG_CLIENT_RECOVERY,
158 tx_hash = %transaction.tx_hash(),
159 idx = out_idx,
160 module_id = output.module_instance_id(),
161 "found transaction output"
162 );
163
164 if let Some(own_output) = client_ctx.output_from_dyn(output) {
165 let out_point = OutPoint {
166 txid: transaction.tx_hash(),
167 out_idx: out_idx as u64,
168 };
169
170 self.handle_output(client_ctx, out_point, own_output, session_idx)
171 .await?;
172 }
173 }
174
175 Ok(())
176 }
177
178 async fn handle_input(
182 &mut self,
183 _client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
184 _idx: usize,
185 _input: &<<<Self::Init as ClientModuleInit>::Module as ClientModule>::Common as ModuleCommon>::Input,
186 _session_idx: u64,
187 ) -> anyhow::Result<()> {
188 Ok(())
189 }
190
191 async fn handle_output(
195 &mut self,
196 _client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
197 _out_point: OutPoint,
198 _output: &<<<Self::Init as ClientModuleInit>::Module as ClientModule>::Common as ModuleCommon>::Output,
199 _session_idx: u64,
200 ) -> anyhow::Result<()> {
201 Ok(())
202 }
203
204 async fn pre_finalize(&mut self) -> anyhow::Result<()> {
207 Ok(())
208 }
209
210 async fn finalize_dbtx(&self, dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()>;
219}
220
221impl<Init> ClientModuleRecoverArgs<Init>
222where
223 Init: ClientModuleInit,
224{
225 pub async fn recover_from_history<Recovery>(
233 &self,
234 init: &Init,
235 snapshot: Option<&<<Init as ClientModuleInit>::Module as ClientModule>::Backup>,
236 ) -> anyhow::Result<()>
237 where
238 Recovery: RecoveryFromHistory<Init = Init> + std::fmt::Debug,
239 {
240 fn fetch_block_stream<'a>(
245 api: DynGlobalApi,
246 core_api_version: ApiVersion,
247 decoders: ModuleDecoderRegistry,
248 epoch_range: ops::Range<u64>,
249 task_group: TaskGroup,
250 ) -> impl futures::Stream<Item = Result<(u64, Vec<AcceptedItem>), ShuttingDownError>> + 'a
251 {
252 const PARALLISM_LEVEL: usize = 64;
254 const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
255 ApiVersion { major: 0, minor: 1 };
256
257 futures::stream::iter(epoch_range.clone())
258 .map(move |session_idx| {
259 let api = api.clone();
260 let decoders = decoders.clone();
261 let task_group = task_group.clone();
262
263 Box::pin(async move {
264 task_group.spawn_cancellable("recovery fetch block", async move {
268
269 info!(session_idx, "Fetching epoch");
270
271 let mut retry_sleep = Duration::from_millis(10);
272 let block = loop {
273 trace!(target: LOG_CLIENT_RECOVERY, session_idx, "Awaiting signed block");
274
275 let items_res = if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS {
276 api.await_block(session_idx, &decoders).await.map(|s| s.items)
277 } else {
278 api.get_session_status(session_idx, &decoders).await.map(|s| match s {
279 SessionStatus::Initial => panic!("Federation missing session that existed when we started recovery"),
280 SessionStatus::Pending(items) => items,
281 SessionStatus::Complete(s) => s.items,
282 })
283 };
284
285 match items_res {
286 Ok(block) => {
287 debug!(target: LOG_CLIENT_RECOVERY, session_idx, "Got signed session");
288 break block
289 },
290 Err(e) => {
291 const MAX_SLEEP: Duration = Duration::from_secs(120);
292
293 warn!(target: LOG_CLIENT_RECOVERY, e = %e, session_idx, "Error trying to fetch signed block");
294 if retry_sleep <= MAX_SLEEP {
297 retry_sleep = retry_sleep
298 + thread_rng().gen_range(Duration::ZERO..=retry_sleep);
299 }
300 fedimint_core::runtime::sleep(cmp::min(retry_sleep, MAX_SLEEP))
301 .await;
302 }
303 }
304 };
305
306 (session_idx, block)
307 }).await.expect("Can't fail")
308 })
309 })
310 .buffered(PARALLISM_LEVEL)
311 }
312
313 async fn make_progress<'a, Init, Recovery: RecoveryFromHistory<Init = Init>>(
316 client_ctx: &ClientContext<<Init as ClientModuleInit>::Module>,
317 common_state: &mut RecoveryFromHistoryCommon,
318 state: &mut Recovery,
319 block_stream: &mut (impl Stream<Item = Result<(u64, Vec<AcceptedItem>), ShuttingDownError>>
320 + Unpin),
321 ) -> anyhow::Result<()>
322 where
323 Init: ClientModuleInit,
324 {
325 const PROGRESS_SNAPSHOT_BLOCKS: u64 = 10;
333
334 let block_range = common_state.next_session
335 ..cmp::min(
336 common_state
337 .next_session
338 .wrapping_add(PROGRESS_SNAPSHOT_BLOCKS),
339 common_state.end_session,
340 );
341
342 debug!(
343 target: LOG_CLIENT_RECOVERY,
344 ?block_range,
345 "Processing blocks"
346 );
347
348 for _ in block_range {
349 let Some(res) = block_stream.next().await else {
350 break;
351 };
352
353 let (session_idx, accepted_items) = res?;
354
355 assert_eq!(common_state.next_session, session_idx);
356 state
357 .handle_session(client_ctx, session_idx, &accepted_items)
358 .await?;
359
360 common_state.next_session += 1;
361 }
362
363 Ok(())
364 }
365
366 let db = self.db();
367 let client_ctx = self.context();
368
369 if Recovery::load_finalized(&mut db.begin_transaction_nc().await)
370 .await
371 .unwrap_or_default()
372 {
373 warn!("Previously finalized, exiting");
394 return Ok(());
395 }
396 let current_session_count = client_ctx.global_api().session_count().await?;
397 debug!(target: LOG_CLIENT_RECOVERY, session_count = current_session_count, "Current session count");
398
399 let (mut state, mut common_state) =
400 if let Some((state, common_state)) = Recovery::load_dbtx(init, &mut db.begin_transaction_nc().await, self).await? {
403 (state, common_state)
404 } else {
405 let (state, start_session) = Recovery::new(init, self, snapshot).await?;
406
407 debug!(target: LOG_CLIENT_RECOVERY, start_session, "Recovery start session");
408 (state,
409 RecoveryFromHistoryCommon {
410 start_session,
411 next_session: start_session,
412 end_session: current_session_count + 1,
413 })
414 };
415
416 let block_stream_session_range = common_state.next_session..common_state.end_session;
417 debug!(target: LOG_CLIENT_RECOVERY, range = ?block_stream_session_range, "Starting block streaming");
418
419 let mut block_stream = fetch_block_stream(
420 self.api().clone(),
421 *self.core_api_version(),
422 client_ctx.decoders(),
423 block_stream_session_range,
424 self.task_group().clone(),
425 );
426 let client_ctx = self.context();
427
428 while common_state.next_session < common_state.end_session {
429 make_progress(
430 &client_ctx,
431 &mut common_state,
432 &mut state,
433 &mut block_stream,
434 )
435 .await?;
436
437 let mut dbtx = db.begin_transaction().await;
438 state.store_dbtx(&mut dbtx.to_ref_nc(), &common_state).await;
439 dbtx.commit_tx().await;
440
441 self.update_recovery_progress(RecoveryProgress {
442 complete: (common_state.next_session - common_state.start_session)
443 .try_into()
444 .unwrap_or(u32::MAX),
445 total: (common_state.end_session - common_state.start_session)
446 .try_into()
447 .unwrap_or(u32::MAX),
448 });
449 }
450
451 state.pre_finalize().await?;
452
453 let mut dbtx = db.begin_transaction().await;
454 state.store_dbtx(&mut dbtx.to_ref_nc(), &common_state).await;
455 dbtx.commit_tx().await;
456
457 debug!(
458 target: LOG_CLIENT_RECOVERY,
459 ?state,
460 "Finalizing restore"
461 );
462
463 db.autocommit(
464 |dbtx, _| {
465 let state = state.clone();
466 {
467 Box::pin(async move {
468 state.delete_dbtx(dbtx).await;
469 state.finalize_dbtx(dbtx).await?;
470 Recovery::store_finalized(dbtx, true).await;
471
472 Ok::<_, anyhow::Error>(())
473 })
474 }
475 },
476 None,
477 )
478 .await?;
479
480 Ok(())
481 }
482}