fedimint_client/sm/
executor.rs

1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Read, Write};
5use std::mem;
6use std::sync::Arc;
7use std::time::SystemTime;
8
9use anyhow::anyhow;
10use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
11use fedimint_core::db::{
12    AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
13    IDatabaseTransactionOpsCoreTyped,
14};
15use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
16use fedimint_core::fmt_utils::AbbreviateJson;
17use fedimint_core::maybe_add_send_sync;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::task::TaskGroup;
20use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
21use fedimint_logging::LOG_CLIENT_REACTOR;
22use futures::future::{self, select_all};
23use futures::stream::{FuturesUnordered, StreamExt};
24use tokio::select;
25use tokio::sync::{mpsc, oneshot};
26use tracing::{debug, error, info, trace, warn, Instrument};
27
28use super::state::StateTransitionFunction;
29use crate::sm::notifier::Notifier;
30use crate::sm::state::{DynContext, DynState};
31use crate::sm::{ClientSMDatabaseTransaction, State, StateTransition};
32use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
33
/// After how many attempts a DB transaction is aborted with an error.
///
/// Passed as the attempt limit to the database's `autocommit` by
/// [`Executor::add_state_machines`].
const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
36
/// Shared callback that produces the [`DynGlobalClientContext`] for a given
/// module instance and operation.
///
/// The executor invokes it with a state's `module_instance_id()` and
/// `operation_id()` whenever it needs a global context for that state.
pub type ContextGen =
    Arc<maybe_add_send_sync!(dyn Fn(ModuleInstanceId, OperationId) -> DynGlobalClientContext)>;
39
/// Prefixes for executor DB entries
// NOTE(review): presumably these values are the leading byte of the respective
// database keys; the key definitions are not visible here — confirm.
enum ExecutorDbPrefixes {
    /// See [`ActiveStateKey`]
    ActiveStates = 0xa1,
    /// See [`InactiveStateKey`]
    InactiveStates = 0xa2,
}
47
/// Executor that drives forward state machines under its management.
///
/// Each state transition is atomic and supposed to be idempotent such that a
/// stop/crash of the executor at any point can be recovered from on restart.
/// The executor is aware of the concept of Fedimint modules and can give state
/// machines a different [execution context](super::state::Context) depending on
/// the owning module, making it very flexible.
///
/// The type is a thin handle around a reference-counted [`ExecutorInner`], so
/// clones of an `Executor` all refer to the same underlying executor.
#[derive(Clone, Debug)]
pub struct Executor {
    /// Shared executor internals; all public methods delegate to or read from
    /// this value.
    inner: Arc<ExecutorInner>,
}
59
/// Internals shared by all clones of an [`Executor`].
struct ExecutorInner {
    /// Database holding the active and inactive state entries.
    db: Database,
    /// Lifecycle state of the executor (unstarted, running or stopped).
    state: std::sync::RwLock<ExecutorState>,
    /// Per-module contexts, looked up by the module instance id of a state.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids for which state machines may be added; states of
    /// any other module are rejected as "Unknown module".
    valid_module_ids: BTreeSet<ModuleInstanceId>,
    /// Used to notify subscribers about newly added and newly reached states.
    notifier: Notifier,
    /// Any time the executor should notice a state machine update (e.g.
    /// because a state machine was created), the state must be sent through
    /// this channel, otherwise the executor will not pick it up.
    sm_update_tx: mpsc::UnboundedSender<DynState>,
    /// Task group on which the "sm-executor" background task is spawned.
    client_task_group: TaskGroup,
}
71
/// Lifecycle of the executor's background task.
enum ExecutorState {
    /// The executor has not been started yet. The receiving end of the state
    /// machine update channel is parked here until `start` hands it out.
    Unstarted {
        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
    },
    /// The executor has been started.
    Running {
        /// Generates the global context for a state's module and operation.
        context_gen: ContextGen,
        /// Consumed by `stop` to signal the background task to shut down.
        shutdown_sender: oneshot::Sender<()>,
    },
    /// `stop` was called while the executor was running.
    Stopped,
}
82
83impl ExecutorState {
84    /// Starts the executor, returning a receiver that will be signalled when
85    /// the executor is stopped and a receiver for state machine updates.
86    /// Returns `None` if the executor has already been started and/or stopped.
87    fn start(
88        &mut self,
89        context: ContextGen,
90    ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
91        let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
92
93        let previous_state = mem::replace(
94            self,
95            ExecutorState::Running {
96                context_gen: context,
97                shutdown_sender,
98            },
99        );
100
101        if let ExecutorState::Unstarted { sm_update_rx } = previous_state {
102            Some((shutdown_receiver, sm_update_rx))
103        } else {
104            // Replace the previous state, undoing the `mem::replace` above.
105            *self = previous_state;
106
107            debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
108            None
109        }
110    }
111
112    /// Stops the executor, returning `Some(())` if the executor was running and
113    /// `None` if it was in any other state.
114    fn stop(&mut self) -> Option<()> {
115        let previous_state = mem::replace(self, ExecutorState::Stopped);
116
117        if let ExecutorState::Running {
118            shutdown_sender, ..
119        } = previous_state
120        {
121            if shutdown_sender.send(()).is_err() {
122                warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
123            }
124            Some(())
125        } else {
126            // Replace the previous state, undoing the `mem::replace` above.
127            *self = previous_state;
128
129            debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
130            None
131        }
132    }
133
134    fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
135        let ExecutorState::Running { context_gen, .. } = self else {
136            return None;
137        };
138        Some(context_gen(
139            state.module_instance_id(),
140            state.operation_id(),
141        ))
142    }
143}
144
/// Builder to which module clients can be attached and used to build an
/// [`Executor`] supporting these.
#[derive(Debug, Default)]
pub struct ExecutorBuilder {
    /// Module contexts collected so far, keyed by module instance id.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids collected so far.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
}
152
153impl Executor {
    /// Creates an empty [`ExecutorBuilder`] (its `Default` value) to which
    /// modules can be attached before building the [`Executor`].
    pub fn builder() -> ExecutorBuilder {
        ExecutorBuilder::default()
    }
158
    /// Returns the states the inner executor reports as active, each paired
    /// with its [`ActiveStateMeta`].
    pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
        self.inner.get_active_states().await
    }
