fedimint_client/transaction/
sm.rs

1//! State machine for submitting transactions
2
3use std::time::Duration;
4
5use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::runtime::sleep;
8use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
9use fedimint_core::TransactionId;
10use fedimint_logging::LOG_CLIENT_NET_API;
11use tracing::warn;
12
13use crate::sm::{Context, DynContext, State, StateTransition};
14use crate::{DynGlobalClientContext, DynState, TxAcceptedEvent, TxRejectedEvent};
15
16// TODO: how to prevent collisions? Generally reserve some range for custom IDs?
17/// Reserved module instance id used for client-internal state machines
18pub const TRANSACTION_SUBMISSION_MODULE_INSTANCE: ModuleInstanceId = 0xffff;
19
20const RETRY_INTERVAL: Duration = Duration::from_secs(5);
21
22#[derive(Debug, Clone)]
23pub struct TxSubmissionContext;
24
25impl Context for TxSubmissionContext {
26    const KIND: Option<ModuleKind> = None;
27}
28
29impl IntoDynInstance for TxSubmissionContext {
30    type DynType = DynContext;
31
32    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
33        DynContext::from_typed(instance_id, self)
34    }
35}
36
37#[cfg_attr(doc, aquamarine::aquamarine)]
38/// State machine to (re-)submit a transaction until it is either accepted or
39/// rejected by the federation
40///
41/// ```mermaid
42/// flowchart LR
43///     Created -- tx is accepted by consensus --> Accepted
44///     Created -- tx is rejected on submission --> Rejected
45/// ```
46// NOTE: This struct needs to retain the same encoding as [`crate::sm::OperationState`],
47// because it was used to replace it, and clients already have it persisted.
48#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
49pub struct TxSubmissionStatesSM {
50    pub operation_id: OperationId,
51    pub state: TxSubmissionStates,
52}
53
54#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
55pub enum TxSubmissionStates {
56    /// The transaction has been created and potentially already been submitted,
57    /// but no rejection or acceptance happened so far
58    Created(Transaction),
59    /// The transaction has been accepted in consensus
60    ///
61    /// **This state is final**
62    Accepted(TransactionId),
63    /// The transaction has been rejected by a quorum on submission
64    ///
65    /// **This state is final**
66    Rejected(TransactionId, String),
67    // Ideally this would be uncommented:
68    // #[deprecated(since = "0.2.2", note = "all errors should be retried")]
69    // but due to some rust bug/limitation it seem impossible to prevent
70    // existing usages from spamming compilation output with warnings.
71    NonRetryableError(String),
72}
73
74impl State for TxSubmissionStatesSM {
75    type ModuleContext = TxSubmissionContext;
76
77    fn transitions(
78        &self,
79        _context: &Self::ModuleContext,
80        global_context: &DynGlobalClientContext,
81    ) -> Vec<StateTransition<Self>> {
82        let operation_id = self.operation_id;
83        match self.state.clone() {
84            TxSubmissionStates::Created(transaction) => {
85                let txid = transaction.tx_hash();
86                vec![
87                    StateTransition::new(
88                        TxSubmissionStates::trigger_created_rejected(
89                            transaction.clone(),
90                            global_context.clone(),
91                        ),
92                        {
93                            let global_context = global_context.clone();
94                            move |sm_dbtx, error, _| {
95                                let global_context = global_context.clone();
96                                Box::pin(async move {
97                                    global_context
98                                        .log_event(
99                                            sm_dbtx,
100                                            TxRejectedEvent {
101                                                txid,
102                                                operation_id,
103                                                error: error.clone(),
104                                            },
105                                        )
106                                        .await;
107                                    TxSubmissionStatesSM {
108                                        state: TxSubmissionStates::Rejected(txid, error),
109                                        operation_id,
110                                    }
111                                })
112                            }
113                        },
114                    ),
115                    StateTransition::new(
116                        TxSubmissionStates::trigger_created_accepted(txid, global_context.clone()),
117                        {
118                            let global_context = global_context.clone();
119                            move |sm_dbtx, (), _| {
120                                let global_context = global_context.clone();
121                                Box::pin(async move {
122                                    global_context
123                                        .log_event(sm_dbtx, TxAcceptedEvent { txid, operation_id })
124                                        .await;
125                                    TxSubmissionStatesSM {
126                                        state: TxSubmissionStates::Accepted(txid),
127                                        operation_id,
128                                    }
129                                })
130                            }
131                        },
132                    ),
133                ]
134            }
135            TxSubmissionStates::Accepted(..)
136            | TxSubmissionStates::Rejected(..)
137            | TxSubmissionStates::NonRetryableError(..) => {
138                vec![]
139            }
140        }
141    }
142
143    fn operation_id(&self) -> OperationId {
144        self.operation_id
145    }
146}
147
148impl TxSubmissionStates {
149    async fn trigger_created_rejected(tx: Transaction, context: DynGlobalClientContext) -> String {
150        loop {
151            match context.api().submit_transaction(tx.clone()).await {
152                Ok(serde_outcome) => match serde_outcome.try_into_inner(context.decoders()) {
153                    Ok(outcome) => {
154                        if let TransactionSubmissionOutcome(Err(transaction_error)) = outcome {
155                            return transaction_error.to_string();
156                        }
157                    }
158                    Err(decode_error) => {
159                        warn!(target: LOG_CLIENT_NET_API, error = %decode_error, "Failed to decode SerdeModuleEncoding");
160                    }
161                },
162                Err(error) => {
163                    error.report_if_important();
164                }
165            }
166
167            sleep(RETRY_INTERVAL).await;
168        }
169    }
170
171    async fn trigger_created_accepted(txid: TransactionId, context: DynGlobalClientContext) {
172        loop {
173            match context.api().await_transaction(txid).await {
174                Ok(..) => return,
175                Err(error) => error.report_if_important(),
176            }
177
178            sleep(RETRY_INTERVAL).await;
179        }
180    }
181}
182
183impl IntoDynInstance for TxSubmissionStatesSM {
184    type DynType = DynState;
185
186    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
187        DynState::from_typed(instance_id, self)
188    }
189}
190
191pub fn tx_submission_sm_decoder() -> Decoder {
192    let mut decoder_builder = Decoder::builder_system();
193    decoder_builder.with_decodable_type::<TxSubmissionStatesSM>();
194    decoder_builder.build()
195}