1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Read, Write};
5use std::mem;
6use std::sync::Arc;
7use std::time::SystemTime;
8
9use anyhow::anyhow;
10use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
11use fedimint_core::db::{
12 AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
13 IDatabaseTransactionOpsCoreTyped,
14};
15use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
16use fedimint_core::fmt_utils::AbbreviateJson;
17use fedimint_core::maybe_add_send_sync;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::task::TaskGroup;
20use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
21use fedimint_logging::LOG_CLIENT_REACTOR;
22use futures::future::{self, select_all};
23use futures::stream::{FuturesUnordered, StreamExt};
24use tokio::select;
25use tokio::sync::{mpsc, oneshot};
26use tracing::{debug, error, info, trace, warn, Instrument};
27
28use super::state::StateTransitionFunction;
29use crate::sm::notifier::Notifier;
30use crate::sm::state::{DynContext, DynState};
31use crate::sm::{ClientSMDatabaseTransaction, State, StateTransition};
32use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
33
/// Upper bound on the number of attempts passed to the database autocommit in
/// [`Executor::add_state_machines`] before it reports a commit failure.
const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
36
/// Shared factory closure producing the [`DynGlobalClientContext`] for a given
/// module instance and operation.
pub type ContextGen =
    Arc<maybe_add_send_sync!(dyn Fn(ModuleInstanceId, OperationId) -> DynGlobalClientContext)>;