162
163    /// Adds a number of state machines to the executor atomically. They will be
164    /// driven to completion automatically in the background.
165    ///
166    /// **Attention**: do not use before background task is started!
167    // TODO: remove warning once finality is an inherent state attribute
168    pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
169        self.inner
170            .db
171            .autocommit(
172                |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
173                MAX_DB_ATTEMPTS,
174            )
175            .await
176            .map_err(|e| match e {
177                AutocommitError::CommitFailed {
178                    last_error,
179                    attempts,
180                } => last_error.context(format!("Failed to commit after {attempts} attempts")),
181                AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
182            })?;
183
184        // TODO: notify subscribers to state changes?
185
186        Ok(())
187    }
188
189    /// Adds a number of state machines to the executor atomically with other DB
190    /// changes is `dbtx`. See [`Executor::add_state_machines`] for more
191    /// details.
192    ///
193    /// ## Panics
194    /// If called before background task is started using
195    /// [`Executor::start_executor`]!
196    // TODO: remove warning once finality is an inherent state attribute
197    pub async fn add_state_machines_dbtx(
198        &self,
199        dbtx: &mut DatabaseTransaction<'_>,
200        states: Vec<DynState>,
201    ) -> AddStateMachinesResult {
202        for state in states {
203            if !self
204                .inner
205                .valid_module_ids
206                .contains(&state.module_instance_id())
207            {
208                return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
209            }
210
211            let is_active_state = dbtx
212                .get_value(&ActiveStateKey::from_state(state.clone()))
213                .await
214                .is_some();
215            let is_inactive_state = dbtx
216                .get_value(&InactiveStateKey::from_state(state.clone()))
217                .await
218                .is_some();
219
220            if is_active_state || is_inactive_state {
221                return Err(AddStateMachinesError::StateAlreadyExists);
222            }
223
224            // In case of recovery functions, the module itself is not yet initialized,
225            // so we can't check if the state is terminal. However the
226            // [`Self::get_transitions_for`] function will double check and
227            // deactivate any terminal states that would slip past this check.
228            if let Some(module_context) =
229                self.inner.module_contexts.get(&state.module_instance_id())
230            {
231                if let Some(context) = self
232                    .inner
233                    .state
234                    .read()
235                    .expect("locking failed")
236                    .gen_context(&state)
237                {
238                    if state.is_terminal(module_context, &context) {
239                        return Err(AddStateMachinesError::Other(anyhow!(
240                        "State is already terminal, adding it to the executor doesn't make sense."
241                    )));
242                    }
243                } else {
244                    warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
245                }
246            }
247
248            dbtx.insert_new_entry(
249                &ActiveStateKey::from_state(state.clone()),
250                &ActiveStateMeta::default(),
251            )
252            .await;
253
254            let notify_sender = self.inner.notifier.sender();
255            let sm_updates_tx = self.inner.sm_update_tx.clone();
256            dbtx.on_commit(move || {
257                notify_sender.notify(state.clone());
258                let _ = sm_updates_tx.send(state);
259            });
260        }
261
262        Ok(())
263    }
264
265    /// **Mostly used for testing**
266    ///
267    /// Check if state exists in the database as part of an actively running
268    /// state machine.
269    pub async fn contains_active_state<S: State>(
270        &self,
271        instance: ModuleInstanceId,
272        state: S,
273    ) -> bool {
274        let state = DynState::from_typed(instance, state);
275        self.inner
276            .get_active_states()
277            .await
278            .into_iter()
279            .any(|(s, _)| s == state)
280    }
281
282    // TODO: unify querying fns
283    /// **Mostly used for testing**
284    ///
285    /// Check if state exists in the database as inactive. If the state is
286    /// terminal it means the corresponding state machine finished its
287    /// execution. If the state is non-terminal it means the state machine was
288    /// in that state at some point but moved on since then.
289    pub async fn contains_inactive_state<S: State>(
290        &self,
291        instance: ModuleInstanceId,
292        state: S,
293    ) -> bool {
294        let state = DynState::from_typed(instance, state);
295        self.inner
296            .get_inactive_states()
297            .await
298            .into_iter()
299            .any(|(s, _)| s == state)
300    }
301
302    pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
303        self.inner
304            .db
305            .wait_key_exists(&InactiveStateKey::from_state(state))
306            .await
307    }
308
309    pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
310        self.inner
311            .db
312            .wait_key_exists(&ActiveStateKey::from_state(state))
313            .await
314    }
315
316    /// Only meant for debug tooling
317    pub async fn get_operation_states(
318        &self,
319        operation_id: OperationId,
320    ) -> (
321        Vec<(DynState, ActiveStateMeta)>,
322        Vec<(DynState, InactiveStateMeta)>,
323    ) {
324        let mut dbtx = self.inner.db.begin_transaction_nc().await;
325        let active_states: Vec<_> = dbtx
326            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
327            .await
328            .map(|(active_key, active_meta)| (active_key.state, active_meta))
329            .collect()
330            .await;
331        let inactive_states: Vec<_> = dbtx
332            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
333            .await
334            .map(|(active_key, inactive_meta)| (active_key.state, inactive_meta))
335            .collect()
336            .await;
337
338        (active_states, inactive_states)
339    }
340
341    /// Starts the background thread that runs the state machines. This cannot
342    /// be done when building the executor since some global contexts in turn
343    /// may depend on the executor, forming a cyclic dependency.
344    ///
345    /// ## Panics
346    /// If called more than once.
347    pub fn start_executor(&self, context_gen: ContextGen) {
348        let Some((shutdown_receiver, sm_update_rx)) = self
349            .inner
350            .state
351            .write()
352            .expect("locking can't fail")
353            .start(context_gen.clone())
354        else {
355            panic!("start_executor was called previously");
356        };
357
358        let task_runner_inner = self.inner.clone();
359        let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
360            let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
361            let task_group_shutdown_rx = task_handle.make_shutdown_rx();
362            select! {
363                () = task_group_shutdown_rx => {
364                    debug!(
365                        target: LOG_CLIENT_REACTOR,
366                        "Shutting down state machine executor runner due to task group shutdown signal"
367                    );
368                },
369                shutdown_happened_sender = shutdown_receiver => {
370                    match shutdown_happened_sender {
371                        Ok(()) => {
372                            debug!(
373                                target: LOG_CLIENT_REACTOR,
374                                "Shutting down state machine executor runner due to explicit shutdown signal"
375                            );
376                        },
377                        Err(_) => {
378                            warn!(
379                                target: LOG_CLIENT_REACTOR,
380                                "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
381                            );
382                        }
383                    }
384                },
385                () = executor_runner => {
386                    error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
387                },
388            };
389        });
390    }
391
    /// Stops the background task that runs the state machines.
    ///
    /// Forwards to the inner executor's `stop_executor` and returns its result
    /// unchanged. The returned `Option<()>` only reports whether a shutdown
    /// was requested; it carries no handle that could be used to wait for the
    /// background task to actually exit.
    ///
    /// NOTE(review): the inner implementation is not visible here. Judging by
    /// [`ExecutorState::stop`], the result is presumably `Some(())` if a
    /// shutdown signal was sent and `None` otherwise, e.g. if `stop_executor`
    /// is called multiple times — confirm.
    ///
    /// ## Panics
    /// If called in parallel with [`start_executor`](Self::start_executor).
    /// NOTE(review): this claim is carried over from the previous
    /// documentation and cannot be verified from the visible code — confirm.
    pub fn stop_executor(&self) -> Option<()> {
        self.inner.stop_executor()
    }
