fedimint_client/transaction/
sm.rs

1//! State machine for submitting transactions
2
3use std::time::Duration;
4
5use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
8use fedimint_core::util::backoff_util::custom_backoff;
9use fedimint_core::util::retry;
10use fedimint_core::TransactionId;
11use tokio::sync::watch;
12
13use crate::sm::{Context, DynContext, State, StateTransition};
14use crate::{DynGlobalClientContext, DynState, TxAcceptedEvent, TxRejectedEvent};
15
// TODO: how to prevent collisions? Generally reserve some range for custom IDs?
/// Reserved module instance id used for client-internal state machines
///
/// The value is hard-coded to `0xffff`. Nothing in this file prevents a
/// regular module instance from being assigned the same id (see the TODO
/// above).
pub const TRANSACTION_SUBMISSION_MODULE_INSTANCE: ModuleInstanceId = 0xffff;
19
/// Module context of the transaction submission state machine
///
/// This is a unit struct: it carries no data and is only used as the
/// `ModuleContext` type of [`TxSubmissionStatesSM`].
#[derive(Debug, Clone)]
pub struct TxSubmissionContext;
22
impl Context for TxSubmissionContext {
    // No module kind is associated with this context.
    // NOTE(review): presumably because transaction submission is
    // client-internal rather than owned by a federation module -- confirm
    // against the documentation of `Context::KIND`.
    const KIND: Option<ModuleKind> = None;
}
26
/// Conversion of the typed context into its dynamically typed counterpart
impl IntoDynInstance for TxSubmissionContext {
    type DynType = DynContext;

