fedimint_client/sm/
executor.rs

1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Read, Write};
5use std::mem;
6use std::sync::Arc;
7use std::time::SystemTime;
8
9use anyhow::anyhow;
10use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
11use fedimint_core::db::{
12    AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
13    IDatabaseTransactionOpsCoreTyped,
14};
15use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
16use fedimint_core::fmt_utils::AbbreviateJson;
17use fedimint_core::maybe_add_send_sync;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::task::TaskGroup;
20use fedimint_core::util::BoxFuture;
21use fedimint_logging::LOG_CLIENT_REACTOR;
22use futures::future::{self, select_all};
23use futures::stream::{FuturesUnordered, StreamExt};
24use tokio::select;
25use tokio::sync::{mpsc, oneshot};
26use tracing::{debug, error, info, trace, warn, Instrument};
27
28use super::state::StateTransitionFunction;
29use crate::sm::notifier::Notifier;
30use crate::sm::state::{DynContext, DynState};
31use crate::sm::{ClientSMDatabaseTransaction, State, StateTransition};
32use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
33
34/// After how many attempts a DB transaction is aborted with an error
35const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
36
37pub type ContextGen =
38    Arc<maybe_add_send_sync!(dyn Fn(ModuleInstanceId, OperationId) -> DynGlobalClientContext)>;
39
40/// Prefixes for executor DB entries
41enum ExecutorDbPrefixes {
42    /// See [`ActiveStateKey`]
43    ActiveStates = 0xa1,
44    /// See [`InactiveStateKey`]
45    InactiveStates = 0xa2,
46}
47
48/// Executor that drives forward state machines under its management.
49///
50/// Each state transition is atomic and supposed to be idempotent such that a
51/// stop/crash of the executor at any point can be recovered from on restart.
52/// The executor is aware of the concept of Fedimint modules and can give state
53/// machines a different [execution context](super::state::Context) depending on
54/// the owning module, making it very flexible.
55#[derive(Clone, Debug)]
56pub struct Executor {
57    inner: Arc<ExecutorInner>,
58}
59
60struct ExecutorInner {
61    db: Database,
62    state: std::sync::RwLock<ExecutorState>,
63    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
64    valid_module_ids: BTreeSet<ModuleInstanceId>,
65    notifier: Notifier,
66    /// Any time executor should notice state machine update (e.g. because it
67    /// was created), it's must be sent through this channel for it to notice.
68    sm_update_tx: mpsc::UnboundedSender<DynState>,
69    client_task_group: TaskGroup,
70}
71
72enum ExecutorState {
73    Unstarted {
74        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
75    },
76    Running {
77        context_gen: ContextGen,
78        shutdown_sender: oneshot::Sender<()>,
79    },
80    Stopped,
81}
82
83impl ExecutorState {
84    /// Starts the executor, returning a receiver that will be signalled when
85    /// the executor is stopped and a receiver for state machine updates.
86    /// Returns `None` if the executor has already been started and/or stopped.
87    fn start(
88        &mut self,
89        context: ContextGen,
90    ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
91        let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
92
93        let previous_state = mem::replace(
94            self,
95            ExecutorState::Running {
96                context_gen: context,
97                shutdown_sender,
98            },
99        );
100
101        if let ExecutorState::Unstarted { sm_update_rx } = previous_state {
102            Some((shutdown_receiver, sm_update_rx))
103        } else {
104            // Replace the previous state, undoing the `mem::replace` above.
105            *self = previous_state;
106
107            debug!("Executor already started, ignoring start request");
108            None
109        }
110    }
111
112    /// Stops the executor, returning `Some(())` if the executor was running and
113    /// `None` if it was in any other state.
114    fn stop(&mut self) -> Option<()> {
115        let previous_state = mem::replace(self, ExecutorState::Stopped);
116
117        if let ExecutorState::Running {
118            shutdown_sender, ..
119        } = previous_state
120        {
121            if shutdown_sender.send(()).is_err() {
122                warn!("Failed to send shutdown signal to executor, already dead?");
123            }
124            Some(())
125        } else {
126            // Replace the previous state, undoing the `mem::replace` above.
127            *self = previous_state;
128
129            debug!("Executor not running, ignoring stop request");
130            None
131        }
132    }
133
134    fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
135        let ExecutorState::Running { context_gen, .. } = self else {
136            return None;
137        };
138        Some(context_gen(
139            state.module_instance_id(),
140            state.operation_id(),
141        ))
142    }
143}
144
145/// Builder to which module clients can be attached and used to build an
146/// [`Executor`] supporting these.
147#[derive(Debug, Default)]
148pub struct ExecutorBuilder {
149    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
150    valid_module_ids: BTreeSet<ModuleInstanceId>,
151}
152
153impl Executor {
154    /// Creates an [`ExecutorBuilder`]
155    pub fn builder() -> ExecutorBuilder {
156        ExecutorBuilder::default()
157    }
158
159    pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
160        self.inner.get_active_states().await
161    }
162
163    /// Adds a number of state machines to the executor atomically. They will be
164    /// driven to completion automatically in the background.
165    ///
166    /// **Attention**: do not use before background task is started!
167    // TODO: remove warning once finality is an inherent state attribute
168    pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
169        self.inner
170            .db
171            .autocommit(
172                |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
173                MAX_DB_ATTEMPTS,
174            )
175            .await
176            .map_err(|e| match e {
177                AutocommitError::CommitFailed {
178                    last_error,
179                    attempts,
180                } => last_error.context(format!("Failed to commit after {attempts} attempts")),
181                AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
182            })?;
183
184        // TODO: notify subscribers to state changes?
185
186        Ok(())
187    }
188
189    /// Adds a number of state machines to the executor atomically with other DB
190    /// changes is `dbtx`. See [`Executor::add_state_machines`] for more
191    /// details.
192    ///
193    /// ## Panics
194    /// If called before background task is started using
195    /// [`Executor::start_executor`]!
196    // TODO: remove warning once finality is an inherent state attribute
197    pub async fn add_state_machines_dbtx(
198        &self,
199        dbtx: &mut DatabaseTransaction<'_>,
200        states: Vec<DynState>,
201    ) -> AddStateMachinesResult {
202        for state in states {
203            if !self
204                .inner
205                .valid_module_ids
206                .contains(&state.module_instance_id())
207            {
208                return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
209            }
210
211            let is_active_state = dbtx
212                .get_value(&ActiveStateKey::from_state(state.clone()))
213                .await
214                .is_some();
215            let is_inactive_state = dbtx
216                .get_value(&InactiveStateKey::from_state(state.clone()))
217                .await
218                .is_some();
219
220            if is_active_state || is_inactive_state {
221                return Err(AddStateMachinesError::StateAlreadyExists);
222            }
223
224            // In case of recovery functions, the module itself is not yet initialized,
225            // so we can't check if the state is terminal. However the
226            // [`Self::get_transitions_for`] function will double check and
227            // deactivate any terminal states that would slip past this check.
228            if let Some(module_context) =
229                self.inner.module_contexts.get(&state.module_instance_id())
230            {
231                if let Some(context) = self
232                    .inner
233                    .state
234                    .read()
235                    .expect("locking failed")
236                    .gen_context(&state)
237                {
238                    if state.is_terminal(module_context, &context) {
239                        return Err(AddStateMachinesError::Other(anyhow!(
240                        "State is already terminal, adding it to the executor doesn't make sense."
241                    )));
242                    }
243                } else {
244                    warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
245                }
246            }
247
248            dbtx.insert_new_entry(
249                &ActiveStateKey::from_state(state.clone()),
250                &ActiveStateMeta::default(),
251            )
252            .await;
253
254            let notify_sender = self.inner.notifier.sender();
255            let sm_updates_tx = self.inner.sm_update_tx.clone();
256            dbtx.on_commit(move || {
257                notify_sender.notify(state.clone());
258                let _ = sm_updates_tx.send(state);
259            });
260        }
261
262        Ok(())
263    }
264
265    /// **Mostly used for testing**
266    ///
267    /// Check if state exists in the database as part of an actively running
268    /// state machine.
269    pub async fn contains_active_state<S: State>(
270        &self,
271        instance: ModuleInstanceId,
272        state: S,
273    ) -> bool {
274        let state = DynState::from_typed(instance, state);
275        self.inner
276            .get_active_states()
277            .await
278            .into_iter()
279            .any(|(s, _)| s == state)
280    }
281
282    // TODO: unify querying fns
283    /// **Mostly used for testing**
284    ///
285    /// Check if state exists in the database as inactive. If the state is
286    /// terminal it means the corresponding state machine finished its
287    /// execution. If the state is non-terminal it means the state machine was
288    /// in that state at some point but moved on since then.
289    pub async fn contains_inactive_state<S: State>(
290        &self,
291        instance: ModuleInstanceId,
292        state: S,
293    ) -> bool {
294        let state = DynState::from_typed(instance, state);
295        self.inner
296            .get_inactive_states()
297            .await
298            .into_iter()
299            .any(|(s, _)| s == state)
300    }
301
302    pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
303        self.inner
304            .db
305            .wait_key_exists(&InactiveStateKey::from_state(state))
306            .await
307    }
308
309    pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
310        self.inner
311            .db
312            .wait_key_exists(&ActiveStateKey::from_state(state))
313            .await
314    }
315
316    /// Only meant for debug tooling
317    pub async fn get_operation_states(
318        &self,
319        operation_id: OperationId,
320    ) -> (
321        Vec<(DynState, ActiveStateMeta)>,
322        Vec<(DynState, InactiveStateMeta)>,
323    ) {
324        let mut dbtx = self.inner.db.begin_transaction_nc().await;
325        let active_states: Vec<_> = dbtx
326            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
327            .await
328            .map(|(active_key, active_meta)| (active_key.state, active_meta))
329            .collect()
330            .await;
331        let inactive_states: Vec<_> = dbtx
332            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
333            .await
334            .map(|(active_key, inactive_meta)| (active_key.state, inactive_meta))
335            .collect()
336            .await;
337
338        (active_states, inactive_states)
339    }
340
341    /// Starts the background thread that runs the state machines. This cannot
342    /// be done when building the executor since some global contexts in turn
343    /// may depend on the executor, forming a cyclic dependency.
344    ///
345    /// ## Panics
346    /// If called more than once.
347    pub fn start_executor(&self, context_gen: ContextGen) {
348        let Some((shutdown_receiver, sm_update_rx)) = self
349            .inner
350            .state
351            .write()
352            .expect("locking can't fail")
353            .start(context_gen.clone())
354        else {
355            panic!("start_executor was called previously");
356        };
357
358        let task_runner_inner = self.inner.clone();
359        let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
360            let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
361            let task_group_shutdown_rx = task_handle.make_shutdown_rx();
362            select! {
363                () = task_group_shutdown_rx => {
364                    debug!("Shutting down state machine executor runner due to task group shutdown signal");
365                },
366                shutdown_happened_sender = shutdown_receiver => {
367                    match shutdown_happened_sender {
368                        Ok(()) => {
369                            debug!("Shutting down state machine executor runner due to explicit shutdown signal");
370                        },
371                        Err(_) => {
372                            warn!("Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)");
373                        }
374                    }
375                },
376                () = executor_runner => {
377                    error!("State machine executor runner exited unexpectedly!");
378                },
379            };
380        });
381    }
382
383    /// Stops the background task that runs the state machines.
384    ///
385    /// If a shutdown signal was sent it returns a [`oneshot::Receiver`] that
386    /// will be signalled when the main loop of the background task has
387    /// exited. This can be useful to block until the executor has stopped
388    /// to avoid errors due to the async runtime shutting down while the
389    /// task is still running.
390    ///
391    /// If no shutdown signal was sent it returns `None`. This can happen if
392    /// `stop_executor` is called multiple times.
393    ///
394    /// ## Panics
395    /// If called in parallel with [`start_executor`](Self::start_executor).
396    pub fn stop_executor(&self) -> Option<()> {
397        self.inner.stop_executor()
398    }
399
400    /// Returns a reference to the [`Notifier`] that can be used to subscribe to
401    /// state transitions
402    pub fn notifier(&self) -> &Notifier {
403        &self.inner.notifier
404    }
405}
406
407impl Drop for ExecutorInner {
408    fn drop(&mut self) {
409        self.stop_executor();
410    }
411}
412
413struct TransitionForActiveState {
414    outcome: serde_json::Value,
415    state: DynState,
416    meta: ActiveStateMeta,
417    transition_fn: StateTransitionFunction<DynState>,
418}
419
420impl ExecutorInner {
421    async fn run(
422        &self,
423        global_context_gen: ContextGen,
424        sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
425    ) {
426        debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
427        if let Err(err) = self
428            .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
429            .await
430        {
431            warn!(
432                %err,
433                "An unexpected error occurred during a state transition"
434            );
435        }
436    }
437
438    async fn get_transition_for(
439        &self,
440        state: &DynState,
441        meta: ActiveStateMeta,
442        global_context_gen: &ContextGen,
443    ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
444        let module_instance = state.module_instance_id();
445        let context = &self
446            .module_contexts
447            .get(&module_instance)
448            .expect("Unknown module");
449        let transitions = state
450            .transitions(
451                context,
452                &global_context_gen(module_instance, state.operation_id()),
453            )
454            .into_iter()
455            .map(|transition| {
456                let state = state.clone();
457                let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
458                    let StateTransition {
459                        trigger,
460                        transition,
461                    } = transition;
462                    TransitionForActiveState {
463                        outcome: trigger.await,
464                        state,
465                        transition_fn: transition,
466                        meta,
467                    }
468                });
469                f
470            })
471            .collect::<Vec<_>>();
472        if transitions.is_empty() {
473            // In certain cases a terminal (no transitions) state could get here due to
474            // module bug. Inactivate it to prevent accumulation of such states.
475            // See [`Self::add_state_machines_dbtx`].
476            warn!(module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream.");
477            self.db
478                .autocommit::<_, _, anyhow::Error>(
479                    |dbtx, _| {
480                        Box::pin(async {
481                            let k = InactiveStateKey::from_state(state.clone());
482                            let v = ActiveStateMeta::default().into_inactive();
483                            dbtx.remove_entry(&ActiveStateKey::from_state(state.clone()))
484                                .await;
485                            dbtx.insert_entry(&k, &v).await;
486                            Ok(())
487                        })
488                    },
489                    None,
490                )
491                .await
492                .expect("Autocommit here can't fail");
493        }
494
495        transitions
496    }
497
498    async fn run_state_machines_executor_inner(
499        &self,
500        global_context_gen: ContextGen,
501        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
502    ) -> anyhow::Result<()> {
503        /// All futures in the executor resolve to this type, so the handling
504        /// code can tell them apart.
505        enum ExecutorLoopEvent {
506            /// Notification about `DynState` arrived and should be handled,
507            /// usually added to the list of pending futures.
508            New { state: DynState },
509            /// One of trigger futures of a state machine finished and
510            /// returned transition function to run
511            Triggered(TransitionForActiveState),
512            /// The state machine did not need to run, so it was canceled
513            Invalid { state: DynState },
514            /// Transition function and all the accounting around it are done
515            Completed {
516                state: DynState,
517                outcome: ActiveOrInactiveState,
518            },
519            /// New job receiver disconnected, that can only mean termination
520            Disconnected,
521        }
522
523        let active_states = self.get_active_states().await;
524        trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
525        for (state, _meta) in active_states {
526            self.sm_update_tx
527                .send(state)
528                .expect("Must be able to send state machine to own opened channel");
529        }
530
531        // Keeps track of things already running, so we can deduplicate, just
532        // in case.
533        let mut currently_running_sms = HashSet::<DynState>::new();
534        // All things happening in parallel go into here
535        // NOTE: `FuturesUnordered` is a footgun: when it's not being polled
536        // (e.g. we picked an event and are awaiting on something to process it),
537        // nothing inside `futures` will be making progress, which in extreme cases
538        // could lead to hangs. For this reason we try really hard in the code here,
539        // to pick an event from `futures` and spawn a new task, avoiding any `await`,
540        // just so we can get back to `futures.next()` ASAP.
541        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
542            FuturesUnordered::new();
543
544        loop {
545            let event = tokio::select! {
546                new = sm_update_rx.recv() => {
547                    if let Some(new) = new {
548                        ExecutorLoopEvent::New {
549                            state: new,
550                        }
551                    } else {
552                        ExecutorLoopEvent::Disconnected
553                    }
554                },
555
556                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
557            };
558
559            // main reactor loop: wait for next thing that completed, react (possibly adding
560            // more things to `futures`)
561            match event {
562                ExecutorLoopEvent::New { state } => {
563                    if currently_running_sms.contains(&state) {
564                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
565                        continue;
566                    }
567                    currently_running_sms.insert(state.clone());
568                    let futures_len = futures.len();
569                    let global_context_gen = &global_context_gen;
570                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
571                    futures.push(Box::pin(async move {
572                        let Some(meta) = self.get_active_state(&state).await else {
573                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
574                            return ExecutorLoopEvent::Invalid { state: state.clone() };
575                        };
576
577                        let transitions = self
578                            .get_transition_for(&state, meta, global_context_gen)
579                            .await;
580                        if transitions.is_empty() {
581                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
582                            return ExecutorLoopEvent::Invalid { state: state.clone() };
583                        }
584                        let transitions_num = transitions.len();
585
586                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");
587
588                        let (first_completed_result, _index, _unused_transitions) =
589                            select_all(transitions).await;
590                        ExecutorLoopEvent::Triggered(first_completed_result)
591                    }));
592                }
593                ExecutorLoopEvent::Triggered(TransitionForActiveState {
594                    outcome,
595                    state,
596                    meta,
597                    transition_fn,
598                }) => {
599                    debug!(
600                        target: LOG_CLIENT_REACTOR,
601                        operation_id = %state.operation_id().fmt_short(),
602                        "Triggered state transition",
603                    );
604                    let span = tracing::debug_span!(
605                        target: LOG_CLIENT_REACTOR,
606                        "sm_transition",
607                        operation_id = %state.operation_id().fmt_short()
608                    );
609                    // Perform the transition as another future, so transitions can happen in
610                    // parallel.
611                    // Database write conflicts might be happening quite often here,
612                    // but transaction functions are supposed to be idempotent anyway,
613                    // so it seems like a good stress-test in the worst case.
614                    futures.push({
615                        let sm_update_tx = self.sm_update_tx.clone();
616                        let db = self.db.clone();
617                        let notifier = self.notifier.clone();
618                        let module_contexts = self.module_contexts.clone();
619                        let global_context_gen = global_context_gen.clone();
620                        Box::pin(
621                            async move {
622                                debug!(
623                                    target: LOG_CLIENT_REACTOR,
624                                    "Executing state transition",
625                                );
626                                trace!(
627                                    target: LOG_CLIENT_REACTOR,
628                                    ?state,
629                                    outcome = ?AbbreviateJson(&outcome),
630                                    "Executing state transition (details)",
631                                );
632
633                                let module_contexts = &module_contexts;
634                                let global_context_gen = &global_context_gen;
635
636                                let outcome = db
637                                    .autocommit::<'_, '_, _, _, Infallible>(
638                                        |dbtx, _| {
639                                            let state = state.clone();
640                                            let transition_fn = transition_fn.clone();
641                                            let transition_outcome = outcome.clone();
642                                            Box::pin(async move {
643                                                let new_state = transition_fn(
644                                                    &mut ClientSMDatabaseTransaction::new(
645                                                        &mut dbtx.to_ref(),
646                                                        state.module_instance_id(),
647                                                    ),
648                                                    transition_outcome.clone(),
649                                                    state.clone(),
650                                                )
651                                                .await;
652                                                dbtx.remove_entry(&ActiveStateKey::from_state(
653                                                    state.clone(),
654                                                ))
655                                                .await;
656                                                dbtx.insert_entry(
657                                                    &InactiveStateKey::from_state(state.clone()),
658                                                    &meta.into_inactive(),
659                                                )
660                                                .await;
661
662                                                let context = &module_contexts
663                                                    .get(&state.module_instance_id())
664                                                    .expect("Unknown module");
665
666                                                let operation_id = state.operation_id();
667                                                let global_context = global_context_gen(
668                                                    state.module_instance_id(),
669                                                    operation_id,
670                                                );
671
672                                                let is_terminal = new_state.is_terminal(context, &global_context);
673
674                                                if is_terminal {
675                                                    let k = InactiveStateKey::from_state(
676                                                        new_state.clone(),
677                                                    );
678                                                    let v = ActiveStateMeta::default().into_inactive();
679                                                    dbtx.insert_entry(&k, &v).await;
680                                                    Ok(ActiveOrInactiveState::Inactive {
681                                                        dyn_state: new_state,
682                                                    })
683                                                } else {
684                                                    let k = ActiveStateKey::from_state(
685                                                        new_state.clone(),
686                                                    );
687                                                    let v = ActiveStateMeta::default();
688                                                    dbtx.insert_entry(&k, &v).await;
689                                                    Ok(ActiveOrInactiveState::Active {
690                                                        dyn_state: new_state,
691                                                        meta: v,
692                                                    })
693                                                }
694                                            })
695                                        },
696                                        None,
697                                    )
698                                    .await
699                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");
700
701                                debug!(
702                                    target: LOG_CLIENT_REACTOR,
703                                    terminal = !outcome.is_active(),
704                                    ?outcome,
705                                    "State transition complete",
706                                );
707
708                                match &outcome {
709                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
710                                        sm_update_tx
711                                            .send(dyn_state.clone())
712                                            .expect("can't fail: we are the receiving end");
713                                        notifier.notify(dyn_state.clone());
714                                    }
715                                    ActiveOrInactiveState::Inactive { dyn_state } => {
716                                        notifier.notify(dyn_state.clone());
717                                    }
718                                }
719                                ExecutorLoopEvent::Completed { state, outcome }
720                            }
721                            .instrument(span),
722                        )
723                    });
724                }
725                ExecutorLoopEvent::Invalid { state } => {
726                    trace!(
727                        target: LOG_CLIENT_REACTOR,
728                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
729                        "State invalid"
730                    );
731                    assert!(
732                        currently_running_sms.remove(&state),
733                        "State must have been recorded"
734                    );
735                }
736
737                ExecutorLoopEvent::Completed { state, outcome } => {
738                    assert!(
739                        currently_running_sms.remove(&state),
740                        "State must have been recorded"
741                    );
742                    debug!(
743                        target: LOG_CLIENT_REACTOR,
744                        operation_id = %state.operation_id().fmt_short(),
745                        outcome_active = outcome.is_active(),
746                        total = futures.len(),
747                        "State transition complete"
748                    );
749                    trace!(
750                        target: LOG_CLIENT_REACTOR,
751                        ?outcome,
752                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
753                        "State transition complete"
754                    );
755                }
756                ExecutorLoopEvent::Disconnected => {
757                    break;
758                }
759            }
760        }
761
762        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
763        Ok(())
764    }
765
766    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
767        self.db
768            .begin_transaction_nc()
769            .await
770            .find_by_prefix(&ActiveStateKeyPrefix)
771            .await
772            // ignore states from modules that are not initialized yet
773            .filter(|(state, _)| {
774                future::ready(
775                    self.module_contexts
776                        .contains_key(&state.state.module_instance_id()),
777                )
778            })
779            .map(|(state, meta)| (state.state, meta))
780            .collect::<Vec<_>>()
781            .await
782    }
783
784    async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
785        // ignore states from modules that are not initialized yet
786        if !self
787            .module_contexts
788            .contains_key(&state.module_instance_id())
789        {
790            return None;
791        }
792        self.db
793            .begin_transaction_nc()
794            .await
795            .get_value(&ActiveStateKey::from_state(state.clone()))
796            .await
797    }
798
799    async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
800        self.db
801            .begin_transaction_nc()
802            .await
803            .find_by_prefix(&InactiveStateKeyPrefix)
804            .await
805            // ignore states from modules that are not initialized yet
806            .filter(|(state, _)| {
807                future::ready(
808                    self.module_contexts
809                        .contains_key(&state.state.module_instance_id()),
810                )
811            })
812            .map(|(state, meta)| (state.state, meta))
813            .collect::<Vec<_>>()
814            .await
815    }
816}
817
818impl ExecutorInner {
819    /// See [`Executor::stop_executor`].
820    fn stop_executor(&self) -> Option<()> {
821        let mut state = self.state.write().expect("Locking can't fail");
822
823        state.stop()
824    }
825}
826
827impl Debug for ExecutorInner {
828    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
829        writeln!(f, "ExecutorInner {{}}")
830    }
831}
832
833impl ExecutorBuilder {
834    /// Allow executor being built to run state machines associated with the
835    /// supplied module
836    pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
837    where
838        C: IntoDynInstance<DynType = DynContext>,
839    {
840        self.with_module_dyn(context.into_dyn(instance_id));
841    }
842
843    /// Allow executor being built to run state machines associated with the
844    /// supplied module
845    pub fn with_module_dyn(&mut self, context: DynContext) {
846        self.valid_module_ids.insert(context.module_instance_id());
847
848        if self
849            .module_contexts
850            .insert(context.module_instance_id(), context)
851            .is_some()
852        {
853            panic!("Tried to add two modules with the same instance id!");
854        }
855    }
856
857    /// Allow executor to build state machines associated with the module id,
858    /// for which the module itself might not be available yet (otherwise it
859    /// would be registered with `[Self::with_module_dyn]`).
860    pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
861        self.valid_module_ids.insert(module_id);
862    }
863
864    /// Build [`Executor`] and spawn background task in `tasks` executing active
865    /// state machines. The supplied database `db` must support isolation, so
866    /// cannot be an isolated DB instance itself.
867    pub fn build(self, db: Database, notifier: Notifier, client_task_group: TaskGroup) -> Executor {
868        let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
869
870        let inner = Arc::new(ExecutorInner {
871            db,
872            state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
873            module_contexts: self.module_contexts,
874            valid_module_ids: self.valid_module_ids,
875            notifier,
876            sm_update_tx,
877            client_task_group,
878        });
879
880        debug!(
881            instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
882            "Initialized state machine executor with module instances"
883        );
884        Executor { inner }
885    }
886}
887
888/// A state that is able to make progress eventually
889#[derive(Debug)]
890pub struct ActiveStateKey {
891    // TODO: remove redundant operation id from state trait
892    pub operation_id: OperationId,
893    // TODO: state being a key... seems ... risky?
894    pub state: DynState,
895}
896
897impl ActiveStateKey {
898    pub fn from_state(state: DynState) -> ActiveStateKey {
899        ActiveStateKey {
900            operation_id: state.operation_id(),
901            state,
902        }
903    }
904}
905
906impl Encodable for ActiveStateKey {
907    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
908        let mut len = 0;
909        len += self.operation_id.consensus_encode(writer)?;
910        len += self.state.consensus_encode(writer)?;
911        Ok(len)
912    }
913}
914
915impl Decodable for ActiveStateKey {
916    fn consensus_decode<R: Read>(
917        reader: &mut R,
918        modules: &ModuleDecoderRegistry,
919    ) -> Result<Self, DecodeError> {
920        let operation_id = OperationId::consensus_decode(reader, modules)?;
921        let state = DynState::consensus_decode(reader, modules)?;
922
923        Ok(ActiveStateKey {
924            operation_id,
925            state,
926        })
927    }
928}
929
930#[derive(Debug)]
931pub struct ActiveStateKeyBytes {
932    pub operation_id: OperationId,
933    pub module_instance_id: ModuleInstanceId,
934    pub state: Vec<u8>,
935}
936
937impl Encodable for ActiveStateKeyBytes {
938    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
939        let mut len = 0;
940        len += self.operation_id.consensus_encode(writer)?;
941        len += writer.write(self.state.as_slice())?;
942        Ok(len)
943    }
944}
945
946impl Decodable for ActiveStateKeyBytes {
947    fn consensus_decode<R: std::io::Read>(
948        reader: &mut R,
949        modules: &ModuleDecoderRegistry,
950    ) -> Result<Self, DecodeError> {
951        let operation_id = OperationId::consensus_decode(reader, modules)?;
952        let module_instance_id = ModuleInstanceId::consensus_decode(reader, modules)?;
953        let mut bytes = Vec::new();
954        reader
955            .read_to_end(&mut bytes)
956            .map_err(DecodeError::from_err)?;
957
958        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
959        instance_bytes.append(&mut bytes);
960
961        Ok(ActiveStateKeyBytes {
962            operation_id,
963            module_instance_id,
964            state: instance_bytes,
965        })
966    }
967}
968
969#[derive(Debug)]
970pub(crate) struct ActiveOperationStateKeyPrefix {
971    pub operation_id: OperationId,
972}
973
974impl Encodable for ActiveOperationStateKeyPrefix {
975    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
976        self.operation_id.consensus_encode(writer)
977    }
978}
979
980impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
981    type Record = ActiveStateKey;
982}
983
984#[derive(Debug)]
985pub(crate) struct ActiveModuleOperationStateKeyPrefix {
986    pub operation_id: OperationId,
987    pub module_instance: ModuleInstanceId,
988}
989
990impl Encodable for ActiveModuleOperationStateKeyPrefix {
991    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
992        let mut len = 0;
993        len += self.operation_id.consensus_encode(writer)?;
994        len += self.module_instance.consensus_encode(writer)?;
995        Ok(len)
996    }
997}
998
999impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
1000    type Record = ActiveStateKey;
1001}
1002
1003#[derive(Debug)]
1004pub struct ActiveStateKeyPrefix;
1005
1006impl Encodable for ActiveStateKeyPrefix {
1007    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
1008        Ok(0)
1009    }
1010}
1011
1012#[derive(Debug, Copy, Clone, Encodable, Decodable)]
1013pub struct ActiveStateMeta {
1014    pub created_at: SystemTime,
1015}
1016
1017impl ::fedimint_core::db::DatabaseRecord for ActiveStateKey {
1018    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
1019    const NOTIFY_ON_MODIFY: bool = true;
1020    type Key = Self;
1021    type Value = ActiveStateMeta;
1022}
1023
1024impl DatabaseKeyWithNotify for ActiveStateKey {}
1025
1026impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
1027    type Record = ActiveStateKey;
1028}
1029
1030#[derive(Debug, Encodable, Decodable)]
1031pub(crate) struct ActiveStateKeyPrefixBytes;
1032
1033impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
1034    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
1035    const NOTIFY_ON_MODIFY: bool = false;
1036    type Key = Self;
1037    type Value = ActiveStateMeta;
1038}
1039
1040impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
1041    type Record = ActiveStateKeyBytes;
1042}
1043
1044impl Default for ActiveStateMeta {
1045    fn default() -> Self {
1046        Self {
1047            created_at: fedimint_core::time::now(),
1048        }
1049    }
1050}
1051
1052impl ActiveStateMeta {
1053    fn into_inactive(self) -> InactiveStateMeta {
1054        InactiveStateMeta {
1055            created_at: self.created_at,
1056            exited_at: fedimint_core::time::now(),
1057        }
1058    }
1059}
1060
1061/// A past or final state of a state machine
1062#[derive(Debug, Clone)]
1063pub struct InactiveStateKey {
1064    // TODO: remove redundant operation id from state trait
1065    pub operation_id: OperationId,
1066    pub state: DynState,
1067}
1068
1069impl InactiveStateKey {
1070    pub fn from_state(state: DynState) -> InactiveStateKey {
1071        InactiveStateKey {
1072            operation_id: state.operation_id(),
1073            state,
1074        }
1075    }
1076}
1077
1078impl Encodable for InactiveStateKey {
1079    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1080        let mut len = 0;
1081        len += self.operation_id.consensus_encode(writer)?;
1082        len += self.state.consensus_encode(writer)?;
1083        Ok(len)
1084    }
1085}
1086
1087impl Decodable for InactiveStateKey {
1088    fn consensus_decode<R: Read>(
1089        reader: &mut R,
1090        modules: &ModuleDecoderRegistry,
1091    ) -> Result<Self, DecodeError> {
1092        let operation_id = OperationId::consensus_decode(reader, modules)?;
1093        let state = DynState::consensus_decode(reader, modules)?;
1094
1095        Ok(InactiveStateKey {
1096            operation_id,
1097            state,
1098        })
1099    }
1100}
1101
1102#[derive(Debug)]
1103pub struct InactiveStateKeyBytes {
1104    pub operation_id: OperationId,
1105    pub module_instance_id: ModuleInstanceId,
1106    pub state: Vec<u8>,
1107}
1108
1109impl Encodable for InactiveStateKeyBytes {
1110    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
1111        let mut len = 0;
1112        len += self.operation_id.consensus_encode(writer)?;
1113        len += writer.write(self.state.as_slice())?;
1114        Ok(len)
1115    }
1116}
1117
1118impl Decodable for InactiveStateKeyBytes {
1119    fn consensus_decode<R: std::io::Read>(
1120        reader: &mut R,
1121        modules: &ModuleDecoderRegistry,
1122    ) -> Result<Self, DecodeError> {
1123        let operation_id = OperationId::consensus_decode(reader, modules)?;
1124        let module_instance_id = ModuleInstanceId::consensus_decode(reader, modules)?;
1125        let mut bytes = Vec::new();
1126        reader
1127            .read_to_end(&mut bytes)
1128            .map_err(DecodeError::from_err)?;
1129
1130        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1131        instance_bytes.append(&mut bytes);
1132
1133        Ok(InactiveStateKeyBytes {
1134            operation_id,
1135            module_instance_id,
1136            state: instance_bytes,
1137        })
1138    }
1139}
1140
1141#[derive(Debug)]
1142pub(crate) struct InactiveOperationStateKeyPrefix {
1143    pub operation_id: OperationId,
1144}
1145
1146impl Encodable for InactiveOperationStateKeyPrefix {
1147    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1148        self.operation_id.consensus_encode(writer)
1149    }
1150}
1151
1152impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
1153    type Record = InactiveStateKey;
1154}
1155
1156#[derive(Debug)]
1157pub(crate) struct InactiveModuleOperationStateKeyPrefix {
1158    pub operation_id: OperationId,
1159    pub module_instance: ModuleInstanceId,
1160}
1161
1162impl Encodable for InactiveModuleOperationStateKeyPrefix {
1163    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1164        let mut len = 0;
1165        len += self.operation_id.consensus_encode(writer)?;
1166        len += self.module_instance.consensus_encode(writer)?;
1167        Ok(len)
1168    }
1169}
1170
1171impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
1172    type Record = InactiveStateKey;
1173}
1174
1175#[derive(Debug, Clone)]
1176pub struct InactiveStateKeyPrefix;
1177
1178impl Encodable for InactiveStateKeyPrefix {
1179    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
1180        Ok(0)
1181    }
1182}
1183
1184#[derive(Debug, Encodable, Decodable)]
1185pub(crate) struct InactiveStateKeyPrefixBytes;
1186
1187impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
1188    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1189    const NOTIFY_ON_MODIFY: bool = false;
1190    type Key = Self;
1191    type Value = InactiveStateMeta;
1192}
1193
1194impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
1195    type Record = InactiveStateKeyBytes;
1196}
1197
1198#[derive(Debug, Copy, Clone, Decodable, Encodable)]
1199pub struct InactiveStateMeta {
1200    pub created_at: SystemTime,
1201    pub exited_at: SystemTime,
1202}
1203
1204impl ::fedimint_core::db::DatabaseRecord for InactiveStateKey {
1205    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1206    const NOTIFY_ON_MODIFY: bool = true;
1207    type Key = Self;
1208    type Value = InactiveStateMeta;
1209}
1210
1211impl DatabaseKeyWithNotify for InactiveStateKey {}
1212
1213impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
1214    type Record = InactiveStateKey;
1215}
1216
1217#[derive(Debug)]
1218enum ActiveOrInactiveState {
1219    Active {
1220        dyn_state: DynState,
1221        #[allow(dead_code)] // currently not printed anywhere, but useful in the db
1222        meta: ActiveStateMeta,
1223    },
1224    Inactive {
1225        dyn_state: DynState,
1226    },
1227}
1228
1229impl ActiveOrInactiveState {
1230    fn is_active(&self) -> bool {
1231        match self {
1232            ActiveOrInactiveState::Active { .. } => true,
1233            ActiveOrInactiveState::Inactive { .. } => false,
1234        }
1235    }
1236}
1237
1238#[cfg(test)]
1239mod tests {
1240    use std::fmt::Debug;
1241    use std::sync::Arc;
1242    use std::time::Duration;
1243
1244    use fedimint_core::core::{
1245        Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId,
1246    };
1247    use fedimint_core::db::mem_impl::MemDatabase;
1248    use fedimint_core::db::Database;
1249    use fedimint_core::encoding::{Decodable, Encodable};
1250    use fedimint_core::module::registry::ModuleDecoderRegistry;
1251    use fedimint_core::runtime;
1252    use fedimint_core::task::TaskGroup;
1253    use tokio::sync::broadcast::Sender;
1254    use tracing::{info, trace};
1255
1256    use crate::sm::state::{Context, DynContext, DynState};
1257    use crate::sm::{Executor, Notifier, State, StateTransition};
1258    use crate::DynGlobalClientContext;
1259
1260    #[derive(Debug, Clone, Eq, PartialEq, Decodable, Encodable, Hash)]
1261    enum MockStateMachine {
1262        Start,
1263        ReceivedNonNull(u64),
1264        Final,
1265    }
1266
1267    impl State for MockStateMachine {
1268        type ModuleContext = MockContext;
1269
1270        fn transitions(
1271            &self,
1272            context: &Self::ModuleContext,
1273            _global_context: &DynGlobalClientContext,
1274        ) -> Vec<StateTransition<Self>> {
1275            match self {
1276                MockStateMachine::Start => {
1277                    let mut receiver1 = context.broadcast.subscribe();
1278                    let mut receiver2 = context.broadcast.subscribe();
1279                    vec![
1280                        StateTransition::new(
1281                            async move {
1282                                loop {
1283                                    let val = receiver1.recv().await.unwrap();
1284                                    if val == 0 {
1285                                        trace!("State transition Start->Final");
1286                                        break;
1287                                    }
1288                                }
1289                            },
1290                            |_dbtx, (), _state| Box::pin(async { MockStateMachine::Final }),
1291                        ),
1292                        StateTransition::new(
1293                            async move {
1294                                loop {
1295                                    let val = receiver2.recv().await.unwrap();
1296                                    if val != 0 {
1297                                        trace!("State transition Start->ReceivedNonNull");
1298                                        break val;
1299                                    }
1300                                }
1301                            },
1302                            |_dbtx, value, _state| {
1303                                Box::pin(async move { MockStateMachine::ReceivedNonNull(value) })
1304                            },
1305                        ),
1306                    ]
1307                }
1308                MockStateMachine::ReceivedNonNull(prev_val) => {
1309                    let prev_val = *prev_val;
1310                    let mut receiver = context.broadcast.subscribe();
1311                    vec![StateTransition::new(
1312                        async move {
1313                            loop {
1314                                let val = receiver.recv().await.unwrap();
1315                                if val == prev_val {
1316                                    trace!("State transition ReceivedNonNull->Final");
1317                                    break;
1318                                }
1319                            }
1320                        },
1321                        |_dbtx, (), _state| Box::pin(async { MockStateMachine::Final }),
1322                    )]
1323                }
1324                MockStateMachine::Final => {
1325                    vec![]
1326                }
1327            }
1328        }
1329
1330        fn operation_id(&self) -> OperationId {
1331            OperationId([0u8; 32])
1332        }
1333    }
1334
1335    impl IntoDynInstance for MockStateMachine {
1336        type DynType = DynState;
1337
1338        fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1339            DynState::from_typed(instance_id, self)
1340        }
1341    }
1342
1343    #[derive(Debug, Clone)]
1344    struct MockContext {
1345        broadcast: tokio::sync::broadcast::Sender<u64>,
1346    }
1347
1348    impl IntoDynInstance for MockContext {
1349        type DynType = DynContext;
1350
1351        fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1352            DynContext::from_typed(instance_id, self)
1353        }
1354    }
1355
1356    impl Context for MockContext {
1357        const KIND: Option<ModuleKind> = None;
1358    }
1359
1360    fn get_executor() -> (Executor, Sender<u64>, Database) {
1361        let (broadcast, _) = tokio::sync::broadcast::channel(10);
1362
1363        let mut decoder_builder = Decoder::builder();
1364        decoder_builder.with_decodable_type::<MockStateMachine>();
1365        let decoder = decoder_builder.build();
1366
1367        let decoders =
1368            ModuleDecoderRegistry::new(vec![(42, ModuleKind::from_static_str("test"), decoder)]);
1369        let db = Database::new(MemDatabase::new(), decoders);
1370
1371        let mut executor_builder = Executor::builder();
1372        executor_builder.with_module(
1373            42,
1374            MockContext {
1375                broadcast: broadcast.clone(),
1376            },
1377        );
1378        let executor =
1379            executor_builder.build(db.clone(), Notifier::new(db.clone()), TaskGroup::new());
1380        executor.start_executor(Arc::new(|_, _| DynGlobalClientContext::new_fake()));
1381
1382        info!("Initialized test executor");
1383        (executor, broadcast, db)
1384    }
1385
1386    #[tokio::test]
1387    #[tracing_test::traced_test]
1388    async fn test_executor() {
1389        const MOCK_INSTANCE_1: ModuleInstanceId = 42;
1390        const MOCK_INSTANCE_2: ModuleInstanceId = 21;
1391
1392        let (executor, sender, _db) = get_executor();
1393        executor
1394            .add_state_machines(vec![DynState::from_typed(
1395                MOCK_INSTANCE_1,
1396                MockStateMachine::Start,
1397            )])
1398            .await
1399            .unwrap();
1400
1401        assert!(
1402            executor
1403                .add_state_machines(vec![DynState::from_typed(
1404                    MOCK_INSTANCE_1,
1405                    MockStateMachine::Start
1406                )])
1407                .await
1408                .is_err(),
1409            "Running the same state machine a second time should fail"
1410        );
1411
1412        assert!(
1413            executor
1414                .contains_active_state(MOCK_INSTANCE_1, MockStateMachine::Start)
1415                .await,
1416            "State was written to DB and waits for broadcast"
1417        );
1418        assert!(
1419            !executor
1420                .contains_active_state(MOCK_INSTANCE_2, MockStateMachine::Start)
1421                .await,
1422            "Instance separation works"
1423        );
1424
1425        // TODO build await fn+timeout or allow manual driving of executor
1426        runtime::sleep(Duration::from_secs(1)).await;
1427        sender.send(0).unwrap();
1428        runtime::sleep(Duration::from_secs(2)).await;
1429
1430        assert!(
1431            executor
1432                .contains_inactive_state(MOCK_INSTANCE_1, MockStateMachine::Final)
1433                .await,
1434            "State was written to DB and waits for broadcast"
1435        );
1436    }
1437}