fedimint_client/module/init/
recovery.rs

1use std::time::Duration;
2use std::{cmp, ops};
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_core::db::DatabaseTransaction;
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::epoch::ConsensusItem;
8use fedimint_core::module::registry::ModuleDecoderRegistry;
9use fedimint_core::module::{ApiVersion, ModuleCommon};
10use fedimint_core::session_outcome::{AcceptedItem, SessionStatus};
11use fedimint_core::task::{MaybeSend, MaybeSync, ShuttingDownError, TaskGroup};
12use fedimint_core::transaction::Transaction;
13use fedimint_core::{apply, async_trait_maybe_send, OutPoint};
14use fedimint_logging::LOG_CLIENT_RECOVERY;
15use futures::{Stream, StreamExt as _};
16use rand::{thread_rng, Rng as _};
17use serde::{Deserialize, Serialize};
18use tracing::{debug, info, trace, warn};
19
20use super::{ClientModuleInit, ClientModuleRecoverArgs};
21use crate::module::recovery::RecoveryProgress;
22use crate::module::{ClientContext, ClientModule};
23
#[derive(Debug, Clone, Eq, PartialEq, Encodable, Decodable, Serialize, Deserialize)]
/// Common state tracked during recovery from history
///
/// Tracks the window of consensus session indexes being replayed so that
/// progress can be persisted and recovery resumed after interruption.
///
/// NOTE: field order is significant — `Encodable`/`Decodable` serialize
/// fields in declaration order, so reordering would break stored state.
pub struct RecoveryFromHistoryCommon {
    // First session index this recovery run started from (used for
    // progress reporting)
    start_session: u64,
    // Next session index to process; this is the persisted resume cursor
    next_session: u64,
    // Session index (exclusive) at which recovery is considered complete
    end_session: u64,
}
31
32impl RecoveryFromHistoryCommon {
33    pub fn new(start_session: u64, next_session: u64, end_session: u64) -> Self {
34        Self {
35            start_session,
36            next_session,
37            end_session,
38        }
39    }
40}
41
/// Module specific logic for [`ClientModuleRecoverArgs::recover_from_history`]
///
/// See [`ClientModuleRecoverArgs::recover_from_history`] for more information.
///
/// Implementors provide the module-specific pieces (state creation,
/// persistence, per-input/per-output handling, finalization), while the
/// generic driver walks federation consensus history session by session.
#[apply(async_trait_maybe_send!)]
pub trait RecoveryFromHistory: std::fmt::Debug + MaybeSend + MaybeSync + Clone {
    /// [`ClientModuleInit`] of this recovery logic.
    type Init: ClientModuleInit;

    /// New empty state to start recovery from, and session number to start from
    ///
    /// `snapshot` is an optional module backup to seed the recovery with;
    /// `None` means starting without one.
    async fn new(
        init: &Self::Init,
        args: &ClientModuleRecoverArgs<Self::Init>,
        snapshot: Option<&<<Self::Init as ClientModuleInit>::Module as ClientModule>::Backup>,
    ) -> anyhow::Result<(Self, u64)>;

    /// Try to load the existing state previously stored with
    /// [`RecoveryFromHistory::store_dbtx`].
    ///
    /// Storing and restoring progress is used to save progress and
    /// continue recovery if it was previously terminated before completion.
    ///
    /// Returns `Ok(None)` when no previous recovery state exists.
    async fn load_dbtx(
        init: &Self::Init,
        dbtx: &mut DatabaseTransaction<'_>,
        args: &ClientModuleRecoverArgs<Self::Init>,
    ) -> anyhow::Result<Option<(Self, RecoveryFromHistoryCommon)>>;