408
    /// Returns a reference to the [`Notifier`] that can be used to subscribe to
    /// state transitions.
    ///
    /// This is the same notifier through which newly added state machines are
    /// announced once their database transaction commits.
    pub fn notifier(&self) -> &Notifier {
        &self.inner.notifier
    }
414}
415
/// Requests an executor stop when the shared internals are dropped, i.e. once
/// the last handle referring to them is gone.
impl Drop for ExecutorInner {
    fn drop(&mut self) {
        // The result is irrelevant here: there is nothing left to do whether
        // or not the executor was still running.
        self.stop_executor();
    }
}
421
/// A transition of an active state whose trigger has fired and which is now
/// ready to be executed.
struct TransitionForActiveState {
    /// Value the transition's trigger future resolved to.
    outcome: serde_json::Value,
    /// The active state the transition belongs to.
    state: DynState,
    /// Metadata of that active state.
    meta: ActiveStateMeta,
    /// Transition function to run with `outcome` and `state`.
    transition_fn: StateTransitionFunction<DynState>,
}
428
429impl ExecutorInner {
430    async fn run(
431        &self,
432        global_context_gen: ContextGen,
433        sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
434    ) {
435        debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
436        if let Err(err) = self
437            .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
438            .await
439        {
440            warn!(
441                target: LOG_CLIENT_REACTOR,
442                err = %err.fmt_compact_anyhow(),
443                "An unexpected error occurred during a state transition"
444            );
445        }
446    }
447
448    async fn get_transition_for(
449        &self,
450        state: &DynState,
451        meta: ActiveStateMeta,
452        global_context_gen: &ContextGen,
453    ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
454        let module_instance = state.module_instance_id();
455        let context = &self
456            .module_contexts
457            .get(&module_instance)
458            .expect("Unknown module");
459        let transitions = state
460            .transitions(
461                context,
462                &global_context_gen(module_instance, state.operation_id()),
463            )
464            .into_iter()
465            .map(|transition| {
466                let state = state.clone();
467                let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
468                    let StateTransition {
469                        trigger,
470                        transition,
471                    } = transition;
472                    TransitionForActiveState {
473                        outcome: trigger.await,
474                        state,
475                        transition_fn: transition,
476                        meta,
477                    }
478                });
479                f
480            })
481            .collect::<Vec<_>>();
482        if transitions.is_empty() {
483            // In certain cases a terminal (no transitions) state could get here due to
484            // module bug. Inactivate it to prevent accumulation of such states.
485            // See [`Self::add_state_machines_dbtx`].
486            warn!(
487                target: LOG_CLIENT_REACTOR,
488                module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
489            );
490            self.db
491                .autocommit::<_, _, anyhow::Error>(
492                    |dbtx, _| {
493                        Box::pin(async {
494                            let k = InactiveStateKey::from_state(state.clone());
495                            let v = ActiveStateMeta::default().into_inactive();
496                            dbtx.remove_entry(&ActiveStateKey::from_state(state.clone()))
497                                .await;
498                            dbtx.insert_entry(&k, &v).await;
499                            Ok(())
500                        })
501                    },
502                    None,
503                )
504                .await
505                .expect("Autocommit here can't fail");
506        }
507
508        transitions
509    }
510
511    async fn run_state_machines_executor_inner(
512        &self,
513        global_context_gen: ContextGen,
514        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
515    ) -> anyhow::Result<()> {
516        /// All futures in the executor resolve to this type, so the handling
517        /// code can tell them apart.
518        enum ExecutorLoopEvent {
519            /// Notification about `DynState` arrived and should be handled,
520            /// usually added to the list of pending futures.
521            New { state: DynState },
522            /// One of trigger futures of a state machine finished and
523            /// returned transition function to run
524            Triggered(TransitionForActiveState),
525            /// The state machine did not need to run, so it was canceled
526            Invalid { state: DynState },
527            /// Transition function and all the accounting around it are done
528            Completed {
529                state: DynState,
530                outcome: ActiveOrInactiveState,
531            },
532            /// New job receiver disconnected, that can only mean termination
533            Disconnected,
534        }
535
536        let active_states = self.get_active_states().await;
537        trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
538        for (state, _meta) in active_states {
539            self.sm_update_tx
540                .send(state)
541                .expect("Must be able to send state machine to own opened channel");
542        }
543
544        // Keeps track of things already running, so we can deduplicate, just
545        // in case.
546        let mut currently_running_sms = HashSet::<DynState>::new();
547        // All things happening in parallel go into here
548        // NOTE: `FuturesUnordered` is a footgun: when it's not being polled
549        // (e.g. we picked an event and are awaiting on something to process it),
550        // nothing inside `futures` will be making progress, which in extreme cases
551        // could lead to hangs. For this reason we try really hard in the code here,
552        // to pick an event from `futures` and spawn a new task, avoiding any `await`,
553        // just so we can get back to `futures.next()` ASAP.
554        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
555            FuturesUnordered::new();
556
557        loop {
558            let event = tokio::select! {
559                new = sm_update_rx.recv() => {
560                    if let Some(new) = new {
561                        ExecutorLoopEvent::New {
562                            state: new,
563                        }
564                    } else {
565                        ExecutorLoopEvent::Disconnected
566                    }
567                },
568
569                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
570            };
571
572            // main reactor loop: wait for next thing that completed, react (possibly adding
573            // more things to `futures`)
574            match event {
575                ExecutorLoopEvent::New { state } => {
576                    if currently_running_sms.contains(&state) {
577                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
578                        continue;
579                    }
580                    currently_running_sms.insert(state.clone());
581                    let futures_len = futures.len();
582                    let global_context_gen = &global_context_gen;
583                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
584                    futures.push(Box::pin(async move {
585                        let Some(meta) = self.get_active_state(&state).await else {
586                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
587                            return ExecutorLoopEvent::Invalid { state: state.clone() };
588                        };
589
590                        let transitions = self
591                            .get_transition_for(&state, meta, global_context_gen)
592                            .await;
593                        if transitions.is_empty() {
594                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
595                            return ExecutorLoopEvent::Invalid { state: state.clone() };
596                        }
597                        let transitions_num = transitions.len();
598
599                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");
600
601                        let (first_completed_result, _index, _unused_transitions) =
602                            select_all(transitions).await;
603                        ExecutorLoopEvent::Triggered(first_completed_result)
604                    }));
605                }
606                ExecutorLoopEvent::Triggered(TransitionForActiveState {
607                    outcome,
608                    state,
609                    meta,
610                    transition_fn,
611                }) => {
612                    debug!(
613                        target: LOG_CLIENT_REACTOR,
614                        operation_id = %state.operation_id().fmt_short(),
615                        "Triggered state transition",
616                    );
617                    let span = tracing::debug_span!(
618                        target: LOG_CLIENT_REACTOR,
619                        "sm_transition",
620                        operation_id = %state.operation_id().fmt_short()
621                    );
622                    // Perform the transition as another future, so transitions can happen in
623                    // parallel.
624                    // Database write conflicts might be happening quite often here,
625                    // but transaction functions are supposed to be idempotent anyway,
626                    // so it seems like a good stress-test in the worst case.
627                    futures.push({
628                        let sm_update_tx = self.sm_update_tx.clone();
629                        let db = self.db.clone();
630                        let notifier = self.notifier.clone();
631                        let module_contexts = self.module_contexts.clone();
632                        let global_context_gen = global_context_gen.clone();
633                        Box::pin(
634                            async move {
635                                debug!(
636                                    target: LOG_CLIENT_REACTOR,
637                                    "Executing state transition",
638                                );
639                                trace!(
640                                    target: LOG_CLIENT_REACTOR,
641                                    ?state,
642                                    outcome = ?AbbreviateJson(&outcome),
643                                    "Executing state transition (details)",
644                                );
645
646                                let module_contexts = &module_contexts;
647                                let global_context_gen = &global_context_gen;
648
649                                let outcome = db
650                                    .autocommit::<'_, '_, _, _, Infallible>(
651                                        |dbtx, _| {
652                                            let state = state.clone();
653                                            let transition_fn = transition_fn.clone();
654                                            let transition_outcome = outcome.clone();
655                                            Box::pin(async move {
656                                                let new_state = transition_fn(
657                                                    &mut ClientSMDatabaseTransaction::new(
658                                                        &mut dbtx.to_ref(),
659                                                        state.module_instance_id(),
660                                                    ),
661                                                    transition_outcome.clone(),
662                                                    state.clone(),
663                                                )
664                                                .await;
665                                                dbtx.remove_entry(&ActiveStateKey::from_state(
666                                                    state.clone(),
667                                                ))
668                                                .await;
669                                                dbtx.insert_entry(
670                                                    &InactiveStateKey::from_state(state.clone()),
671                                                    &meta.into_inactive(),
672                                                )
673                                                .await;
674
675                                                let context = &module_contexts
676                                                    .get(&state.module_instance_id())
677                                                    .expect("Unknown module");
678
679                                                let operation_id = state.operation_id();
680                                                let global_context = global_context_gen(
681                                                    state.module_instance_id(),
682                                                    operation_id,
683                                                );
684
685                                                let is_terminal = new_state.is_terminal(context, &global_context);
686
687                                                if is_terminal {
688                                                    let k = InactiveStateKey::from_state(
689                                                        new_state.clone(),
690                                                    );
691                                                    let v = ActiveStateMeta::default().into_inactive();
692                                                    dbtx.insert_entry(&k, &v).await;
693                                                    Ok(ActiveOrInactiveState::Inactive {
694                                                        dyn_state: new_state,
695                                                    })
696                                                } else {
697                                                    let k = ActiveStateKey::from_state(
698                                                        new_state.clone(),
699                                                    );
700                                                    let v = ActiveStateMeta::default();
701                                                    dbtx.insert_entry(&k, &v).await;
702                                                    Ok(ActiveOrInactiveState::Active {
703                                                        dyn_state: new_state,
704                                                        meta: v,
705                                                    })
706                                                }
707                                            })
708                                        },
709                                        None,
710                                    )
711                                    .await
712                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");
713
714                                debug!(
715                                    target: LOG_CLIENT_REACTOR,
716                                    terminal = !outcome.is_active(),
717                                    ?outcome,
718                                    "State transition complete",
719                                );
720
721                                match &outcome {
722                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
723                                        sm_update_tx
724                                            .send(dyn_state.clone())
725                                            .expect("can't fail: we are the receiving end");
726                                        notifier.notify(dyn_state.clone());
727                                    }
728                                    ActiveOrInactiveState::Inactive { dyn_state } => {
729                                        notifier.notify(dyn_state.clone());
730                                    }
731                                }
732                                ExecutorLoopEvent::Completed { state, outcome }
733                            }
734                            .instrument(span),
735                        )
736                    });
737                }
738                ExecutorLoopEvent::Invalid { state } => {
739                    trace!(
740                        target: LOG_CLIENT_REACTOR,
741                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
742                        "State invalid"
743                    );
744                    assert!(
745                        currently_running_sms.remove(&state),
746                        "State must have been recorded"
747                    );
748                }
749
750                ExecutorLoopEvent::Completed { state, outcome } => {
751                    assert!(
752                        currently_running_sms.remove(&state),
753                        "State must have been recorded"
754                    );
755                    debug!(
756                        target: LOG_CLIENT_REACTOR,
757                        operation_id = %state.operation_id().fmt_short(),
758                        outcome_active = outcome.is_active(),
759                        total = futures.len(),
760                        "State transition complete"
761                    );
762                    trace!(
763                        target: LOG_CLIENT_REACTOR,
764                        ?outcome,
765                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
766                        "State transition complete"
767                    );
768                }
769                ExecutorLoopEvent::Disconnected => {
770                    break;
771                }
772            }
773        }
774
775        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
776        Ok(())
777    }
778
779    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
780        self.db
781            .begin_transaction_nc()
782            .await
783            .find_by_prefix(&ActiveStateKeyPrefix)
784            .await
785            // ignore states from modules that are not initialized yet
786            .filter(|(state, _)| {
787                future::ready(
788                    self.module_contexts
789                        .contains_key(&state.state.module_instance_id()),
790                )
791            })
792            .map(|(state, meta)| (state.state, meta))
793            .collect::<Vec<_>>()
794            .await
795    }
796
797    async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
798        // ignore states from modules that are not initialized yet
799        if !self
800            .module_contexts
801            .contains_key(&state.module_instance_id())
802        {
803            return None;
804        }
805        self.db
806            .begin_transaction_nc()
807            .await
808            .get_value(&ActiveStateKey::from_state(state.clone()))
809            .await
810    }
811
812    async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
813        self.db
814            .begin_transaction_nc()
815            .await
816            .find_by_prefix(&InactiveStateKeyPrefix)
817            .await
818            // ignore states from modules that are not initialized yet
819            .filter(|(state, _)| {
820                future::ready(
821                    self.module_contexts
822                        .contains_key(&state.state.module_instance_id()),
823                )
824            })
825            .map(|(state, meta)| (state.state, meta))
826            .collect::<Vec<_>>()
827            .await
828    }
829}
830
831impl ExecutorInner {
832    /// See [`Executor::stop_executor`].
833    fn stop_executor(&self) -> Option<()> {
834        let mut state = self.state.write().expect("Locking can't fail");
835
836        state.stop()
837    }
838}
839
840impl Debug for ExecutorInner {
841    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
842        writeln!(f, "ExecutorInner {{}}")
843    }
844}
845
846impl ExecutorBuilder {
847    /// Allow executor being built to run state machines associated with the
848    /// supplied module
849    pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
850    where
851        C: IntoDynInstance<DynType = DynContext>,
852    {
853        self.with_module_dyn(context.into_dyn(instance_id));
854    }
855
856    /// Allow executor being built to run state machines associated with the
857    /// supplied module
858    pub fn with_module_dyn(&mut self, context: DynContext) {
859        self.valid_module_ids.insert(context.module_instance_id());
860
861        if self
862            .module_contexts
863            .insert(context.module_instance_id(), context)
864            .is_some()
865        {
866            panic!("Tried to add two modules with the same instance id!");
867        }
868    }
869
870    /// Allow executor to build state machines associated with the module id,
871    /// for which the module itself might not be available yet (otherwise it
872    /// would be registered with `[Self::with_module_dyn]`).
873    pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
874        self.valid_module_ids.insert(module_id);
875    }
876
877    /// Build [`Executor`] and spawn background task in `tasks` executing active
878    /// state machines. The supplied database `db` must support isolation, so
879    /// cannot be an isolated DB instance itself.
880    pub fn build(self, db: Database, notifier: Notifier, client_task_group: TaskGroup) -> Executor {
881        let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
882
883        let inner = Arc::new(ExecutorInner {
884            db,
885            state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
886            module_contexts: self.module_contexts,
887            valid_module_ids: self.valid_module_ids,
888            notifier,
889            sm_update_tx,
890            client_task_group,
891        });
892
893        debug!(
894            target: LOG_CLIENT_REACTOR,
895            instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
896            "Initialized state machine executor with module instances"
897        );
898        Executor { inner }
899    }
900}
901
/// A state that is able to make progress eventually
///
/// Used as a database key (see the `DatabaseRecord` impl for this type); the
/// associated value is an [`ActiveStateMeta`].
#[derive(Debug)]
pub struct ActiveStateKey {
    // TODO: remove redundant operation id from state trait
    /// Operation the state belongs to; encoded first, so all states of one
    /// operation share a common key prefix.
    pub operation_id: OperationId,
    // TODO: state being a key... seems ... risky?
    /// The state itself, encoded right after the operation id.
    pub state: DynState,
}
910
911impl ActiveStateKey {
912    pub fn from_state(state: DynState) -> ActiveStateKey {
913        ActiveStateKey {
914            operation_id: state.operation_id(),
915            state,
916        }
917    }
918}
919
920impl Encodable for ActiveStateKey {
921    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
922        let mut len = 0;
923        len += self.operation_id.consensus_encode(writer)?;
924        len += self.state.consensus_encode(writer)?;
925        Ok(len)
926    }
927}
928
929impl Decodable for ActiveStateKey {
930    fn consensus_decode_partial<R: Read>(
931        reader: &mut R,
932        modules: &ModuleDecoderRegistry,
933    ) -> Result<Self, DecodeError> {
934        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
935        let state = DynState::consensus_decode_partial(reader, modules)?;
936
937        Ok(ActiveStateKey {
938            operation_id,
939            state,
940        })
941    }
942}
943
/// Variant of [`ActiveStateKey`] that keeps the state as undecoded bytes.
///
/// It is stored under the same database prefix as [`ActiveStateKey`] (see the
/// `DatabaseRecord` impl for this type).
#[derive(Debug)]
pub struct ActiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance id decoded from the bytes following the operation id.
    pub module_instance_id: ModuleInstanceId,
    /// Raw state bytes. When produced by the `Decodable` impl they start with
    /// the re-encoded `module_instance_id`, followed by the remaining input.
    pub state: Vec<u8>,
}
950
951impl Encodable for ActiveStateKeyBytes {
952    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
953        let mut len = 0;
954        len += self.operation_id.consensus_encode(writer)?;
955        len += writer.write(self.state.as_slice())?;
956        Ok(len)
957    }
958}
959
960impl Decodable for ActiveStateKeyBytes {
961    fn consensus_decode_partial<R: std::io::Read>(
962        reader: &mut R,
963        modules: &ModuleDecoderRegistry,
964    ) -> Result<Self, DecodeError> {
965        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
966        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
967        let mut bytes = Vec::new();
968        reader
969            .read_to_end(&mut bytes)
970            .map_err(DecodeError::from_err)?;
971
972        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
973        instance_bytes.append(&mut bytes);
974
975        Ok(ActiveStateKeyBytes {
976            operation_id,
977            module_instance_id,
978            state: instance_bytes,
979        })
980    }
981}
982
/// Lookup prefix selecting the [`ActiveStateKey`] records of one operation.
///
/// Encodes to the operation id only, which is the first field written by the
/// `Encodable` impl of [`ActiveStateKey`].
#[derive(Debug)]
pub(crate) struct ActiveOperationStateKeyPrefix {
    pub operation_id: OperationId,
}