39
/// Database key prefixes used by the executor's state records.
enum ExecutorDbPrefixes {
    /// Prefix of [`ActiveStateKey`] / [`ActiveStateKeyBytes`] records.
    ActiveStates = 0xa1,
    /// Prefix of [`InactiveStateKey`] / [`InactiveStateKeyBytes`] records.
    InactiveStates = 0xa2,
}
47
/// Handle to the state machine executor.
///
/// Cloning is cheap: all clones share the same [`ExecutorInner`] through an
/// [`Arc`].
#[derive(Clone, Debug)]
pub struct Executor {
    inner: Arc<ExecutorInner>,
}
59
/// Shared state behind every [`Executor`] handle.
struct ExecutorInner {
    /// Database holding the active and inactive state records.
    db: Database,
    /// Lifecycle of the executor (unstarted, running or stopped).
    state: std::sync::RwLock<ExecutorState>,
    /// Contexts of the modules whose state machines can be driven.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids for which state machines may be added; checked in
    /// [`Executor::add_state_machines_dbtx`].
    valid_module_ids: BTreeSet<ModuleInstanceId>,
    /// Notifier informed about every newly created state.
    notifier: Notifier,
    /// Sending half of the channel that feeds new active states to the
    /// executor loop.
    sm_update_tx: mpsc::UnboundedSender<DynState>,
    /// Task group in which the executor task is spawned.
    client_task_group: TaskGroup,
}
71
/// Lifecycle state of the executor.
enum ExecutorState {
    /// The executor was built but not started yet. Holds the receiving half
    /// of the state update channel until [`ExecutorState::start`] hands it to
    /// the caller.
    Unstarted {
        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
    },
    /// The executor was started.
    Running {
        /// Factory for the global context of a state machine.
        context_gen: ContextGen,
        /// Used by [`ExecutorState::stop`] to signal the executor task.
        shutdown_sender: oneshot::Sender<()>,
    },
    /// The executor was stopped after having been running.
    Stopped,
}
82
83impl ExecutorState {
84 fn start(
88 &mut self,
89 context: ContextGen,
90 ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
91 let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
92
93 let previous_state = mem::replace(
94 self,
95 ExecutorState::Running {
96 context_gen: context,
97 shutdown_sender,
98 },
99 );
100
101 if let ExecutorState::Unstarted { sm_update_rx } = previous_state {
102 Some((shutdown_receiver, sm_update_rx))
103 } else {
104 *self = previous_state;
106
107 debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
108 None
109 }
110 }
111
112 fn stop(&mut self) -> Option<()> {
115 let previous_state = mem::replace(self, ExecutorState::Stopped);
116
117 if let ExecutorState::Running {
118 shutdown_sender, ..
119 } = previous_state
120 {
121 if shutdown_sender.send(()).is_err() {
122 warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
123 }
124 Some(())
125 } else {
126 *self = previous_state;
128
129 debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
130 None
131 }
132 }
133
134 fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
135 let ExecutorState::Running { context_gen, .. } = self else {
136 return None;
137 };
138 Some(context_gen(
139 state.module_instance_id(),
140 state.operation_id(),
141 ))
142 }
143}
144
/// Collects module contexts and valid module ids before an [`Executor`] is
/// built with [`ExecutorBuilder::build`].
#[derive(Debug, Default)]
pub struct ExecutorBuilder {
    /// Contexts registered through `with_module` / `with_module_dyn`.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Ids registered through `with_module_dyn` or `with_valid_module_id`.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
}
152
153impl Executor {
154 pub fn builder() -> ExecutorBuilder {
156 ExecutorBuilder::default()
157 }
158
159 pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
160 self.inner.get_active_states().await
161 }
162
163 pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
169 self.inner
170 .db
171 .autocommit(
172 |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
173 MAX_DB_ATTEMPTS,
174 )
175 .await
176 .map_err(|e| match e {
177 AutocommitError::CommitFailed {
178 last_error,
179 attempts,
180 } => last_error.context(format!("Failed to commit after {attempts} attempts")),
181 AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
182 })?;
183
184 Ok(())
187 }
188
189 pub async fn add_state_machines_dbtx(
198 &self,
199 dbtx: &mut DatabaseTransaction<'_>,
200 states: Vec<DynState>,
201 ) -> AddStateMachinesResult {
202 for state in states {
203 if !self
204 .inner
205 .valid_module_ids
206 .contains(&state.module_instance_id())
207 {
208 return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
209 }
210
211 let is_active_state = dbtx
212 .get_value(&ActiveStateKey::from_state(state.clone()))
213 .await
214 .is_some();
215 let is_inactive_state = dbtx
216 .get_value(&InactiveStateKey::from_state(state.clone()))
217 .await
218 .is_some();
219
220 if is_active_state || is_inactive_state {
221 return Err(AddStateMachinesError::StateAlreadyExists);
222 }
223
224 if let Some(module_context) =
229 self.inner.module_contexts.get(&state.module_instance_id())
230 {
231 if let Some(context) = self
232 .inner
233 .state
234 .read()
235 .expect("locking failed")
236 .gen_context(&state)
237 {
238 if state.is_terminal(module_context, &context) {
239 return Err(AddStateMachinesError::Other(anyhow!(
240 "State is already terminal, adding it to the executor doesn't make sense."
241 )));
242 }
243 } else {
244 warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
245 }
246 }
247
248 dbtx.insert_new_entry(
249 &ActiveStateKey::from_state(state.clone()),
250 &ActiveStateMeta::default(),
251 )
252 .await;
253
254 let notify_sender = self.inner.notifier.sender();
255 let sm_updates_tx = self.inner.sm_update_tx.clone();
256 dbtx.on_commit(move || {
257 notify_sender.notify(state.clone());
258 let _ = sm_updates_tx.send(state);
259 });
260 }
261
262 Ok(())
263 }
264
265 pub async fn contains_active_state<S: State>(
270 &self,
271 instance: ModuleInstanceId,
272 state: S,
273 ) -> bool {
274 let state = DynState::from_typed(instance, state);
275 self.inner
276 .get_active_states()
277 .await
278 .into_iter()
279 .any(|(s, _)| s == state)
280 }
281
282 pub async fn contains_inactive_state<S: State>(
290 &self,
291 instance: ModuleInstanceId,
292 state: S,
293 ) -> bool {
294 let state = DynState::from_typed(instance, state);
295 self.inner
296 .get_inactive_states()
297 .await
298 .into_iter()
299 .any(|(s, _)| s == state)
300 }
301
302 pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
303 self.inner
304 .db
305 .wait_key_exists(&InactiveStateKey::from_state(state))
306 .await
307 }
308
309 pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
310 self.inner
311 .db
312 .wait_key_exists(&ActiveStateKey::from_state(state))
313 .await
314 }
315
316 pub async fn get_operation_states(
318 &self,
319 operation_id: OperationId,
320 ) -> (
321 Vec<(DynState, ActiveStateMeta)>,
322 Vec<(DynState, InactiveStateMeta)>,
323 ) {
324 let mut dbtx = self.inner.db.begin_transaction_nc().await;
325 let active_states: Vec<_> = dbtx
326 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
327 .await
328 .map(|(active_key, active_meta)| (active_key.state, active_meta))
329 .collect()
330 .await;
331 let inactive_states: Vec<_> = dbtx
332 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
333 .await
334 .map(|(active_key, inactive_meta)| (active_key.state, inactive_meta))
335 .collect()
336 .await;
337
338 (active_states, inactive_states)
339 }
340
341 pub fn start_executor(&self, context_gen: ContextGen) {
348 let Some((shutdown_receiver, sm_update_rx)) = self
349 .inner
350 .state
351 .write()
352 .expect("locking can't fail")
353 .start(context_gen.clone())
354 else {
355 panic!("start_executor was called previously");
356 };
357
358 let task_runner_inner = self.inner.clone();
359 let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
360 let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
361 let task_group_shutdown_rx = task_handle.make_shutdown_rx();
362 select! {
363 () = task_group_shutdown_rx => {
364 debug!(
365 target: LOG_CLIENT_REACTOR,
366 "Shutting down state machine executor runner due to task group shutdown signal"
367 );
368 },
369 shutdown_happened_sender = shutdown_receiver => {
370 match shutdown_happened_sender {
371 Ok(()) => {
372 debug!(
373 target: LOG_CLIENT_REACTOR,
374 "Shutting down state machine executor runner due to explicit shutdown signal"
375 );
376 },
377 Err(_) => {
378 warn!(
379 target: LOG_CLIENT_REACTOR,
380 "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
381 );
382 }
383 }
384 },
385 () = executor_runner => {
386 error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
387 },
388 };
389 });
390 }
391
392 pub fn stop_executor(&self) -> Option<()> {
406 self.inner.stop_executor()
407 }
408
409 pub fn notifier(&self) -> &Notifier {
412 &self.inner.notifier
413 }
414}
415
416impl Drop for ExecutorInner {
417 fn drop(&mut self) {
418 self.stop_executor();
419 }
420}
421
/// A transition of an active state whose trigger future has completed.
struct TransitionForActiveState {
    /// Value the trigger future resolved to.
    outcome: serde_json::Value,
    /// The active state the transition belongs to.
    state: DynState,
    /// Meta of the active state, carried along for when it is archived.
    meta: ActiveStateMeta,
    /// Function computing the follow-up state from `outcome` and `state`.
    transition_fn: StateTransitionFunction<DynState>,
}
428
429impl ExecutorInner {
430 async fn run(
431 &self,
432 global_context_gen: ContextGen,
433 sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
434 ) {
435 debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
436 if let Err(err) = self
437 .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
438 .await
439 {
440 warn!(
441 target: LOG_CLIENT_REACTOR,
442 err = %err.fmt_compact_anyhow(),
443 "An unexpected error occurred during a state transition"
444 );
445 }
446 }
447
448 async fn get_transition_for(
449 &self,
450 state: &DynState,
451 meta: ActiveStateMeta,
452 global_context_gen: &ContextGen,
453 ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
454 let module_instance = state.module_instance_id();
455 let context = &self
456 .module_contexts
457 .get(&module_instance)
458 .expect("Unknown module");
459 let transitions = state
460 .transitions(
461 context,
462 &global_context_gen(module_instance, state.operation_id()),
463 )
464 .into_iter()
465 .map(|transition| {
466 let state = state.clone();
467 let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
468 let StateTransition {
469 trigger,
470 transition,
471 } = transition;
472 TransitionForActiveState {
473 outcome: trigger.await,
474 state,
475 transition_fn: transition,
476 meta,
477 }
478 });
479 f
480 })
481 .collect::<Vec<_>>();
482 if transitions.is_empty() {
483 warn!(
487 target: LOG_CLIENT_REACTOR,
488 module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
489 );
490 self.db
491 .autocommit::<_, _, anyhow::Error>(
492 |dbtx, _| {
493 Box::pin(async {
494 let k = InactiveStateKey::from_state(state.clone());
495 let v = ActiveStateMeta::default().into_inactive();
496 dbtx.remove_entry(&ActiveStateKey::from_state(state.clone()))
497 .await;
498 dbtx.insert_entry(&k, &v).await;
499 Ok(())
500 })
501 },
502 None,
503 )
504 .await
505 .expect("Autocommit here can't fail");
506 }
507
508 transitions
509 }
510
511 async fn run_state_machines_executor_inner(
512 &self,
513 global_context_gen: ContextGen,
514 mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
515 ) -> anyhow::Result<()> {
516 enum ExecutorLoopEvent {
519 New { state: DynState },
522 Triggered(TransitionForActiveState),
525 Invalid { state: DynState },
527 Completed {
529 state: DynState,
530 outcome: ActiveOrInactiveState,
531 },
532 Disconnected,
534 }
535
536 let active_states = self.get_active_states().await;
537 trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
538 for (state, _meta) in active_states {
539 self.sm_update_tx
540 .send(state)
541 .expect("Must be able to send state machine to own opened channel");
542 }
543
544 let mut currently_running_sms = HashSet::<DynState>::new();
547 let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
555 FuturesUnordered::new();
556
557 loop {
558 let event = tokio::select! {
559 new = sm_update_rx.recv() => {
560 if let Some(new) = new {
561 ExecutorLoopEvent::New {
562 state: new,
563 }
564 } else {
565 ExecutorLoopEvent::Disconnected
566 }
567 },
568
569 event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
570 };
571
572 match event {
575 ExecutorLoopEvent::New { state } => {
576 if currently_running_sms.contains(&state) {
577 warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
578 continue;
579 }
580 currently_running_sms.insert(state.clone());
581 let futures_len = futures.len();
582 let global_context_gen = &global_context_gen;
583 trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
584 futures.push(Box::pin(async move {
585 let Some(meta) = self.get_active_state(&state).await else {
586 warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
587 return ExecutorLoopEvent::Invalid { state: state.clone() };
588 };
589
590 let transitions = self
591 .get_transition_for(&state, meta, global_context_gen)
592 .await;
593 if transitions.is_empty() {
594 warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
595 return ExecutorLoopEvent::Invalid { state: state.clone() };
596 }
597 let transitions_num = transitions.len();
598
599 debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");
600
601 let (first_completed_result, _index, _unused_transitions) =
602 select_all(transitions).await;
603 ExecutorLoopEvent::Triggered(first_completed_result)
604 }));
605 }
606 ExecutorLoopEvent::Triggered(TransitionForActiveState {
607 outcome,
608 state,
609 meta,
610 transition_fn,
611 }) => {
612 debug!(
613 target: LOG_CLIENT_REACTOR,
614 operation_id = %state.operation_id().fmt_short(),
615 "Triggered state transition",
616 );
617 let span = tracing::debug_span!(
618 target: LOG_CLIENT_REACTOR,
619 "sm_transition",
620 operation_id = %state.operation_id().fmt_short()
621 );
622 futures.push({
628 let sm_update_tx = self.sm_update_tx.clone();
629 let db = self.db.clone();
630 let notifier = self.notifier.clone();
631 let module_contexts = self.module_contexts.clone();
632 let global_context_gen = global_context_gen.clone();
633 Box::pin(
634 async move {
635 debug!(
636 target: LOG_CLIENT_REACTOR,
637 "Executing state transition",
638 );
639 trace!(
640 target: LOG_CLIENT_REACTOR,
641 ?state,
642 outcome = ?AbbreviateJson(&outcome),
643 "Executing state transition (details)",
644 );
645
646 let module_contexts = &module_contexts;
647 let global_context_gen = &global_context_gen;
648
649 let outcome = db
650 .autocommit::<'_, '_, _, _, Infallible>(
651 |dbtx, _| {
652 let state = state.clone();
653 let transition_fn = transition_fn.clone();
654 let transition_outcome = outcome.clone();
655 Box::pin(async move {
656 let new_state = transition_fn(
657 &mut ClientSMDatabaseTransaction::new(
658 &mut dbtx.to_ref(),
659 state.module_instance_id(),
660 ),
661 transition_outcome.clone(),
662 state.clone(),
663 )
664 .await;
665 dbtx.remove_entry(&ActiveStateKey::from_state(
666 state.clone(),
667 ))
668 .await;
669 dbtx.insert_entry(
670 &InactiveStateKey::from_state(state.clone()),
671 &meta.into_inactive(),
672 )
673 .await;
674
675 let context = &module_contexts
676 .get(&state.module_instance_id())
677 .expect("Unknown module");
678
679 let operation_id = state.operation_id();
680 let global_context = global_context_gen(
681 state.module_instance_id(),
682 operation_id,
683 );
684
685 let is_terminal = new_state.is_terminal(context, &global_context);
686
687 if is_terminal {
688 let k = InactiveStateKey::from_state(
689 new_state.clone(),
690 );
691 let v = ActiveStateMeta::default().into_inactive();
692 dbtx.insert_entry(&k, &v).await;
693 Ok(ActiveOrInactiveState::Inactive {
694 dyn_state: new_state,
695 })
696 } else {
697 let k = ActiveStateKey::from_state(
698 new_state.clone(),
699 );
700 let v = ActiveStateMeta::default();
701 dbtx.insert_entry(&k, &v).await;
702 Ok(ActiveOrInactiveState::Active {
703 dyn_state: new_state,
704 meta: v,
705 })
706 }
707 })
708 },
709 None,
710 )
711 .await
712 .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");
713
714 debug!(
715 target: LOG_CLIENT_REACTOR,
716 terminal = !outcome.is_active(),
717 ?outcome,
718 "State transition complete",
719 );
720
721 match &outcome {
722 ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
723 sm_update_tx
724 .send(dyn_state.clone())
725 .expect("can't fail: we are the receiving end");
726 notifier.notify(dyn_state.clone());
727 }
728 ActiveOrInactiveState::Inactive { dyn_state } => {
729 notifier.notify(dyn_state.clone());
730 }
731 }
732 ExecutorLoopEvent::Completed { state, outcome }
733 }
734 .instrument(span),
735 )
736 });
737 }
738 ExecutorLoopEvent::Invalid { state } => {
739 trace!(
740 target: LOG_CLIENT_REACTOR,
741 operation_id = %state.operation_id().fmt_short(), total = futures.len(),
742 "State invalid"
743 );
744 assert!(
745 currently_running_sms.remove(&state),
746 "State must have been recorded"
747 );
748 }
749
750 ExecutorLoopEvent::Completed { state, outcome } => {
751 assert!(
752 currently_running_sms.remove(&state),
753 "State must have been recorded"
754 );
755 debug!(
756 target: LOG_CLIENT_REACTOR,
757 operation_id = %state.operation_id().fmt_short(),
758 outcome_active = outcome.is_active(),
759 total = futures.len(),
760 "State transition complete"
761 );
762 trace!(
763 target: LOG_CLIENT_REACTOR,
764 ?outcome,
765 operation_id = %state.operation_id().fmt_short(), total = futures.len(),
766 "State transition complete"
767 );
768 }
769 ExecutorLoopEvent::Disconnected => {
770 break;
771 }
772 }
773 }
774
775 info!(target: LOG_CLIENT_REACTOR, "Terminated.");
776 Ok(())
777 }
778
779 async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
780 self.db
781 .begin_transaction_nc()
782 .await
783 .find_by_prefix(&ActiveStateKeyPrefix)
784 .await
785 .filter(|(state, _)| {
787 future::ready(
788 self.module_contexts
789 .contains_key(&state.state.module_instance_id()),
790 )
791 })
792 .map(|(state, meta)| (state.state, meta))
793 .collect::<Vec<_>>()
794 .await
795 }
796
797 async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
798 if !self
800 .module_contexts
801 .contains_key(&state.module_instance_id())
802 {
803 return None;
804 }
805 self.db
806 .begin_transaction_nc()
807 .await
808 .get_value(&ActiveStateKey::from_state(state.clone()))
809 .await
810 }
811
812 async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
813 self.db
814 .begin_transaction_nc()
815 .await
816 .find_by_prefix(&InactiveStateKeyPrefix)
817 .await
818 .filter(|(state, _)| {
820 future::ready(
821 self.module_contexts
822 .contains_key(&state.state.module_instance_id()),
823 )
824 })
825 .map(|(state, meta)| (state.state, meta))
826 .collect::<Vec<_>>()
827 .await
828 }
829}
830
831impl ExecutorInner {
832 fn stop_executor(&self) -> Option<()> {
834 let mut state = self.state.write().expect("Locking can't fail");
835
836 state.stop()
837 }
838}
839
840impl Debug for ExecutorInner {
841 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
842 writeln!(f, "ExecutorInner {{}}")
843 }
844}
845
846impl ExecutorBuilder {
847 pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
850 where
851 C: IntoDynInstance<DynType = DynContext>,
852 {
853 self.with_module_dyn(context.into_dyn(instance_id));
854 }
855
856 pub fn with_module_dyn(&mut self, context: DynContext) {
859 self.valid_module_ids.insert(context.module_instance_id());
860
861 if self
862 .module_contexts
863 .insert(context.module_instance_id(), context)
864 .is_some()
865 {
866 panic!("Tried to add two modules with the same instance id!");
867 }
868 }
869
870 pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
874 self.valid_module_ids.insert(module_id);
875 }
876
877 pub fn build(self, db: Database, notifier: Notifier, client_task_group: TaskGroup) -> Executor {
881 let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
882
883 let inner = Arc::new(ExecutorInner {
884 db,
885 state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
886 module_contexts: self.module_contexts,
887 valid_module_ids: self.valid_module_ids,
888 notifier,
889 sm_update_tx,
890 client_task_group,
891 });
892
893 debug!(
894 target: LOG_CLIENT_REACTOR,
895 instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
896 "Initialized state machine executor with module instances"
897 );
898 Executor { inner }
899 }
900}
901
/// Database key of an active state.
///
/// The operation id is encoded before the state, which is what allows lookups
/// by operation id prefix (see `ActiveOperationStateKeyPrefix`).
#[derive(Debug)]
pub struct ActiveStateKey {
    /// Operation the state belongs to; taken from the state in `from_state`.
    pub operation_id: OperationId,
    /// The state itself.
    pub state: DynState,
}
910
911impl ActiveStateKey {
912 pub fn from_state(state: DynState) -> ActiveStateKey {
913 ActiveStateKey {
914 operation_id: state.operation_id(),
915 state,
916 }
917 }
918}
919
920impl Encodable for ActiveStateKey {
921 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
922 let mut len = 0;
923 len += self.operation_id.consensus_encode(writer)?;
924 len += self.state.consensus_encode(writer)?;
925 Ok(len)
926 }
927}
928
929impl Decodable for ActiveStateKey {
930 fn consensus_decode_partial<R: Read>(
931 reader: &mut R,
932 modules: &ModuleDecoderRegistry,
933 ) -> Result<Self, DecodeError> {
934 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
935 let state = DynState::consensus_decode_partial(reader, modules)?;
936
937 Ok(ActiveStateKey {
938 operation_id,
939 state,
940 })
941 }
942}
943
/// Active state key whose state is kept as raw bytes instead of being decoded
/// into a [`DynState`].
#[derive(Debug)]
pub struct ActiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance id decoded from the front of the state bytes.
    pub module_instance_id: ModuleInstanceId,
    /// Encoded state; when produced by decoding, this starts with the encoded
    /// module instance id followed by the remaining bytes of the key.
    pub state: Vec<u8>,
}
950
951impl Encodable for ActiveStateKeyBytes {
952 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
953 let mut len = 0;
954 len += self.operation_id.consensus_encode(writer)?;
955 len += writer.write(self.state.as_slice())?;
956 Ok(len)
957 }
958}
959
960impl Decodable for ActiveStateKeyBytes {
961 fn consensus_decode_partial<R: std::io::Read>(
962 reader: &mut R,
963 modules: &ModuleDecoderRegistry,
964 ) -> Result<Self, DecodeError> {
965 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
966 let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
967 let mut bytes = Vec::new();
968 reader
969 .read_to_end(&mut bytes)
970 .map_err(DecodeError::from_err)?;
971
972 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
973 instance_bytes.append(&mut bytes);
974
975 Ok(ActiveStateKeyBytes {
976 operation_id,
977 module_instance_id,
978 state: instance_bytes,
979 })
980 }
981}
982
/// Lookup prefix matching all active states of one operation.
#[derive(Debug)]
pub(crate) struct ActiveOperationStateKeyPrefix {
    pub operation_id: OperationId,
}

