1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Read, Write};
5use std::mem;
6use std::sync::Arc;
7use std::time::SystemTime;
8
9use anyhow::anyhow;
10use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
11use fedimint_core::db::{
12 AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
13 IDatabaseTransactionOpsCoreTyped,
14};
15use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
16use fedimint_core::fmt_utils::AbbreviateJson;
17use fedimint_core::maybe_add_send_sync;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::task::TaskGroup;
20use fedimint_core::util::BoxFuture;
21use fedimint_logging::LOG_CLIENT_REACTOR;
22use futures::future::{self, select_all};
23use futures::stream::{FuturesUnordered, StreamExt};
24use tokio::select;
25use tokio::sync::{mpsc, oneshot};
26use tracing::{debug, error, info, trace, warn, Instrument};
27
28use super::state::StateTransitionFunction;
29use crate::sm::notifier::Notifier;
30use crate::sm::state::{DynContext, DynState};
31use crate::sm::{ClientSMDatabaseTransaction, State, StateTransition};
32use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
33
/// Attempt limit handed to `Database::autocommit` by
/// [`Executor::add_state_machines`]; when committing keeps failing the error
/// reports the number of attempts made.
const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
36
/// Shared, reference-counted closure that builds the
/// [`DynGlobalClientContext`] for a given module instance and operation.
pub type ContextGen =
    Arc<maybe_add_send_sync!(dyn Fn(ModuleInstanceId, OperationId) -> DynGlobalClientContext)>;
39
/// Database key prefixes used by the executor's records.
enum ExecutorDbPrefixes {
    /// Prefix of [`ActiveStateKey`] (and [`ActiveStateKeyBytes`]) records.
    ActiveStates = 0xa1,
    /// Prefix of [`InactiveStateKey`] (and [`InactiveStateKeyBytes`]) records.
    InactiveStates = 0xa2,
}
47
/// Handle to the state machine executor.
///
/// Cloning is cheap: all clones share the same [`ExecutorInner`] through an
/// [`Arc`].
#[derive(Clone, Debug)]
pub struct Executor {
    /// Shared executor internals.
    inner: Arc<ExecutorInner>,
}
59
/// Shared internals behind every [`Executor`] handle.
struct ExecutorInner {
    /// Database holding the active and inactive state records.
    db: Database,
    /// Lifecycle state (unstarted / running / stopped) of the executor.
    state: std::sync::RwLock<ExecutorState>,
    /// Module contexts by instance id; states of modules without an entry
    /// here are filtered out when states are looked up.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids for which new state machines are accepted.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
    /// Notifier informed about newly added and newly produced states.
    notifier: Notifier,
    /// Sending half of the channel feeding states into the executor loop.
    sm_update_tx: mpsc::UnboundedSender<DynState>,
    /// Task group in which the executor task is spawned.
    client_task_group: TaskGroup,
}
71
/// Lifecycle of the executor.
enum ExecutorState {
    /// Not started yet; holds the receiving half of the state update channel
    /// until it is handed to the executor task by [`ExecutorState::start`].
    Unstarted {
        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
    },
    /// The executor was started.
    Running {
        /// Generator for global contexts of state machines.
        context_gen: ContextGen,
        /// Used by [`ExecutorState::stop`] to signal the executor task.
        shutdown_sender: oneshot::Sender<()>,
    },
    /// The executor was stopped.
    Stopped,
}
82
83impl ExecutorState {
84 fn start(
88 &mut self,
89 context: ContextGen,
90 ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
91 let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
92
93 let previous_state = mem::replace(
94 self,
95 ExecutorState::Running {
96 context_gen: context,
97 shutdown_sender,
98 },
99 );
100
101 if let ExecutorState::Unstarted { sm_update_rx } = previous_state {
102 Some((shutdown_receiver, sm_update_rx))
103 } else {
104 *self = previous_state;
106
107 debug!("Executor already started, ignoring start request");
108 None
109 }
110 }
111
112 fn stop(&mut self) -> Option<()> {
115 let previous_state = mem::replace(self, ExecutorState::Stopped);
116
117 if let ExecutorState::Running {
118 shutdown_sender, ..
119 } = previous_state
120 {
121 if shutdown_sender.send(()).is_err() {
122 warn!("Failed to send shutdown signal to executor, already dead?");
123 }
124 Some(())
125 } else {
126 *self = previous_state;
128
129 debug!("Executor not running, ignoring stop request");
130 None
131 }
132 }
133
134 fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
135 let ExecutorState::Running { context_gen, .. } = self else {
136 return None;
137 };
138 Some(context_gen(
139 state.module_instance_id(),
140 state.operation_id(),
141 ))
142 }
143}
144
/// Collects module contexts and valid module ids before an [`Executor`] is
/// built with [`ExecutorBuilder::build`].
#[derive(Debug, Default)]
pub struct ExecutorBuilder {
    /// Contexts of the registered modules, keyed by instance id.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Instance ids for which state machines may be added.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
}
152
153impl Executor {
154 pub fn builder() -> ExecutorBuilder {
156 ExecutorBuilder::default()
157 }
158
    /// Returns all active states (with their metadata) that belong to modules
    /// with a registered context.
    pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
        self.inner.get_active_states().await
    }