    /// Wraps `self` into a [`DynContext`] tagged with `instance_id`.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynContext::from_typed(instance_id, self)
    }
}
34
#[cfg_attr(doc, aquamarine::aquamarine)]
/// State machine to (re-)submit a transaction until it is either accepted or
/// rejected by the federation
///
/// ```mermaid
/// flowchart LR
///     Created -- tx is accepted by consensus --> Accepted
///     Created -- tx is rejected on submission --> Rejected
/// ```
// NOTE: This struct needs to retain the same encoding as [`crate::sm::OperationState`],
// because it was used to replace it, and clients already have it persisted.
// NOTE(review): the derived encoding presumably depends on field order, so
// the fields below should not be reordered -- confirm before touching them.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub struct TxSubmissionStatesSM {
    /// Operation this state machine belongs to
    pub operation_id: OperationId,
    /// Current state of the transaction submission
    pub state: TxSubmissionStates,
}
51
/// States of the transaction submission state machine
///
/// Only `Created` has outgoing transitions; every other variant is final.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum TxSubmissionStates {
    /// The transaction has been created and potentially already been submitted,
    /// but no rejection or acceptance happened so far
    Created(Transaction),
    /// The transaction has been accepted in consensus
    ///
    /// **This state is final**
    Accepted(TransactionId),
    /// The transaction has been rejected by a quorum on submission
    ///
    /// The `String` is the textual form of the rejection error.
    ///
    /// **This state is final**
    Rejected(TransactionId, String),
    /// Legacy error state
    ///
    /// No transition in this file produces this variant and it has no
    /// outgoing transitions, so it is treated as final. It is presumably kept
    /// so that states persisted by older clients can still be decoded.
    // Ideally this would be uncommented:
    // #[deprecated(since = "0.2.2", note = "all errors should be retried")]
    // but due to some rust bug/limitation it seems impossible to prevent
    // existing usages from spamming compilation output with warnings.
    NonRetryableError(String),
}
71
72impl State for TxSubmissionStatesSM {
73    type ModuleContext = TxSubmissionContext;
74
75    fn transitions(
76        &self,
77        _context: &Self::ModuleContext,
78        global_context: &DynGlobalClientContext,
79    ) -> Vec<StateTransition<Self>> {
80        let operation_id = self.operation_id;
81        // There is no point awaiting tx until it was submitted, so
82        // `trigger_created_rejected` which does the submitting will use this
83        // channel to let the `trigger_created_accepted` which does the awaiting
84        // know when it did the submission.
85        //
86        // Submitting tx does not guarantee that it will get into consensus, so the
87        // submitting need to continue.
88        let (tx_submitted_sender, tx_submitted_receiver) = watch::channel(false);
89        match self.state.clone() {
90            TxSubmissionStates::Created(transaction) => {
91                let txid = transaction.tx_hash();
92                vec![
93                    StateTransition::new(
94                        TxSubmissionStates::trigger_created_rejected(
95                            transaction.clone(),
96                            global_context.clone(),
97                            tx_submitted_sender,
98                        ),
99                        {
100                            let global_context = global_context.clone();
101                            move |sm_dbtx, error, _| {
102                                let global_context = global_context.clone();
103                                Box::pin(async move {
104                                    global_context
105                                        .log_event(
106                                            sm_dbtx,
107                                            TxRejectedEvent {
108                                                txid,
109                                                operation_id,
110                                                error: error.clone(),
111                                            },
112                                        )
113                                        .await;
114                                    TxSubmissionStatesSM {
115                                        state: TxSubmissionStates::Rejected(txid, error),
116                                        operation_id,
117                                    }
118                                })
119                            }
120                        },
121                    ),
122                    StateTransition::new(
123                        TxSubmissionStates::trigger_created_accepted(
124                            txid,
125                            global_context.clone(),
126                            tx_submitted_receiver,
127                        ),
128                        {
129                            let global_context = global_context.clone();
130                            move |sm_dbtx, (), _| {
131                                let global_context = global_context.clone();
132                                Box::pin(async move {
133                                    global_context
134                                        .log_event(sm_dbtx, TxAcceptedEvent { txid, operation_id })
135                                        .await;
136                                    TxSubmissionStatesSM {
137                                        state: TxSubmissionStates::Accepted(txid),
138                                        operation_id,
139                                    }
140                                })
141                            }
142                        },
143                    ),
144                ]
145            }
146            TxSubmissionStates::Accepted(..)
147            | TxSubmissionStates::Rejected(..)
148            | TxSubmissionStates::NonRetryableError(..) => {
149                vec![]
150            }
151        }
152    }
153
154    fn operation_id(&self) -> OperationId {
155        self.operation_id
156    }
157}
158
159impl TxSubmissionStates {
160    async fn trigger_created_rejected(
161        transaction: Transaction,
162        context: DynGlobalClientContext,
163        tx_submitted: watch::Sender<bool>,
164    ) -> String {
165        retry(
166            "tx-submit-sm",
167            custom_backoff(Duration::from_secs(2), Duration::from_secs(600), None),
168            || async {
169                if let TransactionSubmissionOutcome(Err(transaction_error)) = context
170                    .api()
171                    .submit_transaction(transaction.clone())
172                    .await
173                    .try_into_inner(context.decoders())?
174                {
175                    Ok(transaction_error.to_string())
176                } else {
177                    let _ = tx_submitted.send(true);
178                    Err(anyhow::anyhow!("Transaction is still valid"))
179                }
180            },
181        )
182        .await
183        .expect("Number of retries is has no limit")
184    }
185
186    async fn trigger_created_accepted(
187        txid: TransactionId,
188        context: DynGlobalClientContext,
189        mut tx_submitted: watch::Receiver<bool>,
190    ) {
191        let _ = tx_submitted.wait_for(|submitted| *submitted).await;
192        context.api().await_transaction(txid).await;
193    }
194}
195
/// Conversion of the typed state machine into its dynamically typed
/// counterpart
impl IntoDynInstance for TxSubmissionStatesSM {
    type DynType = DynState;

    /// Wraps `self` into a [`DynState`] tagged with `instance_id`.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
203
204pub fn tx_submission_sm_decoder() -> Decoder {
205    let mut decoder_builder = Decoder::builder_system();
206    decoder_builder.with_decodable_type::<TxSubmissionStatesSM>();
207    decoder_builder.build()
208}