impl Encodable for ActiveOperationStateKeyPrefix {
    /// Encodes only the operation id, i.e. the leading part of an encoded
    /// [`ActiveStateKey`].
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        self.operation_id.consensus_encode(writer)
    }
}

// Lookups with this prefix yield `ActiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
    type Record = ActiveStateKey;
}
997
998#[derive(Debug)]
999pub(crate) struct ActiveModuleOperationStateKeyPrefix {
1000 pub operation_id: OperationId,
1001 pub module_instance: ModuleInstanceId,
1002}
1003
1004impl Encodable for ActiveModuleOperationStateKeyPrefix {
1005 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1006 let mut len = 0;
1007 len += self.operation_id.consensus_encode(writer)?;
1008 len += self.module_instance.consensus_encode(writer)?;
1009 Ok(len)
1010 }
1011}
1012
1013impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
1014 type Record = ActiveStateKey;
1015}
1016
/// Lookup prefix matching every active state.
#[derive(Debug)]
pub struct ActiveStateKeyPrefix;

impl Encodable for ActiveStateKeyPrefix {
    /// Encodes to zero bytes: nothing is written to the writer.
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
        Ok(0)
    }
}
1025
/// Value stored alongside an active state.
#[derive(Debug, Copy, Clone, Encodable, Decodable)]
pub struct ActiveStateMeta {
    /// Time at which this meta was created (see the `Default` impl).
    pub created_at: SystemTime,
}
1030
// `ActiveStateKey` records live under the `ActiveStates` prefix, map to
// `ActiveStateMeta` and have modification notifications enabled.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKey {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = ActiveStateMeta;
}