162
163 pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
169 self.inner
170 .db
171 .autocommit(
172 |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
173 MAX_DB_ATTEMPTS,
174 )
175 .await
176 .map_err(|e| match e {
177 AutocommitError::CommitFailed {
178 last_error,
179 attempts,
180 } => last_error.context(format!("Failed to commit after {attempts} attempts")),
181 AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
182 })?;
183
184 Ok(())
187 }
188
189 pub async fn add_state_machines_dbtx(
198 &self,
199 dbtx: &mut DatabaseTransaction<'_>,
200 states: Vec<DynState>,
201 ) -> AddStateMachinesResult {
202 for state in states {
203 if !self
204 .inner
205 .valid_module_ids
206 .contains(&state.module_instance_id())
207 {
208 return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
209 }
210
211 let is_active_state = dbtx
212 .get_value(&ActiveStateKey::from_state(state.clone()))
213 .await
214 .is_some();
215 let is_inactive_state = dbtx
216 .get_value(&InactiveStateKey::from_state(state.clone()))
217 .await
218 .is_some();
219
220 if is_active_state || is_inactive_state {
221 return Err(AddStateMachinesError::StateAlreadyExists);
222 }
223
224 if let Some(module_context) =
229 self.inner.module_contexts.get(&state.module_instance_id())
230 {
231 if let Some(context) = self
232 .inner
233 .state
234 .read()
235 .expect("locking failed")
236 .gen_context(&state)
237 {
238 if state.is_terminal(module_context, &context) {
239 return Err(AddStateMachinesError::Other(anyhow!(
240 "State is already terminal, adding it to the executor doesn't make sense."
241 )));
242 }
243 } else {
244 warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
245 }
246 }
247
248 dbtx.insert_new_entry(
249 &ActiveStateKey::from_state(state.clone()),
250 &ActiveStateMeta::default(),
251 )
252 .await;
253
254 let notify_sender = self.inner.notifier.sender();
255 let sm_updates_tx = self.inner.sm_update_tx.clone();
256 dbtx.on_commit(move || {
257 notify_sender.notify(state.clone());
258 let _ = sm_updates_tx.send(state);
259 });
260 }
261
262 Ok(())
263 }
264
265 pub async fn contains_active_state<S: State>(
270 &self,
271 instance: ModuleInstanceId,
272 state: S,
273 ) -> bool {
274 let state = DynState::from_typed(instance, state);
275 self.inner
276 .get_active_states()
277 .await
278 .into_iter()
279 .any(|(s, _)| s == state)
280 }
281
282 pub async fn contains_inactive_state<S: State>(
290 &self,
291 instance: ModuleInstanceId,
292 state: S,
293 ) -> bool {
294 let state = DynState::from_typed(instance, state);
295 self.inner
296 .get_inactive_states()
297 .await
298 .into_iter()
299 .any(|(s, _)| s == state)
300 }
301
    /// Waits until an inactive-state record for `state` exists in the
    /// database and returns its metadata.
    pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
        self.inner
            .db
            .wait_key_exists(&InactiveStateKey::from_state(state))
            .await
    }
308
    /// Waits until an active-state record for `state` exists in the database
    /// and returns its metadata.
    pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
        self.inner
            .db
            .wait_key_exists(&ActiveStateKey::from_state(state))
            .await
    }
315
316 pub async fn get_operation_states(
318 &self,
319 operation_id: OperationId,
320 ) -> (
321 Vec<(DynState, ActiveStateMeta)>,
322 Vec<(DynState, InactiveStateMeta)>,
323 ) {
324 let mut dbtx = self.inner.db.begin_transaction_nc().await;
325 let active_states: Vec<_> = dbtx
326 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
327 .await
328 .map(|(active_key, active_meta)| (active_key.state, active_meta))
329 .collect()
330 .await;
331 let inactive_states: Vec<_> = dbtx
332 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
333 .await
334 .map(|(active_key, inactive_meta)| (active_key.state, inactive_meta))
335 .collect()
336 .await;
337
338 (active_states, inactive_states)
339 }
340
    /// Starts the executor task in the client task group.
    ///
    /// The spawned task runs the executor loop until the task group shuts
    /// down, an explicit shutdown signal arrives (or its sender is dropped),
    /// or the loop itself returns.
    ///
    /// # Panics
    ///
    /// Panics if the executor is not in its unstarted state any more.
    pub fn start_executor(&self, context_gen: ContextGen) {
        let Some((shutdown_receiver, sm_update_rx)) = self
            .inner
            .state
            .write()
            .expect("locking can't fail")
            .start(context_gen.clone())
        else {
            panic!("start_executor was called previously");
        };

        let task_runner_inner = self.inner.clone();
        let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
            let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
            let task_group_shutdown_rx = task_handle.make_shutdown_rx();
            // Whichever of the three futures finishes first ends the task.
            select! {
                () = task_group_shutdown_rx => {
                    debug!("Shutting down state machine executor runner due to task group shutdown signal");
                },
                shutdown_happened_sender = shutdown_receiver => {
                    match shutdown_happened_sender {
                        Ok(()) => {
                            debug!("Shutting down state machine executor runner due to explicit shutdown signal");
                        },
                        Err(_) => {
                            warn!("Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)");
                        }
                    }
                },
                () = executor_runner => {
                    error!("State machine executor runner exited unexpectedly!");
                },
            };
        });
    }