impl Encodable for ActiveOperationStateKeyPrefix {
    /// Writes just the operation id.
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        self.operation_id.consensus_encode(writer)
    }
}

impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
    type Record = ActiveStateKey;
}
997
998#[derive(Debug)]
999pub(crate) struct ActiveModuleOperationStateKeyPrefix {
1000    pub operation_id: OperationId,
1001    pub module_instance: ModuleInstanceId,
1002}
1003
1004impl Encodable for ActiveModuleOperationStateKeyPrefix {
1005    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1006        let mut len = 0;
1007        len += self.operation_id.consensus_encode(writer)?;
1008        len += self.module_instance.consensus_encode(writer)?;
1009        Ok(len)
1010    }
1011}
1012
1013impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
1014    type Record = ActiveStateKey;
1015}
1016
/// Lookup prefix selecting every [`ActiveStateKey`] record.
#[derive(Debug)]
pub struct ActiveStateKeyPrefix;

impl Encodable for ActiveStateKeyPrefix {
    /// Writes nothing: the empty prefix adds no bytes of its own.
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
        Ok(0)
    }
}
1025
/// Metadata stored as the database value of an [`ActiveStateKey`].
#[derive(Debug, Copy, Clone, Encodable, Decodable)]
pub struct ActiveStateMeta {
    /// Creation time of the entry; the `Default` impl sets it to the current
    /// time.
    pub created_at: SystemTime,
}
1030
// Active states are stored under the `ActiveStates` prefix, keyed by the
// state itself, with `ActiveStateMeta` as the value.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKey {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    // NOTE(review): presumably enables change notifications for these keys
    // (cf. the `DatabaseKeyWithNotify` impl below) — confirm in fedimint_core.
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = ActiveStateMeta;
}

