fedimint_client/transaction/
sm.rs1use std::time::Duration;
4
5use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
8use fedimint_core::util::backoff_util::custom_backoff;
9use fedimint_core::util::retry;
10use fedimint_core::TransactionId;
11use tokio::sync::watch;
12
13use crate::sm::{Context, DynContext, State, StateTransition};
14use crate::{DynGlobalClientContext, DynState, TxAcceptedEvent, TxRejectedEvent};
15
16pub const TRANSACTION_SUBMISSION_MODULE_INSTANCE: ModuleInstanceId = 0xffff;
19
20#[derive(Debug, Clone)]
21pub struct TxSubmissionContext;
22
23impl Context for TxSubmissionContext {
24 const KIND: Option<ModuleKind> = None;
25}
26
27impl IntoDynInstance for TxSubmissionContext {
28 type DynType = DynContext;
29
30 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
31 DynContext::from_typed(instance_id, self)
32 }
33}
34
35#[cfg_attr(doc, aquamarine::aquamarine)]
36#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
47pub struct TxSubmissionStatesSM {
48 pub operation_id: OperationId,
49 pub state: TxSubmissionStates,
50}
51
52#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
53pub enum TxSubmissionStates {
54 Created(Transaction),
57 Accepted(TransactionId),
61 Rejected(TransactionId, String),
65 NonRetryableError(String),
70}
71
72impl State for TxSubmissionStatesSM {
73 type ModuleContext = TxSubmissionContext;
74
75 fn transitions(
76 &self,
77 _context: &Self::ModuleContext,
78 global_context: &DynGlobalClientContext,
79 ) -> Vec<StateTransition<Self>> {
80 let operation_id = self.operation_id;
81 let (tx_submitted_sender, tx_submitted_receiver) = watch::channel(false);
89 match self.state.clone() {
90 TxSubmissionStates::Created(transaction) => {
91 let txid = transaction.tx_hash();
92 vec![
93 StateTransition::new(
94 TxSubmissionStates::trigger_created_rejected(
95 transaction.clone(),
96 global_context.clone(),
97 tx_submitted_sender,
98 ),
99 {
100 let global_context = global_context.clone();
101 move |sm_dbtx, error, _| {
102 let global_context = global_context.clone();
103 Box::pin(async move {
104 global_context
105 .log_event(
106 sm_dbtx,
107 TxRejectedEvent {
108 txid,
109 operation_id,
110 error: error.clone(),
111 },
112 )
113 .await;
114 TxSubmissionStatesSM {
115 state: TxSubmissionStates::Rejected(txid, error),
116 operation_id,
117 }
118 })
119 }
120 },
121 ),
122 StateTransition::new(
123 TxSubmissionStates::trigger_created_accepted(
124 txid,
125 global_context.clone(),
126 tx_submitted_receiver,
127 ),
128 {
129 let global_context = global_context.clone();
130 move |sm_dbtx, (), _| {
131 let global_context = global_context.clone();
132 Box::pin(async move {
133 global_context
134 .log_event(sm_dbtx, TxAcceptedEvent { txid, operation_id })
135 .await;
136 TxSubmissionStatesSM {
137 state: TxSubmissionStates::Accepted(txid),
138 operation_id,
139 }
140 })
141 }
142 },
143 ),
144 ]
145 }
146 TxSubmissionStates::Accepted(..)
147 | TxSubmissionStates::Rejected(..)
148 | TxSubmissionStates::NonRetryableError(..) => {
149 vec![]
150 }
151 }
152 }
153
154 fn operation_id(&self) -> OperationId {
155 self.operation_id
156 }
157}
158
159impl TxSubmissionStates {
160 async fn trigger_created_rejected(
161 transaction: Transaction,
162 context: DynGlobalClientContext,
163 tx_submitted: watch::Sender<bool>,
164 ) -> String {
165 retry(
166 "tx-submit-sm",
167 custom_backoff(Duration::from_secs(2), Duration::from_secs(600), None),
168 || async {
169 if let TransactionSubmissionOutcome(Err(transaction_error)) = context
170 .api()
171 .submit_transaction(transaction.clone())
172 .await
173 .try_into_inner(context.decoders())?
174 {
175 Ok(transaction_error.to_string())
176 } else {
177 let _ = tx_submitted.send(true);
178 Err(anyhow::anyhow!("Transaction is still valid"))
179 }
180 },
181 )
182 .await
183 .expect("Number of retries is has no limit")
184 }
185
186 async fn trigger_created_accepted(
187 txid: TransactionId,
188 context: DynGlobalClientContext,
189 mut tx_submitted: watch::Receiver<bool>,
190 ) {
191 let _ = tx_submitted.wait_for(|submitted| *submitted).await;
192 context.api().await_transaction(txid).await;
193 }
194}
195
196impl IntoDynInstance for TxSubmissionStatesSM {
197 type DynType = DynState;
198
199 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
200 DynState::from_typed(instance_id, self)
201 }
202}
203
204pub fn tx_submission_sm_decoder() -> Decoder {
205 let mut decoder_builder = Decoder::builder_system();
206 decoder_builder.with_decodable_type::<TxSubmissionStatesSM>();
207 decoder_builder.build()
208}