382
    /// Signals the running executor to shut down.
    ///
    /// Returns `Some(())` if the executor was running and `None` otherwise.
    pub fn stop_executor(&self) -> Option<()> {
        self.inner.stop_executor()
    }
399
    /// Returns the notifier that is informed about state updates.
    pub fn notifier(&self) -> &Notifier {
        &self.inner.notifier
    }
405}
406
/// Dropping the shared internals requests an executor stop (a no-op unless
/// the executor is running).
impl Drop for ExecutorInner {
    fn drop(&mut self) {
        self.stop_executor();
    }
}
412
/// A state transition whose trigger future has completed.
struct TransitionForActiveState {
    /// Value the trigger future resolved to; passed to `transition_fn`.
    outcome: serde_json::Value,
    /// The active state the transition starts from.
    state: DynState,
    /// Metadata of the active state the transition starts from.
    meta: ActiveStateMeta,
    /// Function computing the follow-up state.
    transition_fn: StateTransitionFunction<DynState>,
}
419
420impl ExecutorInner {
421 async fn run(
422 &self,
423 global_context_gen: ContextGen,
424 sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
425 ) {
426 debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
427 if let Err(err) = self
428 .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
429 .await
430 {
431 warn!(
432 %err,
433 "An unexpected error occurred during a state transition"
434 );
435 }
436 }
437
438 async fn get_transition_for(
439 &self,
440 state: &DynState,
441 meta: ActiveStateMeta,
442 global_context_gen: &ContextGen,
443 ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
444 let module_instance = state.module_instance_id();
445 let context = &self
446 .module_contexts
447 .get(&module_instance)
448 .expect("Unknown module");
449 let transitions = state
450 .transitions(
451 context,
452 &global_context_gen(module_instance, state.operation_id()),
453 )
454 .into_iter()
455 .map(|transition| {
456 let state = state.clone();
457 let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
458 let StateTransition {
459 trigger,
460 transition,
461 } = transition;
462 TransitionForActiveState {
463 outcome: trigger.await,
464 state,
465 transition_fn: transition,
466 meta,
467 }
468 });
469 f
470 })
471 .collect::<Vec<_>>();
472 if transitions.is_empty() {
473 warn!(module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream.");
477 self.db
478 .autocommit::<_, _, anyhow::Error>(
479 |dbtx, _| {
480 Box::pin(async {
481 let k = InactiveStateKey::from_state(state.clone());
482 let v = ActiveStateMeta::default().into_inactive();
483 dbtx.remove_entry(&ActiveStateKey::from_state(state.clone()))
484 .await;
485 dbtx.insert_entry(&k, &v).await;
486 Ok(())
487 })
488 },
489 None,
490 )
491 .await
492 .expect("Autocommit here can't fail");
493 }
494
495 transitions
496 }
497
    /// Main executor loop.
    ///
    /// Re-queues all active states found in the database and then processes
    /// events until the state update channel yields `None`:
    /// newly received states are turned into futures waiting for their first
    /// transition trigger, triggered transitions are executed in an
    /// auto-committed database transaction, and follow-up active states are
    /// fed back into the update channel.
    async fn run_state_machines_executor_inner(
        &self,
        global_context_gen: ContextGen,
        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
    ) -> anyhow::Result<()> {
        /// Events handled by the loop below.
        enum ExecutorLoopEvent {
            /// A state was received over the update channel.
            New { state: DynState },
            /// The trigger of one of a state's transitions completed.
            Triggered(TransitionForActiveState),
            /// A received state could not be looked up or has no transitions.
            Invalid { state: DynState },
            /// A transition was executed and committed.
            Completed {
                state: DynState,
                outcome: ActiveOrInactiveState,
            },
            /// The update channel returned `None`.
            Disconnected,
        }

        // Feed all persisted active states through the same channel newly
        // added states arrive on.
        let active_states = self.get_active_states().await;
        trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
        for (state, _meta) in active_states {
            self.sm_update_tx
                .send(state)
                .expect("Must be able to send state machine to own opened channel");
        }

        // States that currently have a future in `futures`; used to ignore
        // states that are received more than once.
        let mut currently_running_sms = HashSet::<DynState>::new();
        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
            FuturesUnordered::new();

        loop {
            let event = tokio::select! {
                new = sm_update_rx.recv() => {
                    if let Some(new) = new {
                        ExecutorLoopEvent::New {
                            state: new,
                        }
                    } else {
                        ExecutorLoopEvent::Disconnected
                    }
                },

                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
            };

            match event {
                ExecutorLoopEvent::New { state } => {
                    if currently_running_sms.contains(&state) {
                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
                        continue;
                    }
                    currently_running_sms.insert(state.clone());
                    let futures_len = futures.len();
                    let global_context_gen = &global_context_gen;
                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
                    futures.push(Box::pin(async move {
                        let Some(meta) = self.get_active_state(&state).await else {
                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
                            return ExecutorLoopEvent::Invalid { state: state.clone() };
                        };

                        let transitions = self
                            .get_transition_for(&state, meta, global_context_gen)
                            .await;
                        if transitions.is_empty() {
                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
                            return ExecutorLoopEvent::Invalid { state: state.clone() };
                        }
                        let transitions_num = transitions.len();

                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");

                        // Only the first transition whose trigger completes is
                        // taken; the remaining ones are dropped.
                        let (first_completed_result, _index, _unused_transitions) =
                            select_all(transitions).await;
                        ExecutorLoopEvent::Triggered(first_completed_result)
                    }));
                }
                ExecutorLoopEvent::Triggered(TransitionForActiveState {
                    outcome,
                    state,
                    meta,
                    transition_fn,
                }) => {
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(),
                        "Triggered state transition",
                    );
                    let span = tracing::debug_span!(
                        target: LOG_CLIENT_REACTOR,
                        "sm_transition",
                        operation_id = %state.operation_id().fmt_short()
                    );
                    futures.push({
                        // The future owns clones of everything it needs.
                        let sm_update_tx = self.sm_update_tx.clone();
                        let db = self.db.clone();
                        let notifier = self.notifier.clone();
                        let module_contexts = self.module_contexts.clone();
                        let global_context_gen = global_context_gen.clone();
                        Box::pin(
                            async move {
                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    "Executing state transition",
                                );
                                trace!(
                                    target: LOG_CLIENT_REACTOR,
                                    ?state,
                                    outcome = ?AbbreviateJson(&outcome),
                                    "Executing state transition (details)",
                                );

                                let module_contexts = &module_contexts;
                                let global_context_gen = &global_context_gen;

                                let outcome = db
                                    .autocommit::<'_, '_, _, _, Infallible>(
                                        |dbtx, _| {
                                            // The closure may be invoked more
                                            // than once, so it works on clones.
                                            let state = state.clone();
                                            let transition_fn = transition_fn.clone();
                                            let transition_outcome = outcome.clone();
                                            Box::pin(async move {
                                                let new_state = transition_fn(
                                                    &mut ClientSMDatabaseTransaction::new(
                                                        &mut dbtx.to_ref(),
                                                        state.module_instance_id(),
                                                    ),
                                                    transition_outcome.clone(),
                                                    state.clone(),
                                                )
                                                .await;
                                                // The old state becomes inactive
                                                // in the same transaction.
                                                dbtx.remove_entry(&ActiveStateKey::from_state(
                                                    state.clone(),
                                                ))
                                                .await;
                                                dbtx.insert_entry(
                                                    &InactiveStateKey::from_state(state.clone()),
                                                    &meta.into_inactive(),
                                                )
                                                .await;

                                                let context = &module_contexts
                                                    .get(&state.module_instance_id())
                                                    .expect("Unknown module");

                                                let operation_id = state.operation_id();
                                                let global_context = global_context_gen(
                                                    state.module_instance_id(),
                                                    operation_id,
                                                );

                                                let is_terminal = new_state.is_terminal(context, &global_context);

                                                // Terminal follow-up states are
                                                // stored as inactive right away,
                                                // all others as active.
                                                if is_terminal {
                                                    let k = InactiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default().into_inactive();
                                                    dbtx.insert_entry(&k, &v).await;
                                                    Ok(ActiveOrInactiveState::Inactive {
                                                        dyn_state: new_state,
                                                    })
                                                } else {
                                                    let k = ActiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default();
                                                    dbtx.insert_entry(&k, &v).await;
                                                    Ok(ActiveOrInactiveState::Active {
                                                        dyn_state: new_state,
                                                        meta: v,
                                                    })
                                                }
                                            })
                                        },
                                        None,
                                    )
                                    .await
                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");

                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    terminal = !outcome.is_active(),
                                    ?outcome,
                                    "State transition complete",
                                );

                                // Active follow-up states are queued for
                                // execution; both kinds are sent to the notifier.
                                match &outcome {
                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
                                        sm_update_tx
                                            .send(dyn_state.clone())
                                            .expect("can't fail: we are the receiving end");
                                        notifier.notify(dyn_state.clone());
                                    }
                                    ActiveOrInactiveState::Inactive { dyn_state } => {
                                        notifier.notify(dyn_state.clone());
                                    }
                                }
                                ExecutorLoopEvent::Completed { state, outcome }
                            }
                            .instrument(span),
                        )
                    });
                }
                ExecutorLoopEvent::Invalid { state } => {
                    trace!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
                        "State invalid"
                    );
                    assert!(
                        currently_running_sms.remove(&state),
                        "State must have been recorded"
                    );
                }

                ExecutorLoopEvent::Completed { state, outcome } => {
                    assert!(
                        currently_running_sms.remove(&state),
                        "State must have been recorded"
                    );
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(),
                        outcome_active = outcome.is_active(),
                        total = futures.len(),
                        "State transition complete"
                    );
                    trace!(
                        target: LOG_CLIENT_REACTOR,
                        ?outcome,
                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
                        "State transition complete"
                    );
                }
                ExecutorLoopEvent::Disconnected => {
                    break;
                }
            }
        }

        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
        Ok(())
    }