impl DatabaseKeyWithNotify for ActiveStateKey {}

// Lets `ActiveStateKeyPrefix` be used to look up `ActiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
    type Record = ActiveStateKey;
}
1043
/// Lookup prefix selecting every [`ActiveStateKeyBytes`] record.
#[derive(Debug, Encodable, Decodable)]
pub(crate) struct ActiveStateKeyPrefixBytes;

// Same database prefix as `ActiveStateKey`, but with the state kept as raw
// bytes and `NOTIFY_ON_MODIFY` turned off.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = ActiveStateMeta;
}

impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
    type Record = ActiveStateKeyBytes;
}
1057
1058impl Default for ActiveStateMeta {
1059    fn default() -> Self {
1060        Self {
1061            created_at: fedimint_core::time::now(),
1062        }
1063    }
1064}
1065
1066impl ActiveStateMeta {
1067    fn into_inactive(self) -> InactiveStateMeta {
1068        InactiveStateMeta {
1069            created_at: self.created_at,
1070            exited_at: fedimint_core::time::now(),
1071        }
1072    }
1073}
1074
/// A past or final state of a state machine
///
/// Used as a database key (see the `DatabaseRecord` impl for this type); the
/// associated value is an [`InactiveStateMeta`].
#[derive(Debug, Clone)]
pub struct InactiveStateKey {
    // TODO: remove redundant operation id from state trait
    /// Operation the state belongs to; encoded first, so all states of one
    /// operation share a common key prefix.
    pub operation_id: OperationId,
    /// The state itself, encoded right after the operation id.
    pub state: DynState,
}
1082
1083impl InactiveStateKey {
1084    pub fn from_state(state: DynState) -> InactiveStateKey {
1085        InactiveStateKey {
1086            operation_id: state.operation_id(),
1087            state,
1088        }
1089    }
1090}
1091
1092impl Encodable for InactiveStateKey {
1093    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1094        let mut len = 0;
1095        len += self.operation_id.consensus_encode(writer)?;
1096        len += self.state.consensus_encode(writer)?;
1097        Ok(len)
1098    }
1099}
1100
1101impl Decodable for InactiveStateKey {
1102    fn consensus_decode_partial<R: Read>(
1103        reader: &mut R,
1104        modules: &ModuleDecoderRegistry,
1105    ) -> Result<Self, DecodeError> {
1106        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1107        let state = DynState::consensus_decode_partial(reader, modules)?;
1108
1109        Ok(InactiveStateKey {
1110            operation_id,
1111            state,
1112        })
1113    }
1114}
1115
/// Variant of [`InactiveStateKey`] that keeps the state as undecoded bytes.
///
/// It is stored under the same database prefix as [`InactiveStateKey`] (see
/// the `DatabaseRecord` impl for this type).
#[derive(Debug)]
pub struct InactiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance id decoded from the bytes following the operation id.
    pub module_instance_id: ModuleInstanceId,
    /// Raw state bytes. When produced by the `Decodable` impl they start with
    /// the re-encoded `module_instance_id`, followed by the remaining input.
    pub state: Vec<u8>,
}
1122
1123impl Encodable for InactiveStateKeyBytes {
1124    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
1125        let mut len = 0;
1126        len += self.operation_id.consensus_encode(writer)?;
1127        len += writer.write(self.state.as_slice())?;
1128        Ok(len)
1129    }
1130}
1131
1132impl Decodable for InactiveStateKeyBytes {
1133    fn consensus_decode_partial<R: std::io::Read>(
1134        reader: &mut R,
1135        modules: &ModuleDecoderRegistry,
1136    ) -> Result<Self, DecodeError> {
1137        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1138        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1139        let mut bytes = Vec::new();
1140        reader
1141            .read_to_end(&mut bytes)
1142            .map_err(DecodeError::from_err)?;
1143
1144        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1145        instance_bytes.append(&mut bytes);
1146
1147        Ok(InactiveStateKeyBytes {
1148            operation_id,
1149            module_instance_id,
1150            state: instance_bytes,
1151        })
1152    }
1153}
1154
/// Lookup prefix selecting the [`InactiveStateKey`] records of one operation.
///
/// Encodes to the operation id only, which is the first field written by the
/// `Encodable` impl of [`InactiveStateKey`].
#[derive(Debug)]
pub(crate) struct InactiveOperationStateKeyPrefix {
    pub operation_id: OperationId,
}