    /// Store the current recovery state in the database
    ///
    /// See [`Self::load_dbtx`].
    async fn store_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        common: &RecoveryFromHistoryCommon,
    );

    /// Delete the recovery state from the database
    ///
    /// Called during finalization, once the tracked state has been
    /// converted into final database changes.
    ///
    /// See [`Self::load_dbtx`].
    async fn delete_dbtx(&self, dbtx: &mut DatabaseTransaction<'_>);

    /// Read the finalization status
    ///
    /// Returns `None` when no status was ever stored.
    ///
    /// See [`Self::load_dbtx`].
    async fn load_finalized(dbtx: &mut DatabaseTransaction<'_>) -> Option<bool>;

    /// Store finalization status
    ///
    /// Written in the same dbtx as finalization itself so that a completed
    /// finalization can be detected on restart (see
    /// [`ClientModuleRecoverArgs::recover_from_history`]).
    ///
    /// See [`Self::load_finalized`].
    async fn store_finalized(dbtx: &mut DatabaseTransaction<'_>, state: bool);

    /// Handle session outcome, adjusting the current state
    ///
    /// It is expected that most implementations don't need to override this
    /// function, and override more granular ones instead (e.g.
    /// [`Self::handle_input`] and/or [`Self::handle_output`]).
    ///
    /// The default implementation will loop through items in the
    /// `session.items` and forward them one by one to respective functions
    /// (see [`Self::handle_transaction`]).
    ///
    /// NOTE(review): `&Vec<AcceptedItem>` would more idiomatically be
    /// `&[AcceptedItem]`, but changing it now would break existing
    /// trait implementors.
    async fn handle_session(
        &mut self,
        client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
        session_idx: u64,
        session_items: &Vec<AcceptedItem>,
    ) -> anyhow::Result<()> {
        for accepted_item in session_items {
            // Only transactions are relevant for recovery; all other
            // consensus item kinds are skipped.
            if let ConsensusItem::Transaction(ref transaction) = accepted_item.item {
                self.handle_transaction(client_ctx, transaction, session_idx)
                    .await?;
            }
        }
        Ok(())
    }

    /// Handle session outcome, adjusting the current state
    ///
    /// It is expected that most implementations don't need to override this
    /// function, and override more granular ones instead (e.g.
    /// [`Self::handle_input`] and/or [`Self::handle_output`]).
    ///
    /// The default implementation will loop through inputs and outputs
    /// of the transaction, filter and downcast ones matching current module
    /// and forward them one by one to respective functions
    /// (e.g. [`Self::handle_input`], [`Self::handle_output`]).
    async fn handle_transaction(
        &mut self,
        client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
        transaction: &Transaction,
        session_idx: u64,
    ) -> anyhow::Result<()> {
        trace!(
            target: LOG_CLIENT_RECOVERY,
            tx_hash = %transaction.tx_hash(),
            input_num = transaction.inputs.len(),
            output_num = transaction.outputs.len(),
            "processing transaction"
        );

        for (idx, input) in transaction.inputs.iter().enumerate() {
            trace!(
                target: LOG_CLIENT_RECOVERY,
                tx_hash = %transaction.tx_hash(),
                idx,
                module_id = input.module_instance_id(),
                "found transaction input"
            );

            // `input_from_dyn` returns `Some` only for inputs belonging to
            // this module instance; foreign-module inputs are ignored.
            if let Some(own_input) = client_ctx.input_from_dyn(input) {
                self.handle_input(client_ctx, idx, own_input, session_idx)
                    .await?;
            }
        }

        for (out_idx, output) in transaction.outputs.iter().enumerate() {
            trace!(
                target: LOG_CLIENT_RECOVERY,
                tx_hash = %transaction.tx_hash(),
                idx = out_idx,
                module_id = output.module_instance_id(),
                "found transaction output"
            );

            // Same filtering as for inputs: only this module's outputs.
            if let Some(own_output) = client_ctx.output_from_dyn(output) {
                let out_point = OutPoint {
                    txid: transaction.tx_hash(),
                    out_idx: out_idx as u64,
                };

                self.handle_output(client_ctx, out_point, own_output, session_idx)
                    .await?;
            }
        }

        Ok(())
    }

    /// Handle transaction input, adjusting the current state
    ///
    /// Default implementation does nothing.
    async fn handle_input(
        &mut self,
        _client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
        _idx: usize,
        _input: &<<<Self::Init as ClientModuleInit>::Module as ClientModule>::Common as ModuleCommon>::Input,
        _session_idx: u64,
    ) -> anyhow::Result<()> {
        Ok(())
    }

    /// Handle transaction output, adjusting the current state
    ///
    /// Default implementation does nothing.
    async fn handle_output(
        &mut self,
        _client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
        _out_point: OutPoint,
        _output: &<<<Self::Init as ClientModuleInit>::Module as ClientModule>::Common as ModuleCommon>::Output,
        _session_idx: u64,
    ) -> anyhow::Result<()> {
        Ok(())
    }

    /// Called before `finalize_dbtx`, to allow final state changes outside
    /// of retriable database transaction.
    ///
    /// Default implementation does nothing.
    async fn pre_finalize(&mut self) -> anyhow::Result<()> {
        Ok(())
    }

    /// Finalize the recovery converting the tracked state to final
    /// changes in the database.
    ///
    /// This is the only place during recovery where module gets a chance to
    /// create state machines, etc.
    ///
    /// Notably this function is running in a database-autocommit wrapper, so
    /// might be called again on database commit failure — it must therefore
    /// be idempotent with respect to the dbtx it is given.
    async fn finalize_dbtx(&self, dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()>;
}
220
impl<Init> ClientModuleRecoverArgs<Init>
where
    Init: ClientModuleInit,
{
    /// Run recovery of a module from federation consensus history
    ///
    /// It is expected that most modules will implement their recovery
    /// by following Federation consensus history to restore their
    /// state. This function implements such a recovery by being generic
    /// over [`RecoveryFromHistory`] trait, which provides module-specific
    /// parts of recovery logic.
    ///
    /// High-level flow:
    /// 1. bail out early if a previous run already finalized;
    /// 2. load persisted progress, or create fresh state via `Recovery::new`;
    /// 3. stream session blocks in parallel and feed them to the state,
    ///    persisting progress every few sessions;
    /// 4. finalize inside a single (auto-committed, hence retriable) dbtx.
    pub async fn recover_from_history<Recovery>(
        &self,
        init: &Init,
        snapshot: Option<&<<Init as ClientModuleInit>::Module as ClientModule>::Backup>,
    ) -> anyhow::Result<()>
    where
        Recovery: RecoveryFromHistory<Init = Init> + std::fmt::Debug,
    {
        /// Fetch epochs in a given range and send them over `sender`
        ///
        /// Since WASM's `spawn` does not support join handles, we indicate
        /// errors via `sender` itself.
        fn fetch_block_stream<'a>(
            api: DynGlobalApi,
            core_api_version: ApiVersion,
            decoders: ModuleDecoderRegistry,
            epoch_range: ops::Range<u64>,
            task_group: TaskGroup,
        ) -> impl futures::Stream<Item = Result<(u64, Vec<AcceptedItem>), ShuttingDownError>> + 'a
        {
            // How many request for blocks to run in parallel (streaming).
            // NOTE(review): name has a typo (PARALLELISM), but it's a local
            // const so renaming is purely cosmetic.
            const PARALLISM_LEVEL: usize = 64;
            const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
                ApiVersion { major: 0, minor: 1 };

            futures::stream::iter(epoch_range.clone())
                .map(move |session_idx| {
                    let api = api.clone();
                    let decoders = decoders.clone();
                    let task_group = task_group.clone();

                    Box::pin(async move {
                        // NOTE: Each block is fetched in a spawned task. This avoids a footgun
                        // of stuff in streams not making any progress when the stream itself
                        // is not being polled, and possibly can increase the fetching performance.
                        task_group.spawn_cancellable("recovery fetch block", async move {

                            info!(session_idx, "Fetching epoch");

                            // Retry with randomized, capped exponential backoff
                            // until the session is available.
                            let mut retry_sleep = Duration::from_millis(10);
                            let block = loop {
                                trace!(target: LOG_CLIENT_RECOVERY, session_idx, "Awaiting signed block");

                                // Older federations predate `get_session_status`;
                                // fall back to `await_block` for them.
                                let items_res = if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS {
                                    api.await_block(session_idx, &decoders).await.map(|s| s.items)
                                } else {
                                    api.get_session_status(session_idx, &decoders).await.map(|s| match s {
                                        SessionStatus::Initial => panic!("Federation missing session that existed when we started recovery"),
                                        SessionStatus::Pending(items) => items,
                                        SessionStatus::Complete(s) => s.items,
                                    })
                                };

                                match items_res {
                                    Ok(block) => {
                                        debug!(target: LOG_CLIENT_RECOVERY, session_idx, "Got signed session");
                                        break block
                                    },
                                    Err(e) => {
                                        const MAX_SLEEP: Duration = Duration::from_secs(120);

                                        warn!(target: LOG_CLIENT_RECOVERY, e = %e, session_idx, "Error trying to fetch signed block");
                                        // We don't want PARALLISM_LEVEL tasks hammering Federation
                                        // with requests, so max sleep is significant
                                        if retry_sleep <= MAX_SLEEP {
                                            retry_sleep = retry_sleep
                                                + thread_rng().gen_range(Duration::ZERO..=retry_sleep);
                                        }
                                        fedimint_core::runtime::sleep(cmp::min(retry_sleep, MAX_SLEEP))
                                            .await;
                                    }
                                }
                            };

                            (session_idx, block)
                        }).await.expect("Can't fail")
                    })
                })
                // Fetch up to PARALLISM_LEVEL blocks concurrently while
                // preserving session order in the output stream.
                .buffered(PARALLISM_LEVEL)
        }

        /// Make enough progress to justify saving a state snapshot
        ///
        /// Processes up to `PROGRESS_SNAPSHOT_BLOCKS` sessions from
        /// `block_stream`, advancing `common_state.next_session` as it goes.
        async fn make_progress<'a, Init, Recovery: RecoveryFromHistory<Init = Init>>(
            client_ctx: &ClientContext<<Init as ClientModuleInit>::Module>,
            common_state: &mut RecoveryFromHistoryCommon,
            state: &mut Recovery,
            block_stream: &mut (impl Stream<Item = Result<(u64, Vec<AcceptedItem>), ShuttingDownError>>
                      + Unpin),
        ) -> anyhow::Result<()>
        where
            Init: ClientModuleInit,
        {
            /// the amount of blocks after which we save progress in the
            /// database (return from this function)
            ///
            /// TODO: Instead of a fixed range of session
            /// indexes, make the loop time-based, so the amount of
            /// progress we can lose on termination is time-bound,
            /// and thus more adaptive.
            const PROGRESS_SNAPSHOT_BLOCKS: u64 = 10;

            let block_range = common_state.next_session
                ..cmp::min(
                    common_state
                        .next_session
                        .wrapping_add(PROGRESS_SNAPSHOT_BLOCKS),
                    common_state.end_session,
                );

            debug!(
                target: LOG_CLIENT_RECOVERY,
                ?block_range,
                "Processing blocks"
            );

            for _ in block_range {
                // Stream exhausted early (e.g. shutdown) — return what we have.
                let Some(res) = block_stream.next().await else {
                    break;
                };

                let (session_idx, accepted_items) = res?;

                // The buffered stream preserves order, so sessions must
                // arrive exactly in sequence.
                assert_eq!(common_state.next_session, session_idx);
                state
                    .handle_session(client_ctx, session_idx, &accepted_items)
                    .await?;

                common_state.next_session += 1;
            }

            Ok(())
        }

        let db = self.db();
        let client_ctx = self.context();

        if Recovery::load_finalized(&mut db.begin_transaction_nc().await)
            .await
            .unwrap_or_default()
        {
            // In rare circumstances, the finalization could complete, yet the completion
            // of `recover` function not yet persisted in the database. So
            // it's possible that `recovery` would be called again on an
            // already finalized state. Because of this we store a
            // finalization marker in the same dbtx as the finalization itself, detect this
            // here and exit early.
            //
            // Example sequence how this happens (if `finalize_dbtx` didn't exist):
            //
            // 0. module recovery is complete and progress saved to the db
            // 1. `dbtx` with finalization commits, progress deleted, completing recovery on
            //    the client module side
            // 2. client crashes/gets terminated (tricky corner case)
            // 3. client starts again
            // 4. client never observed/persisted that the module finished recovery, so
            //    calls module recovery again
            // 5. module doesn't see progress, starts recovery again, eventually completes
            //    again and moves to finalization
            // 6. module runs finalization again and probably fails because it's actually
            //    not idempotent and doesn't expect the already existing state.
            warn!("Previously finalized, exiting");
            return Ok(());
        }
        let current_session_count = client_ctx.global_api().session_count().await?;
        debug!(target: LOG_CLIENT_RECOVERY, session_count = current_session_count, "Current session count");

        let (mut state, mut common_state) =
            // TODO: if load fails (e.g. module didn't migrate an existing recovery state and failed to decode it),
            // we could just ... start from scratch? at least being able to force this behavior might be useful
            if let Some((state, common_state)) = Recovery::load_dbtx(init, &mut db.begin_transaction_nc().await, self).await? {
                (state, common_state)
            } else {
                let (state, start_session) = Recovery::new(init, self, snapshot).await?;

                debug!(target: LOG_CLIENT_RECOVERY, start_session, "Recovery start session");
                (state,
                RecoveryFromHistoryCommon {
                    start_session,
                    next_session: start_session,
                    // `end_session` is an exclusive bound; `+ 1` makes the
                    // session at index `current_session_count` included.
                    end_session: current_session_count + 1,
                })
            };

        let block_stream_session_range = common_state.next_session..common_state.end_session;
        debug!(target: LOG_CLIENT_RECOVERY, range = ?block_stream_session_range, "Starting block streaming");

        let mut block_stream = fetch_block_stream(
            self.api().clone(),
            *self.core_api_version(),
            client_ctx.decoders(),
            block_stream_session_range,
            self.task_group().clone(),
        );
        // NOTE(review): `client_ctx` was already bound above; this shadowing
        // re-bind looks redundant — confirm before removing.
        let client_ctx = self.context();

        // Main loop: process sessions in snapshot-sized chunks, persisting
        // progress after each chunk so recovery can resume if interrupted.
        while common_state.next_session < common_state.end_session {
            make_progress(
                &client_ctx,
                &mut common_state,
                &mut state,
                &mut block_stream,
            )
            .await?;

            let mut dbtx = db.begin_transaction().await;
            state.store_dbtx(&mut dbtx.to_ref_nc(), &common_state).await;
            dbtx.commit_tx().await;

            // Report progress; saturate at u32::MAX rather than fail on
            // (unrealistically) huge session ranges.
            self.update_recovery_progress(RecoveryProgress {
                complete: (common_state.next_session - common_state.start_session)
                    .try_into()
                    .unwrap_or(u32::MAX),
                total: (common_state.end_session - common_state.start_session)
                    .try_into()
                    .unwrap_or(u32::MAX),
            });
        }

        // Give the module a chance to do final work outside the retriable
        // finalization dbtx below.
        state.pre_finalize().await?;

        let mut dbtx = db.begin_transaction().await;
        state.store_dbtx(&mut dbtx.to_ref_nc(), &common_state).await;
        dbtx.commit_tx().await;

        debug!(
            target: LOG_CLIENT_RECOVERY,
            ?state,
            "Finalizing restore"
        );

        // Finalization runs under autocommit, so the closure may execute more
        // than once on commit failure; deleting progress, finalizing, and
        // setting the finalized marker all happen atomically in one dbtx.
        db.autocommit(
            |dbtx, _| {
                let state = state.clone();
                {
                    Box::pin(async move {
                        state.delete_dbtx(dbtx).await;
                        state.finalize_dbtx(dbtx).await?;
                        Recovery::store_finalized(dbtx, true).await;

                        Ok::<_, anyhow::Error>(())
                    })
                }
            },
            None,
        )
        .await?;

        Ok(())
    }
}