765
766 async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
767 self.db
768 .begin_transaction_nc()
769 .await
770 .find_by_prefix(&ActiveStateKeyPrefix)
771 .await
772 .filter(|(state, _)| {
774 future::ready(
775 self.module_contexts
776 .contains_key(&state.state.module_instance_id()),
777 )
778 })
779 .map(|(state, meta)| (state.state, meta))
780 .collect::<Vec<_>>()
781 .await
782 }
783
784 async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
785 if !self
787 .module_contexts
788 .contains_key(&state.module_instance_id())
789 {
790 return None;
791 }
792 self.db
793 .begin_transaction_nc()
794 .await
795 .get_value(&ActiveStateKey::from_state(state.clone()))
796 .await
797 }
798
799 async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
800 self.db
801 .begin_transaction_nc()
802 .await
803 .find_by_prefix(&InactiveStateKeyPrefix)
804 .await
805 .filter(|(state, _)| {
807 future::ready(
808 self.module_contexts
809 .contains_key(&state.state.module_instance_id()),
810 )
811 })
812 .map(|(state, meta)| (state.state, meta))
813 .collect::<Vec<_>>()
814 .await
815 }
816}
817
818impl ExecutorInner {
819 fn stop_executor(&self) -> Option<()> {
821 let mut state = self.state.write().expect("Locking can't fail");
822
823 state.stop()
824 }
825}
826
827impl Debug for ExecutorInner {
828 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
829 writeln!(f, "ExecutorInner {{}}")
830 }
831}
832
833impl ExecutorBuilder {
834 pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
837 where
838 C: IntoDynInstance<DynType = DynContext>,
839 {
840 self.with_module_dyn(context.into_dyn(instance_id));
841 }
842
843 pub fn with_module_dyn(&mut self, context: DynContext) {
846 self.valid_module_ids.insert(context.module_instance_id());
847
848 if self
849 .module_contexts
850 .insert(context.module_instance_id(), context)
851 .is_some()
852 {
853 panic!("Tried to add two modules with the same instance id!");
854 }
855 }
856
857 pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
861 self.valid_module_ids.insert(module_id);
862 }
863
864 pub fn build(self, db: Database, notifier: Notifier, client_task_group: TaskGroup) -> Executor {
868 let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
869
870 let inner = Arc::new(ExecutorInner {
871 db,
872 state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
873 module_contexts: self.module_contexts,
874 valid_module_ids: self.valid_module_ids,
875 notifier,
876 sm_update_tx,
877 client_task_group,
878 });
879
880 debug!(
881 instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
882 "Initialized state machine executor with module instances"
883 );
884 Executor { inner }
885 }
886}
887
/// Database key of an active state; encoded as the operation id followed by
/// the state itself.
#[derive(Debug)]
pub struct ActiveStateKey {
    /// Operation the state belongs to; placed first in the encoding so it
    /// can serve as a lookup prefix.
    pub operation_id: OperationId,
    /// The state itself.
    pub state: DynState,
}
896
897impl ActiveStateKey {
898 pub fn from_state(state: DynState) -> ActiveStateKey {
899 ActiveStateKey {
900 operation_id: state.operation_id(),
901 state,
902 }
903 }
904}
905
906impl Encodable for ActiveStateKey {
907 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
908 let mut len = 0;
909 len += self.operation_id.consensus_encode(writer)?;
910 len += self.state.consensus_encode(writer)?;
911 Ok(len)
912 }
913}
914
915impl Decodable for ActiveStateKey {
916 fn consensus_decode<R: Read>(
917 reader: &mut R,
918 modules: &ModuleDecoderRegistry,
919 ) -> Result<Self, DecodeError> {
920 let operation_id = OperationId::consensus_decode(reader, modules)?;
921 let state = DynState::consensus_decode(reader, modules)?;
922
923 Ok(ActiveStateKey {
924 operation_id,
925 state,
926 })
927 }
928}
929
/// Variant of [`ActiveStateKey`] (same database prefix) that keeps the state
/// as raw bytes instead of decoding it.
#[derive(Debug)]
pub struct ActiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance the state belongs to.
    pub module_instance_id: ModuleInstanceId,
    /// Encoded module instance id followed by the undecoded remainder of the
    /// key.
    pub state: Vec<u8>,
}
936
937impl Encodable for ActiveStateKeyBytes {
938 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
939 let mut len = 0;
940 len += self.operation_id.consensus_encode(writer)?;
941 len += writer.write(self.state.as_slice())?;
942 Ok(len)
943 }
944}
945
946impl Decodable for ActiveStateKeyBytes {
947 fn consensus_decode<R: std::io::Read>(
948 reader: &mut R,
949 modules: &ModuleDecoderRegistry,
950 ) -> Result<Self, DecodeError> {
951 let operation_id = OperationId::consensus_decode(reader, modules)?;
952 let module_instance_id = ModuleInstanceId::consensus_decode(reader, modules)?;
953 let mut bytes = Vec::new();
954 reader
955 .read_to_end(&mut bytes)
956 .map_err(DecodeError::from_err)?;
957
958 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
959 instance_bytes.append(&mut bytes);
960
961 Ok(ActiveStateKeyBytes {
962 operation_id,
963 module_instance_id,
964 state: instance_bytes,
965 })
966 }
967}
968
/// Lookup prefix matching all active states of one operation.
#[derive(Debug)]
pub(crate) struct ActiveOperationStateKeyPrefix {
    pub operation_id: OperationId,
}
973
impl Encodable for ActiveOperationStateKeyPrefix {
    /// Encodes just the operation id, i.e. the leading part of an encoded
    /// [`ActiveStateKey`].
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        self.operation_id.consensus_encode(writer)
    }
}
979
// Prefix lookups yield `ActiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
    type Record = ActiveStateKey;
}
983
/// Lookup prefix matching all active states of one operation that belong to
/// one module instance.
#[derive(Debug)]
pub(crate) struct ActiveModuleOperationStateKeyPrefix {
    pub operation_id: OperationId,
    pub module_instance: ModuleInstanceId,
}
989
990impl Encodable for ActiveModuleOperationStateKeyPrefix {
991 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
992 let mut len = 0;
993 len += self.operation_id.consensus_encode(writer)?;
994 len += self.module_instance.consensus_encode(writer)?;
995 Ok(len)
996 }
997}
998
// Prefix lookups yield `ActiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
    type Record = ActiveStateKey;
}
1002
/// Lookup prefix matching every active state record.
#[derive(Debug)]
pub struct ActiveStateKeyPrefix;
1005
impl Encodable for ActiveStateKeyPrefix {
    /// Writes nothing: the prefix contributes no bytes of its own.
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
        Ok(0)
    }
}
1011
/// Metadata stored as the value of an active state record.
#[derive(Debug, Copy, Clone, Encodable, Decodable)]
pub struct ActiveStateMeta {
    /// Time the metadata was created (see the `Default` impl).
    pub created_at: SystemTime,
}
1016
// Active states live under the `ActiveStates` prefix and map to their
// `ActiveStateMeta`; modification notifications are enabled for them.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKey {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = ActiveStateMeta;
}
1023
// Marker impl; presumably what allows `wait_key_exists` to be used with this
// key (see `Executor::await_active_state`) — TODO confirm.
impl DatabaseKeyWithNotify for ActiveStateKey {}
1025
// Prefix lookups yield `ActiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
    type Record = ActiveStateKey;
}
1029
/// Lookup prefix for active state records read as [`ActiveStateKeyBytes`].
#[derive(Debug, Encodable, Decodable)]
pub(crate) struct ActiveStateKeyPrefixBytes;
1032
// Same database prefix and value type as `ActiveStateKey`, but without
// modification notifications.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = ActiveStateMeta;
}
1039
// Prefix lookups yield `ActiveStateKeyBytes` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
    type Record = ActiveStateKeyBytes;
}
1043
1044impl Default for ActiveStateMeta {
1045 fn default() -> Self {
1046 Self {
1047 created_at: fedimint_core::time::now(),
1048 }
1049 }
1050}
1051
1052impl ActiveStateMeta {
1053 fn into_inactive(self) -> InactiveStateMeta {
1054 InactiveStateMeta {
1055 created_at: self.created_at,
1056 exited_at: fedimint_core::time::now(),
1057 }
1058 }
1059}
1060
/// Database key of an inactive state; encoded as the operation id followed by
/// the state itself.
#[derive(Debug, Clone)]
pub struct InactiveStateKey {
    /// Operation the state belongs to; placed first in the encoding so it
    /// can serve as a lookup prefix.
    pub operation_id: OperationId,
    /// The state itself.
    pub state: DynState,
}
1068
1069impl InactiveStateKey {
1070 pub fn from_state(state: DynState) -> InactiveStateKey {
1071 InactiveStateKey {
1072 operation_id: state.operation_id(),
1073 state,
1074 }
1075 }
1076}
1077
1078impl Encodable for InactiveStateKey {
1079 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1080 let mut len = 0;
1081 len += self.operation_id.consensus_encode(writer)?;
1082 len += self.state.consensus_encode(writer)?;
1083 Ok(len)
1084 }
1085}
1086
1087impl Decodable for InactiveStateKey {
1088 fn consensus_decode<R: Read>(
1089 reader: &mut R,
1090 modules: &ModuleDecoderRegistry,
1091 ) -> Result<Self, DecodeError> {
1092 let operation_id = OperationId::consensus_decode(reader, modules)?;
1093 let state = DynState::consensus_decode(reader, modules)?;
1094
1095 Ok(InactiveStateKey {
1096 operation_id,
1097 state,
1098 })
1099 }
1100}
1101
/// Variant of [`InactiveStateKey`] (same database prefix) that keeps the
/// state as raw bytes instead of decoding it.
#[derive(Debug)]
pub struct InactiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance the state belongs to.
    pub module_instance_id: ModuleInstanceId,
    /// Encoded module instance id followed by the undecoded remainder of the
    /// key.
    pub state: Vec<u8>,
}
1108
1109impl Encodable for InactiveStateKeyBytes {
1110 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
1111 let mut len = 0;
1112 len += self.operation_id.consensus_encode(writer)?;
1113 len += writer.write(self.state.as_slice())?;
1114 Ok(len)
1115 }
1116}
1117
1118impl Decodable for InactiveStateKeyBytes {
1119 fn consensus_decode<R: std::io::Read>(
1120 reader: &mut R,
1121 modules: &ModuleDecoderRegistry,
1122 ) -> Result<Self, DecodeError> {
1123 let operation_id = OperationId::consensus_decode(reader, modules)?;
1124 let module_instance_id = ModuleInstanceId::consensus_decode(reader, modules)?;
1125 let mut bytes = Vec::new();
1126 reader
1127 .read_to_end(&mut bytes)
1128 .map_err(DecodeError::from_err)?;
1129
1130 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1131 instance_bytes.append(&mut bytes);
1132
1133 Ok(InactiveStateKeyBytes {
1134 operation_id,
1135 module_instance_id,
1136 state: instance_bytes,
1137 })
1138 }
1139}
1140
/// Lookup prefix matching all inactive states of one operation.
#[derive(Debug)]
pub(crate) struct InactiveOperationStateKeyPrefix {
    pub operation_id: OperationId,
}
1145
impl Encodable for InactiveOperationStateKeyPrefix {
    /// Encodes just the operation id, i.e. the leading part of an encoded
    /// [`InactiveStateKey`].
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        self.operation_id.consensus_encode(writer)
    }
}
1151
// Prefix lookups yield `InactiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
    type Record = InactiveStateKey;
}
1155
/// Lookup prefix matching all inactive states of one operation that belong
/// to one module instance.
#[derive(Debug)]
pub(crate) struct InactiveModuleOperationStateKeyPrefix {
    pub operation_id: OperationId,
    pub module_instance: ModuleInstanceId,
}
1161
1162impl Encodable for InactiveModuleOperationStateKeyPrefix {
1163 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1164 let mut len = 0;
1165 len += self.operation_id.consensus_encode(writer)?;
1166 len += self.module_instance.consensus_encode(writer)?;
1167 Ok(len)
1168 }
1169}
1170
// Prefix lookups yield `InactiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
    type Record = InactiveStateKey;
}
1174
/// Lookup prefix matching every inactive state record.
#[derive(Debug, Clone)]
pub struct InactiveStateKeyPrefix;
1177
impl Encodable for InactiveStateKeyPrefix {
    /// Writes nothing: the prefix contributes no bytes of its own.
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
        Ok(0)
    }
}
1183
/// Lookup prefix for inactive state records read as
/// [`InactiveStateKeyBytes`].
#[derive(Debug, Encodable, Decodable)]
pub(crate) struct InactiveStateKeyPrefixBytes;
1186
// Same database prefix and value type as `InactiveStateKey`, but without
// modification notifications.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = InactiveStateMeta;
}
1193
// Prefix lookups yield `InactiveStateKeyBytes` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
    type Record = InactiveStateKeyBytes;
}
1197
/// Metadata stored as the value of an inactive state record.
#[derive(Debug, Copy, Clone, Decodable, Encodable)]
pub struct InactiveStateMeta {
    /// Creation time carried over from the [`ActiveStateMeta`] this was
    /// converted from.
    pub created_at: SystemTime,
    /// Time at which the metadata was converted into its inactive form.
    pub exited_at: SystemTime,
}
1203
// Inactive states live under the `InactiveStates` prefix and map to their
// `InactiveStateMeta`; modification notifications are enabled for them.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKey {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = InactiveStateMeta;
}
1210
// Marker impl; presumably what allows `wait_key_exists` to be used with this
// key (see `Executor::await_inactive_state`) — TODO confirm.
impl DatabaseKeyWithNotify for InactiveStateKey {}
1212
// Prefix lookups yield `InactiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
    type Record = InactiveStateKey;
}
1216
/// Result of executing a state transition: the follow-up state, tagged by
/// whether it was stored as active or as inactive (terminal).
#[derive(Debug)]
enum ActiveOrInactiveState {
    /// The follow-up state is not terminal and was stored as active.
    Active {
        dyn_state: DynState,
        #[allow(dead_code)] meta: ActiveStateMeta,
    },
    /// The follow-up state is terminal and was stored as inactive.
    Inactive {
        dyn_state: DynState,
    },
}
1228
1229impl ActiveOrInactiveState {
1230 fn is_active(&self) -> bool {
1231 match self {
1232 ActiveOrInactiveState::Active { .. } => true,
1233 ActiveOrInactiveState::Inactive { .. } => false,
1234 }
1235 }
1236}
1237
1238#[cfg(test)]
1239mod tests {
1240 use std::fmt::Debug;
1241 use std::sync::Arc;
1242 use std::time::Duration;
1243
1244 use fedimint_core::core::{
1245 Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId,
1246 };
1247 use fedimint_core::db::mem_impl::MemDatabase;
1248 use fedimint_core::db::Database;
1249 use fedimint_core::encoding::{Decodable, Encodable};
1250 use fedimint_core::module::registry::ModuleDecoderRegistry;
1251 use fedimint_core::runtime;
1252 use fedimint_core::task::TaskGroup;
1253 use tokio::sync::broadcast::Sender;
1254 use tracing::{info, trace};
1255
1256 use crate::sm::state::{Context, DynContext, DynState};
1257 use crate::sm::{Executor, Notifier, State, StateTransition};
1258 use crate::DynGlobalClientContext;
1259
    /// Minimal state machine used to exercise the executor in tests.
    #[derive(Debug, Clone, Eq, PartialEq, Decodable, Encodable, Hash)]
    enum MockStateMachine {
        /// Initial state.
        Start,
        /// A non-zero value was received on the broadcast channel.
        ReceivedNonNull(u64),
        /// State without any transitions.
        Final,
    }
1266
1267 impl State for MockStateMachine {
1268 type ModuleContext = MockContext;
1269
1270 fn transitions(
1271 &self,
1272 context: &Self::ModuleContext,
1273 _global_context: &DynGlobalClientContext,
1274 ) -> Vec<StateTransition<Self>> {
1275 match self {
1276 MockStateMachine::Start => {
1277 let mut receiver1 = context.broadcast.subscribe();
1278 let mut receiver2 = context.broadcast.subscribe();
1279 vec![
1280 StateTransition::new(
1281 async move {
1282 loop {
1283 let val = receiver1.recv().await.unwrap();
1284 if val == 0 {
1285 trace!("State transition Start->Final");
1286 break;
1287 }
1288 }
1289 },
1290 |_dbtx, (), _state| Box::pin(async { MockStateMachine::Final }),
1291 ),
1292 StateTransition::new(
1293 async move {
1294 loop {
1295 let val = receiver2.recv().await.unwrap();
1296 if val != 0 {
1297 trace!("State transition Start->ReceivedNonNull");
1298 break val;
1299 }
1300 }
1301 },
1302 |_dbtx, value, _state| {
1303 Box::pin(async move { MockStateMachine::ReceivedNonNull(value) })
1304 },
1305 ),
1306 ]
1307 }
1308 MockStateMachine::ReceivedNonNull(prev_val) => {
1309 let prev_val = *prev_val;
1310 let mut receiver = context.broadcast.subscribe();
1311 vec![StateTransition::new(
1312 async move {
1313 loop {
1314 let val = receiver.recv().await.unwrap();
1315 if val == prev_val {
1316 trace!("State transition ReceivedNonNull->Final");
1317 break;
1318 }
1319 }
1320 },
1321 |_dbtx, (), _state| Box::pin(async { MockStateMachine::Final }),
1322 )]
1323 }
1324 MockStateMachine::Final => {
1325 vec![]
1326 }
1327 }
1328 }
1329
1330 fn operation_id(&self) -> OperationId {
1331 OperationId([0u8; 32])
1332 }
1333 }
1334
1335 impl IntoDynInstance for MockStateMachine {
1336 type DynType = DynState;
1337
1338 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1339 DynState::from_typed(instance_id, self)
1340 }
1341 }
1342
1343 #[derive(Debug, Clone)]
1344 struct MockContext {
1345 broadcast: tokio::sync::broadcast::Sender<u64>,
1346 }
1347
1348 impl IntoDynInstance for MockContext {
1349 type DynType = DynContext;
1350
1351 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1352 DynContext::from_typed(instance_id, self)
1353 }
1354 }
1355
1356 impl Context for MockContext {
1357 const KIND: Option<ModuleKind> = None;
1358 }
1359
1360 fn get_executor() -> (Executor, Sender<u64>, Database) {
1361 let (broadcast, _) = tokio::sync::broadcast::channel(10);
1362
1363 let mut decoder_builder = Decoder::builder();
1364 decoder_builder.with_decodable_type::<MockStateMachine>();
1365 let decoder = decoder_builder.build();
1366
1367 let decoders =
1368 ModuleDecoderRegistry::new(vec![(42, ModuleKind::from_static_str("test"), decoder)]);
1369 let db = Database::new(MemDatabase::new(), decoders);
1370
1371 let mut executor_builder = Executor::builder();
1372 executor_builder.with_module(
1373 42,
1374 MockContext {
1375 broadcast: broadcast.clone(),
1376 },
1377 );
1378 let executor =
1379 executor_builder.build(db.clone(), Notifier::new(db.clone()), TaskGroup::new());
1380 executor.start_executor(Arc::new(|_, _| DynGlobalClientContext::new_fake()));
1381
1382 info!("Initialized test executor");
1383 (executor, broadcast, db)
1384 }
1385
1386 #[tokio::test]
1387 #[tracing_test::traced_test]
1388 async fn test_executor() {
1389 const MOCK_INSTANCE_1: ModuleInstanceId = 42;
1390 const MOCK_INSTANCE_2: ModuleInstanceId = 21;
1391
1392 let (executor, sender, _db) = get_executor();
1393 executor
1394 .add_state_machines(vec![DynState::from_typed(
1395 MOCK_INSTANCE_1,
1396 MockStateMachine::Start,
1397 )])
1398 .await
1399 .unwrap();
1400
1401 assert!(
1402 executor
1403 .add_state_machines(vec![DynState::from_typed(
1404 MOCK_INSTANCE_1,
1405 MockStateMachine::Start
1406 )])
1407 .await
1408 .is_err(),
1409 "Running the same state machine a second time should fail"
1410 );
1411
1412 assert!(
1413 executor
1414 .contains_active_state(MOCK_INSTANCE_1, MockStateMachine::Start)
1415 .await,
1416 "State was written to DB and waits for broadcast"
1417 );
1418 assert!(
1419 !executor
1420 .contains_active_state(MOCK_INSTANCE_2, MockStateMachine::Start)
1421 .await,
1422 "Instance separation works"
1423 );
1424
1425 runtime::sleep(Duration::from_secs(1)).await;
1427 sender.send(0).unwrap();
1428 runtime::sleep(Duration::from_secs(2)).await;
1429
1430 assert!(
1431 executor
1432 .contains_inactive_state(MOCK_INSTANCE_1, MockStateMachine::Final)
1433 .await,
1434 "State was written to DB and waits for broadcast"
1435 );
1436 }
1437}