impl Encodable for InactiveOperationStateKeyPrefix {
    /// Writes just the operation id.
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        self.operation_id.consensus_encode(writer)
    }
}

impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
    type Record = InactiveStateKey;
}
1169
1170#[derive(Debug)]
1171pub(crate) struct InactiveModuleOperationStateKeyPrefix {
1172    pub operation_id: OperationId,
1173    pub module_instance: ModuleInstanceId,
1174}
1175
1176impl Encodable for InactiveModuleOperationStateKeyPrefix {
1177    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1178        let mut len = 0;
1179        len += self.operation_id.consensus_encode(writer)?;
1180        len += self.module_instance.consensus_encode(writer)?;
1181        Ok(len)
1182    }
1183}
1184
1185impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
1186    type Record = InactiveStateKey;
1187}
1188
/// Lookup prefix selecting every [`InactiveStateKey`] record.
#[derive(Debug, Clone)]
pub struct InactiveStateKeyPrefix;

impl Encodable for InactiveStateKeyPrefix {
    /// Writes nothing: the empty prefix adds no bytes of its own.
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
        Ok(0)
    }
}
1197
/// Lookup prefix selecting every [`InactiveStateKeyBytes`] record.
#[derive(Debug, Encodable, Decodable)]
pub(crate) struct InactiveStateKeyPrefixBytes;

