1use std::collections::BTreeMap;
2use std::time::Duration;
3use std::{cmp, ops};
4
5use bitcoin::secp256k1::PublicKey;
6use fedimint_api_client::api::{
7 DynGlobalApi, VERSION_THAT_INTRODUCED_GET_SESSION_STATUS,
8 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
9};
10use fedimint_core::db::DatabaseTransaction;
11use fedimint_core::encoding::{Decodable, Encodable};
12use fedimint_core::epoch::ConsensusItem;
13use fedimint_core::module::registry::ModuleDecoderRegistry;
14use fedimint_core::module::{ApiVersion, ModuleCommon};
15use fedimint_core::session_outcome::{AcceptedItem, SessionStatus};
16use fedimint_core::task::{MaybeSend, MaybeSync, ShuttingDownError, TaskGroup};
17use fedimint_core::transaction::Transaction;
18use fedimint_core::util::FmtCompactAnyhow as _;
19use fedimint_core::{apply, async_trait_maybe_send, OutPoint, PeerId};
20use fedimint_logging::LOG_CLIENT_RECOVERY;
21use futures::{Stream, StreamExt as _};
22use rand::{thread_rng, Rng as _};
23use serde::{Deserialize, Serialize};
24use tracing::{debug, trace, warn};
25
26use super::{ClientModuleInit, ClientModuleRecoverArgs};
27use crate::module::recovery::RecoveryProgress;
28use crate::module::{ClientContext, ClientModule};
29
#[derive(Debug, Clone, Eq, PartialEq, Encodable, Decodable, Serialize, Deserialize)]
/// Bookkeeping shared by every [`RecoveryFromHistory`] implementation: the
/// window of sessions being replayed and how far the replay has progressed.
///
/// It is handed to [`RecoveryFromHistory::store_dbtx`] together with the
/// module-specific state and returned by [`RecoveryFromHistory::load_dbtx`],
/// which is what allows an interrupted recovery to resume.
pub struct RecoveryFromHistoryCommon {
    /// Index of the first session this recovery processes.
    start_session: u64,
    /// Index of the next session that still has to be processed.
    next_session: u64,
    /// Exclusive upper bound; the replay loop stops once `next_session`
    /// reaches this value.
    end_session: u64,
}
37
38impl RecoveryFromHistoryCommon {
39 pub fn new(start_session: u64, next_session: u64, end_session: u64) -> Self {
40 Self {
41 start_session,
42 next_session,
43 end_session,
44 }
45 }
46}
47
48#[apply(async_trait_maybe_send!)]
52pub trait RecoveryFromHistory: std::fmt::Debug + MaybeSend + MaybeSync + Clone {
53 type Init: ClientModuleInit;
55
56 async fn new(
58 init: &Self::Init,
59 args: &ClientModuleRecoverArgs<Self::Init>,
60 snapshot: Option<&<<Self::Init as ClientModuleInit>::Module as ClientModule>::Backup>,
61 ) -> anyhow::Result<(Self, u64)>;
62
63 async fn load_dbtx(
69 init: &Self::Init,
70 dbtx: &mut DatabaseTransaction<'_>,
71 args: &ClientModuleRecoverArgs<Self::Init>,
72 ) -> anyhow::Result<Option<(Self, RecoveryFromHistoryCommon)>>;
73
74 async fn store_dbtx(
78 &self,
79 dbtx: &mut DatabaseTransaction<'_>,
80 common: &RecoveryFromHistoryCommon,
81 );
82
83 async fn delete_dbtx(&self, dbtx: &mut DatabaseTransaction<'_>);
87
88 async fn load_finalized(dbtx: &mut DatabaseTransaction<'_>) -> Option<bool>;
92
93 async fn store_finalized(dbtx: &mut DatabaseTransaction<'_>, state: bool);
97
98 async fn handle_session(
108 &mut self,
109 client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
110 session_idx: u64,
111 session_items: &Vec<AcceptedItem>,
112 ) -> anyhow::Result<()> {
113 for accepted_item in session_items {
114 if let ConsensusItem::Transaction(ref transaction) = accepted_item.item {
115 self.handle_transaction(client_ctx, transaction, session_idx)
116 .await?;
117 }
118 }
119 Ok(())
120 }
121
122 async fn handle_transaction(
133 &mut self,
134 client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
135 transaction: &Transaction,
136 session_idx: u64,
137 ) -> anyhow::Result<()> {
138 trace!(
139 target: LOG_CLIENT_RECOVERY,
140 tx_hash = %transaction.tx_hash(),
141 input_num = transaction.inputs.len(),
142 output_num = transaction.outputs.len(),
143 "processing transaction"
144 );
145
146 for (idx, input) in transaction.inputs.iter().enumerate() {
147 trace!(
148 target: LOG_CLIENT_RECOVERY,
149 tx_hash = %transaction.tx_hash(),
150 idx,
151 module_id = input.module_instance_id(),
152 "found transaction input"
153 );
154
155 if let Some(own_input) = client_ctx.input_from_dyn(input) {
156 self.handle_input(client_ctx, idx, own_input, session_idx)
157 .await?;
158 }
159 }
160
161 for (out_idx, output) in transaction.outputs.iter().enumerate() {
162 trace!(
163 target: LOG_CLIENT_RECOVERY,
164 tx_hash = %transaction.tx_hash(),
165 idx = out_idx,
166 module_id = output.module_instance_id(),
167 "found transaction output"
168 );
169
170 if let Some(own_output) = client_ctx.output_from_dyn(output) {
171 let out_point = OutPoint {
172 txid: transaction.tx_hash(),
173 out_idx: out_idx as u64,
174 };
175
176 self.handle_output(client_ctx, out_point, own_output, session_idx)
177 .await?;
178 }
179 }
180
181 Ok(())
182 }
183
184 async fn handle_input(
188 &mut self,
189 _client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
190 _idx: usize,
191 _input: &<<<Self::Init as ClientModuleInit>::Module as ClientModule>::Common as ModuleCommon>::Input,
192 _session_idx: u64,
193 ) -> anyhow::Result<()> {
194 Ok(())
195 }
196
197 async fn handle_output(
201 &mut self,
202 _client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
203 _out_point: OutPoint,
204 _output: &<<<Self::Init as ClientModuleInit>::Module as ClientModule>::Common as ModuleCommon>::Output,
205 _session_idx: u64,
206 ) -> anyhow::Result<()> {
207 Ok(())
208 }
209
210 async fn pre_finalize(&mut self) -> anyhow::Result<()> {
213 Ok(())
214 }
215
216 async fn finalize_dbtx(&self, dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()>;
225}
226
227impl<Init> ClientModuleRecoverArgs<Init>
228where
229 Init: ClientModuleInit,
230{
231 pub async fn recover_from_history<Recovery>(
239 &self,
240 init: &Init,
241 snapshot: Option<&<<Init as ClientModuleInit>::Module as ClientModule>::Backup>,
242 ) -> anyhow::Result<()>
243 where
244 Recovery: RecoveryFromHistory<Init = Init> + std::fmt::Debug,
245 {
246 fn fetch_block_stream<'a>(
251 api: DynGlobalApi,
252 core_api_version: ApiVersion,
253 decoders: ModuleDecoderRegistry,
254 epoch_range: ops::Range<u64>,
255 broadcast_public_keys: Option<BTreeMap<PeerId, PublicKey>>,
256 task_group: TaskGroup,
257 ) -> impl futures::Stream<Item = Result<(u64, Vec<AcceptedItem>), ShuttingDownError>> + 'a
258 {
259 let parallelism_level =
261 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
262 64
263 } else {
264 128
265 };
266
267 futures::stream::iter(epoch_range.clone())
268 .map(move |session_idx| {
269 let api = api.clone();
270 let decoders = decoders.clone().with_fallback();
273 let task_group = task_group.clone();
274 let broadcast_public_keys = broadcast_public_keys.clone();
275
276 Box::pin(async move {
277 task_group.spawn_cancellable("recovery fetch block", async move {
281
282 let mut retry_sleep = Duration::from_millis(10);
283 let block = loop {
284 trace!(target: LOG_CLIENT_RECOVERY, session_idx, "Awaiting signed block");
285
286 let items_res = if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS {
287 api.await_block(session_idx, &decoders).await.map(|s| s.items)
288 } else {
289 api.get_session_status(session_idx, &decoders, core_api_version, broadcast_public_keys.as_ref()).await.map(|s| match s {
290 SessionStatus::Initial => panic!("Federation missing session that existed when we started recovery"),
291 SessionStatus::Pending(items) => items,
292 SessionStatus::Complete(s) => s.items,
293 })
294 };
295
296 match items_res {
297 Ok(block) => {
298 trace!(target: LOG_CLIENT_RECOVERY, session_idx, "Got signed session");
299 break block
300 },
301 Err(err) => {
302 const MAX_SLEEP: Duration = Duration::from_secs(120);
303
304 warn!(target: LOG_CLIENT_RECOVERY, err = %err.fmt_compact_anyhow(), session_idx, "Error trying to fetch signed block");
305 if retry_sleep <= MAX_SLEEP {
308 retry_sleep = retry_sleep
309 + thread_rng().gen_range(Duration::ZERO..=retry_sleep);
310 }
311 fedimint_core::runtime::sleep(cmp::min(retry_sleep, MAX_SLEEP))
312 .await;
313 }
314 }
315 };
316
317 (session_idx, block)
318 }).await.expect("Can't fail")
319 })
320 })
321 .buffered(parallelism_level)
322 }
323
324 async fn make_progress<'a, Init, Recovery: RecoveryFromHistory<Init = Init>>(
326 client_ctx: &ClientContext<<Init as ClientModuleInit>::Module>,
327 common_state: &mut RecoveryFromHistoryCommon,
328 state: &mut Recovery,
329 block_stream: &mut (impl Stream<Item = Result<(u64, Vec<AcceptedItem>), ShuttingDownError>>
330 + Unpin),
331 ) -> anyhow::Result<()>
332 where
333 Init: ClientModuleInit,
334 {
335 const PROGRESS_SNAPSHOT_BLOCKS: u64 = 5000;
340
341 let start = fedimint_core::time::now();
342
343 let block_range = common_state.next_session
344 ..cmp::min(
345 common_state
346 .next_session
347 .wrapping_add(PROGRESS_SNAPSHOT_BLOCKS),
348 common_state.end_session,
349 );
350
351 for _ in block_range {
352 let Some(res) = block_stream.next().await else {
353 break;
354 };
355
356 let (session_idx, accepted_items) = res?;
357
358 assert_eq!(common_state.next_session, session_idx);
359 state
360 .handle_session(client_ctx, session_idx, &accepted_items)
361 .await?;
362
363 common_state.next_session += 1;
364
365 if Duration::from_secs(10)
366 < fedimint_core::time::now()
367 .duration_since(start)
368 .unwrap_or_default()
369 {
370 break;
371 }
372 }
373
374 Ok(())
375 }
376
377 let db = self.db();
378 let client_ctx = self.context();
379
380 if Recovery::load_finalized(&mut db.begin_transaction_nc().await)
381 .await
382 .unwrap_or_default()
383 {
384 warn!(
405 target: LOG_CLIENT_RECOVERY,
406 "Previously finalized, exiting"
407 );
408 return Ok(());
409 }
410 let current_session_count = client_ctx.global_api().session_count().await?;
411 debug!(target: LOG_CLIENT_RECOVERY, session_count = current_session_count, "Current session count");
412
413 let (mut state, mut common_state) =
414 if let Some((state, common_state)) = Recovery::load_dbtx(init, &mut db.begin_transaction_nc().await, self).await? {
417 (state, common_state)
418 } else {
419 let (state, start_session) = Recovery::new(init, self, snapshot).await?;
420
421 debug!(target: LOG_CLIENT_RECOVERY, start_session, "Recovery start session");
422 (state,
423 RecoveryFromHistoryCommon {
424 start_session,
425 next_session: start_session,
426 end_session: current_session_count + 1,
427 })
428 };
429
430 let block_stream_session_range = common_state.next_session..common_state.end_session;
431 debug!(target: LOG_CLIENT_RECOVERY, range = ?block_stream_session_range, "Starting block streaming");
432
433 let mut block_stream = fetch_block_stream(
434 self.api().clone(),
435 *self.core_api_version(),
436 client_ctx.decoders(),
437 block_stream_session_range,
438 client_ctx
439 .get_config()
440 .await
441 .global
442 .broadcast_public_keys
443 .clone(),
444 self.task_group().clone(),
445 );
446 let client_ctx = self.context();
447
448 while common_state.next_session < common_state.end_session {
449 make_progress(
450 &client_ctx,
451 &mut common_state,
452 &mut state,
453 &mut block_stream,
454 )
455 .await?;
456
457 let mut dbtx = db.begin_transaction().await;
458 state.store_dbtx(&mut dbtx.to_ref_nc(), &common_state).await;
459 dbtx.commit_tx().await;
460
461 self.update_recovery_progress(RecoveryProgress {
462 complete: (common_state.next_session - common_state.start_session)
463 .try_into()
464 .unwrap_or(u32::MAX),
465 total: (common_state.end_session - common_state.start_session)
466 .try_into()
467 .unwrap_or(u32::MAX),
468 });
469 }
470
471 state.pre_finalize().await?;
472
473 let mut dbtx = db.begin_transaction().await;
474 state.store_dbtx(&mut dbtx.to_ref_nc(), &common_state).await;
475 dbtx.commit_tx().await;
476
477 debug!(
478 target: LOG_CLIENT_RECOVERY,
479 ?state,
480 "Finalizing restore"
481 );
482
483 db.autocommit(
484 |dbtx, _| {
485 let state = state.clone();
486 {
487 Box::pin(async move {
488 state.delete_dbtx(dbtx).await;
489 state.finalize_dbtx(dbtx).await?;
490 Recovery::store_finalized(dbtx, true).await;
491
492 Ok::<_, anyhow::Error>(())
493 })
494 }
495 },
496 None,
497 )
498 .await?;
499
500 Ok(())
501 }
502}