// Marker impl; `Executor::await_active_state` waits on this key type.
impl DatabaseKeyWithNotify for ActiveStateKey {}

// Lookups with the empty prefix yield `ActiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
    type Record = ActiveStateKey;
}
1043
/// Lookup prefix yielding active states as raw [`ActiveStateKeyBytes`].
#[derive(Debug, Encodable, Decodable)]
pub(crate) struct ActiveStateKeyPrefixBytes;

// Same database prefix as `ActiveStateKey`, but without modification
// notifications.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = ActiveStateMeta;
}

impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
    type Record = ActiveStateKeyBytes;
}
1057
1058impl Default for ActiveStateMeta {
1059 fn default() -> Self {
1060 Self {
1061 created_at: fedimint_core::time::now(),
1062 }
1063 }
1064}
1065
1066impl ActiveStateMeta {
1067 fn into_inactive(self) -> InactiveStateMeta {
1068 InactiveStateMeta {
1069 created_at: self.created_at,
1070 exited_at: fedimint_core::time::now(),
1071 }
1072 }
1073}
1074
/// Database key of an inactive state.
///
/// The operation id is encoded before the state, which is what allows lookups
/// by operation id prefix (see `InactiveOperationStateKeyPrefix`).
#[derive(Debug, Clone)]
pub struct InactiveStateKey {
    /// Operation the state belongs to; taken from the state in `from_state`.
    pub operation_id: OperationId,
    /// The state itself.
    pub state: DynState,
}
1082
1083impl InactiveStateKey {
1084 pub fn from_state(state: DynState) -> InactiveStateKey {
1085 InactiveStateKey {
1086 operation_id: state.operation_id(),
1087 state,
1088 }
1089 }
1090}
1091
1092impl Encodable for InactiveStateKey {
1093 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1094 let mut len = 0;
1095 len += self.operation_id.consensus_encode(writer)?;
1096 len += self.state.consensus_encode(writer)?;
1097 Ok(len)
1098 }
1099}
1100
1101impl Decodable for InactiveStateKey {
1102 fn consensus_decode_partial<R: Read>(
1103 reader: &mut R,
1104 modules: &ModuleDecoderRegistry,
1105 ) -> Result<Self, DecodeError> {
1106 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1107 let state = DynState::consensus_decode_partial(reader, modules)?;
1108
1109 Ok(InactiveStateKey {
1110 operation_id,
1111 state,
1112 })
1113 }
1114}
1115
/// Inactive state key whose state is kept as raw bytes instead of being
/// decoded into a [`DynState`].
#[derive(Debug)]
pub struct InactiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance id decoded from the front of the state bytes.
    pub module_instance_id: ModuleInstanceId,
    /// Encoded state; when produced by decoding, this starts with the encoded
    /// module instance id followed by the remaining bytes of the key.
    pub state: Vec<u8>,
}
1122
1123impl Encodable for InactiveStateKeyBytes {
1124 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
1125 let mut len = 0;
1126 len += self.operation_id.consensus_encode(writer)?;
1127 len += writer.write(self.state.as_slice())?;
1128 Ok(len)
1129 }
1130}
1131
1132impl Decodable for InactiveStateKeyBytes {
1133 fn consensus_decode_partial<R: std::io::Read>(
1134 reader: &mut R,
1135 modules: &ModuleDecoderRegistry,
1136 ) -> Result<Self, DecodeError> {
1137 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1138 let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1139 let mut bytes = Vec::new();
1140 reader
1141 .read_to_end(&mut bytes)
1142 .map_err(DecodeError::from_err)?;
1143
1144 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1145 instance_bytes.append(&mut bytes);
1146
1147 Ok(InactiveStateKeyBytes {
1148 operation_id,
1149 module_instance_id,
1150 state: instance_bytes,
1151 })
1152 }
1153}
1154
/// Lookup prefix matching all inactive states of one operation.
#[derive(Debug)]
pub(crate) struct InactiveOperationStateKeyPrefix {
    pub operation_id: OperationId,
}