// Same database prefix as `InactiveStateKey`, but with the state kept as raw
// bytes and `NOTIFY_ON_MODIFY` turned off.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = InactiveStateMeta;
}

impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
    type Record = InactiveStateKeyBytes;
}
1211
/// Metadata stored as the database value of an [`InactiveStateKey`].
#[derive(Debug, Copy, Clone, Decodable, Encodable)]
pub struct InactiveStateMeta {
    /// Creation time carried over from the [`ActiveStateMeta`] this value was
    /// converted from.
    pub created_at: SystemTime,
    /// Time at which `ActiveStateMeta::into_inactive` produced this value.
    pub exited_at: SystemTime,
}
1217
// Inactive states are stored under the `InactiveStates` prefix, keyed by the
// state itself, with `InactiveStateMeta` as the value.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKey {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    // NOTE(review): presumably enables change notifications for these keys
    // (cf. the `DatabaseKeyWithNotify` impl below) — confirm in fedimint_core.
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = InactiveStateMeta;
}

impl DatabaseKeyWithNotify for InactiveStateKey {}

// Lets `InactiveStateKeyPrefix` be used to look up `InactiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
    type Record = InactiveStateKey;
}
1230
/// Outcome of a state transition: the new state is either still active or has
/// become inactive.
#[derive(Debug)]
enum ActiveOrInactiveState {
    /// The new state was stored as an active state.
    Active {
        dyn_state: DynState,
        #[allow(dead_code)] // currently not printed anywhere, but useful in the db
        meta: ActiveStateMeta,
    },
    /// The new state was stored as an inactive state.
    Inactive {
        dyn_state: DynState,
    },
}
1242
1243impl ActiveOrInactiveState {
1244    fn is_active(&self) -> bool {
1245        match self {
1246            ActiveOrInactiveState::Active { .. } => true,
1247            ActiveOrInactiveState::Inactive { .. } => false,
1248        }
1249    }
1250}
1251
#[cfg(test)]
mod tests {
    use std::fmt::Debug;
    use std::sync::Arc;
    use std::time::Duration;

    use fedimint_core::core::{
        Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId,
    };
    use fedimint_core::db::mem_impl::MemDatabase;
    use fedimint_core::db::Database;
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_core::runtime;
    use fedimint_core::task::TaskGroup;
    use fedimint_logging::LOG_CLIENT_REACTOR;
    use tokio::sync::broadcast::Sender;
    use tracing::{info, trace};

    use crate::sm::state::{Context, DynContext, DynState};
    use crate::sm::{Executor, Notifier, State, StateTransition};
    use crate::DynGlobalClientContext;

    /// Minimal state machine driven by values received on a broadcast channel.
    ///
    /// * `Start` moves to `Final` on receiving `0`, or to `ReceivedNonNull(v)`
    ///   on receiving a non-zero `v`.
    /// * `ReceivedNonNull(v)` moves to `Final` once `v` is received again.
    /// * `Final` has no transitions.
    #[derive(Debug, Clone, Eq, PartialEq, Decodable, Encodable, Hash)]
    enum MockStateMachine {
        Start,
        ReceivedNonNull(u64),
        Final,
    }

    impl State for MockStateMachine {
        type ModuleContext = MockContext;

        fn transitions(
            &self,
            context: &Self::ModuleContext,
            _global_context: &DynGlobalClientContext,
        ) -> Vec<StateTransition<Self>> {
            match self {
                MockStateMachine::Start => {
                    // Each transition needs its own receiver since both wait
                    // on the same broadcast concurrently.
                    let mut receiver1 = context.broadcast.subscribe();
                    let mut receiver2 = context.broadcast.subscribe();
                    vec![
                        StateTransition::new(
                            async move {
                                loop {
                                    let val = receiver1.recv().await.unwrap();
                                    if val == 0 {
                                        trace!("State transition Start->Final");
                                        break;
                                    }
                                }
                            },
                            |_dbtx, (), _state| Box::pin(async { MockStateMachine::Final }),
                        ),
                        StateTransition::new(
                            async move {
                                loop {
                                    let val = receiver2.recv().await.unwrap();
                                    if val != 0 {
                                        trace!("State transition Start->ReceivedNonNull");
                                        break val;
                                    }
                                }
                            },
                            |_dbtx, value, _state| {
                                Box::pin(async move { MockStateMachine::ReceivedNonNull(value) })
                            },
                        ),
                    ]
                }
                MockStateMachine::ReceivedNonNull(prev_val) => {
                    let prev_val = *prev_val;
                    let mut receiver = context.broadcast.subscribe();
                    vec![StateTransition::new(
                        async move {
                            loop {
                                let val = receiver.recv().await.unwrap();
                                if val == prev_val {
                                    trace!("State transition ReceivedNonNull->Final");
                                    break;
                                }
                            }
                        },
                        |_dbtx, (), _state| Box::pin(async { MockStateMachine::Final }),
                    )]
                }
                MockStateMachine::Final => {
                    vec![]
                }
            }
        }

        /// All mock states share one fixed operation id.
        fn operation_id(&self) -> OperationId {
            OperationId([0u8; 32])
        }
    }

    impl IntoDynInstance for MockStateMachine {
        type DynType = DynState;

        fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
            DynState::from_typed(instance_id, self)
        }
    }

    /// Module context handing the broadcast sender to the state machine so its
    /// transitions can subscribe to it.
    #[derive(Debug, Clone)]
    struct MockContext {
        broadcast: tokio::sync::broadcast::Sender<u64>,
    }

    impl IntoDynInstance for MockContext {
        type DynType = DynContext;

        fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
            DynContext::from_typed(instance_id, self)
        }
    }

    impl Context for MockContext {
        const KIND: Option<ModuleKind> = None;
    }

    /// Builds and starts an executor backed by an in-memory database, with the
    /// mock module registered under instance id 42.
    ///
    /// Returns the executor, the broadcast sender that drives the mock state
    /// machine, and the database.
    fn get_executor() -> (Executor, Sender<u64>, Database) {
        let (broadcast, _) = tokio::sync::broadcast::channel(10);

        let mut decoder_builder = Decoder::builder();
        decoder_builder.with_decodable_type::<MockStateMachine>();
        let decoder = decoder_builder.build();

        let decoders =
            ModuleDecoderRegistry::new(vec![(42, ModuleKind::from_static_str("test"), decoder)]);
        let db = Database::new(MemDatabase::new(), decoders);

        let mut executor_builder = Executor::builder();
        executor_builder.with_module(
            42,
            MockContext {
                broadcast: broadcast.clone(),
            },
        );
        let executor =
            executor_builder.build(db.clone(), Notifier::new(db.clone()), TaskGroup::new());
        executor.start_executor(Arc::new(|_, _| DynGlobalClientContext::new_fake()));

        info!(
            target: LOG_CLIENT_REACTOR,
            "Initialized test executor"
        );
        (executor, broadcast, db)
    }

    /// Adds a `Start` state machine, checks duplicate rejection and instance
    /// separation, then broadcasts `0` and expects the `Final` state to show up
    /// among the inactive states.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_executor() {
        const MOCK_INSTANCE_1: ModuleInstanceId = 42;
        const MOCK_INSTANCE_2: ModuleInstanceId = 21;

        let (executor, sender, _db) = get_executor();
        executor
            .add_state_machines(vec![DynState::from_typed(
                MOCK_INSTANCE_1,
                MockStateMachine::Start,
            )])
            .await
            .unwrap();

        assert!(
            executor
                .add_state_machines(vec![DynState::from_typed(
                    MOCK_INSTANCE_1,
                    MockStateMachine::Start
                )])
                .await
                .is_err(),
            "Running the same state machine a second time should fail"
        );

        assert!(
            executor
                .contains_active_state(MOCK_INSTANCE_1, MockStateMachine::Start)
                .await,
            "State was written to DB and waits for broadcast"
        );
        assert!(
            !executor
                .contains_active_state(MOCK_INSTANCE_2, MockStateMachine::Start)
                .await,
            "Instance separation works"
        );

        // TODO build await fn+timeout or allow manual driving of executor
        runtime::sleep(Duration::from_secs(1)).await;
        sender.send(0).unwrap();
        runtime::sleep(Duration::from_secs(2)).await;

        assert!(
            executor
                .contains_inactive_state(MOCK_INSTANCE_1, MockStateMachine::Final)
                .await,
            "State machine transitioned to Final and was recorded as inactive"
        );
    }
}