impl Encodable for InactiveOperationStateKeyPrefix {
    /// Encodes only the operation id, i.e. the leading part of an encoded
    /// [`InactiveStateKey`].
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        self.operation_id.consensus_encode(writer)
    }
}

// Lookups with this prefix yield `InactiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
    type Record = InactiveStateKey;
}
1169
1170#[derive(Debug)]
1171pub(crate) struct InactiveModuleOperationStateKeyPrefix {
1172 pub operation_id: OperationId,
1173 pub module_instance: ModuleInstanceId,
1174}
1175
1176impl Encodable for InactiveModuleOperationStateKeyPrefix {
1177 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1178 let mut len = 0;
1179 len += self.operation_id.consensus_encode(writer)?;
1180 len += self.module_instance.consensus_encode(writer)?;
1181 Ok(len)
1182 }
1183}
1184
1185impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
1186 type Record = InactiveStateKey;
1187}
1188
/// Lookup prefix matching every inactive state.
#[derive(Debug, Clone)]
pub struct InactiveStateKeyPrefix;

impl Encodable for InactiveStateKeyPrefix {
    /// Encodes to zero bytes: nothing is written to the writer.
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
        Ok(0)
    }
}
1197
/// Lookup prefix yielding inactive states as raw [`InactiveStateKeyBytes`].
#[derive(Debug, Encodable, Decodable)]
pub(crate) struct InactiveStateKeyPrefixBytes;

// Same database prefix as `InactiveStateKey`, but without modification
// notifications.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = InactiveStateMeta;
}

impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
    type Record = InactiveStateKeyBytes;
}
1211
/// Value stored alongside an inactive state.
#[derive(Debug, Copy, Clone, Decodable, Encodable)]
pub struct InactiveStateMeta {
    /// Creation time carried over from the `ActiveStateMeta` this was
    /// converted from.
    pub created_at: SystemTime,
    /// Time at which `ActiveStateMeta::into_inactive` was called.
    pub exited_at: SystemTime,
}
1217
// `InactiveStateKey` records live under the `InactiveStates` prefix, map to
// `InactiveStateMeta` and have modification notifications enabled.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKey {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = InactiveStateMeta;
}

// Marker impl; `Executor::await_inactive_state` waits on this key type.
impl DatabaseKeyWithNotify for InactiveStateKey {}

// Lookups with the empty prefix yield `InactiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
    type Record = InactiveStateKey;
}
1230
/// Result of executing a state transition: the follow-up state, tagged by
/// whether it was stored as active or (being terminal) as inactive.
#[derive(Debug)]
enum ActiveOrInactiveState {
    /// The follow-up state is not terminal and was stored as active.
    Active {
        dyn_state: DynState,
        // Never read directly in the visible code (only matched as
        // `meta: _`), hence the lint allowance.
        #[allow(dead_code)]
        meta: ActiveStateMeta,
    },
    /// The follow-up state is terminal and was stored as inactive.
    Inactive {
        dyn_state: DynState,
    },
}
1242
1243impl ActiveOrInactiveState {
1244 fn is_active(&self) -> bool {
1245 match self {
1246 ActiveOrInactiveState::Active { .. } => true,
1247 ActiveOrInactiveState::Inactive { .. } => false,
1248 }
1249 }
1250}
1251
1252#[cfg(test)]
1253mod tests {
1254 use std::fmt::Debug;
1255 use std::sync::Arc;
1256 use std::time::Duration;
1257
1258 use fedimint_core::core::{
1259 Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId,
1260 };
1261 use fedimint_core::db::mem_impl::MemDatabase;
1262 use fedimint_core::db::Database;
1263 use fedimint_core::encoding::{Decodable, Encodable};
1264 use fedimint_core::module::registry::ModuleDecoderRegistry;
1265 use fedimint_core::runtime;
1266 use fedimint_core::task::TaskGroup;
1267 use fedimint_logging::LOG_CLIENT_REACTOR;
1268 use tokio::sync::broadcast::Sender;
1269 use tracing::{info, trace};
1270
1271 use crate::sm::state::{Context, DynContext, DynState};
1272 use crate::sm::{Executor, Notifier, State, StateTransition};
1273 use crate::DynGlobalClientContext;
1274
/// Minimal state machine used to exercise the executor in tests.
///
/// Its transitions are triggered by `u64` values received over the broadcast
/// channel held by `MockContext`. The variant order is part of the derived
/// encoding, so variants must not be reordered.
#[derive(Debug, Clone, Eq, PartialEq, Decodable, Encodable, Hash)]
enum MockStateMachine {
    /// Initial state: receiving `0` leads to `Final`, receiving any other
    /// value leads to `ReceivedNonNull` carrying that value.
    Start,
    /// A non-zero value was received while in `Start`; receiving the same
    /// value again leads to `Final`.
    ReceivedNonNull(u64),
    /// Terminal state without outgoing transitions.
    Final,
}
1281
1282 impl State for MockStateMachine {
1283 type ModuleContext = MockContext;
1284
1285 fn transitions(
1286 &self,
1287 context: &Self::ModuleContext,
1288 _global_context: &DynGlobalClientContext,
1289 ) -> Vec<StateTransition<Self>> {
1290 match self {
1291 MockStateMachine::Start => {
1292 let mut receiver1 = context.broadcast.subscribe();
1293 let mut receiver2 = context.broadcast.subscribe();
1294 vec![
1295 StateTransition::new(
1296 async move {
1297 loop {
1298 let val = receiver1.recv().await.unwrap();
1299 if val == 0 {
1300 trace!("State transition Start->Final");
1301 break;
1302 }
1303 }
1304 },
1305 |_dbtx, (), _state| Box::pin(async { MockStateMachine::Final }),
1306 ),
1307 StateTransition::new(
1308 async move {
1309 loop {
1310 let val = receiver2.recv().await.unwrap();
1311 if val != 0 {
1312 trace!("State transition Start->ReceivedNonNull");
1313 break val;
1314 }
1315 }
1316 },
1317 |_dbtx, value, _state| {
1318 Box::pin(async move { MockStateMachine::ReceivedNonNull(value) })
1319 },
1320 ),
1321 ]
1322 }
1323 MockStateMachine::ReceivedNonNull(prev_val) => {
1324 let prev_val = *prev_val;
1325 let mut receiver = context.broadcast.subscribe();
1326 vec![StateTransition::new(
1327 async move {
1328 loop {
1329 let val = receiver.recv().await.unwrap();
1330 if val == prev_val {
1331 trace!("State transition ReceivedNonNull->Final");
1332 break;
1333 }
1334 }
1335 },
1336 |_dbtx, (), _state| Box::pin(async { MockStateMachine::Final }),
1337 )]
1338 }
1339 MockStateMachine::Final => {
1340 vec![]
1341 }
1342 }
1343 }
1344
1345 fn operation_id(&self) -> OperationId {
1346 OperationId([0u8; 32])
1347 }
1348 }
1349
/// Conversion of the typed mock state into the type-erased [`DynState`].
impl IntoDynInstance for MockStateMachine {
    type DynType = DynState;

    /// Wraps `self` via `DynState::from_typed`, passing `instance_id` along.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
1357
/// Module context of `MockStateMachine` (its `State::ModuleContext`).
#[derive(Debug, Clone)]
struct MockContext {
    /// Sender half of the broadcast channel; the state machine's transitions
    /// subscribe to it to receive their trigger values.
    broadcast: tokio::sync::broadcast::Sender<u64>,
}
1362
/// Conversion of the typed mock context into the type-erased [`DynContext`].
impl IntoDynInstance for MockContext {
    type DynType = DynContext;

    /// Wraps `self` via `DynContext::from_typed`, passing `instance_id` along.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynContext::from_typed(instance_id, self)
    }
}
1370
/// Marks `MockContext` as a state machine context.
impl Context for MockContext {
    // The mock context declares no module kind.
    const KIND: Option<ModuleKind> = None;
}
1374
1375 fn get_executor() -> (Executor, Sender<u64>, Database) {
1376 let (broadcast, _) = tokio::sync::broadcast::channel(10);
1377
1378 let mut decoder_builder = Decoder::builder();
1379 decoder_builder.with_decodable_type::<MockStateMachine>();
1380 let decoder = decoder_builder.build();
1381
1382 let decoders =
1383 ModuleDecoderRegistry::new(vec![(42, ModuleKind::from_static_str("test"), decoder)]);
1384 let db = Database::new(MemDatabase::new(), decoders);
1385
1386 let mut executor_builder = Executor::builder();
1387 executor_builder.with_module(
1388 42,
1389 MockContext {
1390 broadcast: broadcast.clone(),
1391 },
1392 );
1393 let executor =
1394 executor_builder.build(db.clone(), Notifier::new(db.clone()), TaskGroup::new());
1395 executor.start_executor(Arc::new(|_, _| DynGlobalClientContext::new_fake()));
1396
1397 info!(
1398 target: LOG_CLIENT_REACTOR,
1399 "Initialized test executor"
1400 );
1401 (executor, broadcast, db)
1402 }
1403
1404 #[tokio::test]
1405 #[tracing_test::traced_test]
1406 async fn test_executor() {
1407 const MOCK_INSTANCE_1: ModuleInstanceId = 42;
1408 const MOCK_INSTANCE_2: ModuleInstanceId = 21;
1409
1410 let (executor, sender, _db) = get_executor();
1411 executor
1412 .add_state_machines(vec![DynState::from_typed(
1413 MOCK_INSTANCE_1,
1414 MockStateMachine::Start,
1415 )])
1416 .await
1417 .unwrap();
1418
1419 assert!(
1420 executor
1421 .add_state_machines(vec![DynState::from_typed(
1422 MOCK_INSTANCE_1,
1423 MockStateMachine::Start
1424 )])
1425 .await
1426 .is_err(),
1427 "Running the same state machine a second time should fail"
1428 );
1429
1430 assert!(
1431 executor
1432 .contains_active_state(MOCK_INSTANCE_1, MockStateMachine::Start)
1433 .await,
1434 "State was written to DB and waits for broadcast"
1435 );
1436 assert!(
1437 !executor
1438 .contains_active_state(MOCK_INSTANCE_2, MockStateMachine::Start)
1439 .await,
1440 "Instance separation works"
1441 );
1442
1443 runtime::sleep(Duration::from_secs(1)).await;
1445 sender.send(0).unwrap();
1446 runtime::sleep(Duration::from_secs(2)).await;
1447
1448 assert!(
1449 executor
1450 .contains_inactive_state(MOCK_INSTANCE_1, MockStateMachine::Final)
1451 .await,
1452 "State was written to DB and waits for broadcast"
1453 );
1454 }
1455}