fedimint_client/transaction/
sm.rs1use std::time::Duration;
4
5use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::runtime::sleep;
8use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
9use fedimint_core::TransactionId;
10use fedimint_logging::LOG_CLIENT_NET_API;
11use tracing::warn;
12
13use crate::sm::{Context, DynContext, State, StateTransition};
14use crate::{DynGlobalClientContext, DynState, TxAcceptedEvent, TxRejectedEvent};
15
16pub const TRANSACTION_SUBMISSION_MODULE_INSTANCE: ModuleInstanceId = 0xffff;
19
20const RETRY_INTERVAL: Duration = Duration::from_secs(5);
21
/// Context type used by [`TxSubmissionStatesSM`] as its `ModuleContext`.
///
/// It is a unit struct and therefore carries no data of its own.
#[derive(Debug, Clone)]
pub struct TxSubmissionContext;
24
25impl Context for TxSubmissionContext {
26 const KIND: Option<ModuleKind> = None;
27}
28
29impl IntoDynInstance for TxSubmissionContext {
30 type DynType = DynContext;
31
32 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
33 DynContext::from_typed(instance_id, self)
34 }
35}
36
37#[cfg_attr(doc, aquamarine::aquamarine)]
38#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
49pub struct TxSubmissionStatesSM {
50 pub operation_id: OperationId,
51 pub state: TxSubmissionStates,
52}
53
54#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
55pub enum TxSubmissionStates {
56 Created(Transaction),
59 Accepted(TransactionId),
63 Rejected(TransactionId, String),
67 NonRetryableError(String),
72}
73
74impl State for TxSubmissionStatesSM {
75 type ModuleContext = TxSubmissionContext;
76
77 fn transitions(
78 &self,
79 _context: &Self::ModuleContext,
80 global_context: &DynGlobalClientContext,
81 ) -> Vec<StateTransition<Self>> {
82 let operation_id = self.operation_id;
83 match self.state.clone() {
84 TxSubmissionStates::Created(transaction) => {
85 let txid = transaction.tx_hash();
86 vec![
87 StateTransition::new(
88 TxSubmissionStates::trigger_created_rejected(
89 transaction.clone(),
90 global_context.clone(),
91 ),
92 {
93 let global_context = global_context.clone();
94 move |sm_dbtx, error, _| {
95 let global_context = global_context.clone();
96 Box::pin(async move {
97 global_context
98 .log_event(
99 sm_dbtx,
100 TxRejectedEvent {
101 txid,
102 operation_id,
103 error: error.clone(),
104 },
105 )
106 .await;
107 TxSubmissionStatesSM {
108 state: TxSubmissionStates::Rejected(txid, error),
109 operation_id,
110 }
111 })
112 }
113 },
114 ),
115 StateTransition::new(
116 TxSubmissionStates::trigger_created_accepted(txid, global_context.clone()),
117 {
118 let global_context = global_context.clone();
119 move |sm_dbtx, (), _| {
120 let global_context = global_context.clone();
121 Box::pin(async move {
122 global_context
123 .log_event(sm_dbtx, TxAcceptedEvent { txid, operation_id })
124 .await;
125 TxSubmissionStatesSM {
126 state: TxSubmissionStates::Accepted(txid),
127 operation_id,
128 }
129 })
130 }
131 },
132 ),
133 ]
134 }
135 TxSubmissionStates::Accepted(..)
136 | TxSubmissionStates::Rejected(..)
137 | TxSubmissionStates::NonRetryableError(..) => {
138 vec![]
139 }
140 }
141 }
142
143 fn operation_id(&self) -> OperationId {
144 self.operation_id
145 }
146}
147
148impl TxSubmissionStates {
149 async fn trigger_created_rejected(tx: Transaction, context: DynGlobalClientContext) -> String {
150 loop {
151 match context.api().submit_transaction(tx.clone()).await {
152 Ok(serde_outcome) => match serde_outcome.try_into_inner(context.decoders()) {
153 Ok(outcome) => {
154 if let TransactionSubmissionOutcome(Err(transaction_error)) = outcome {
155 return transaction_error.to_string();
156 }
157 }
158 Err(decode_error) => {
159 warn!(target: LOG_CLIENT_NET_API, error = %decode_error, "Failed to decode SerdeModuleEncoding");
160 }
161 },
162 Err(error) => {
163 error.report_if_important();
164 }
165 }
166
167 sleep(RETRY_INTERVAL).await;
168 }
169 }
170
171 async fn trigger_created_accepted(txid: TransactionId, context: DynGlobalClientContext) {
172 loop {
173 match context.api().await_transaction(txid).await {
174 Ok(..) => return,
175 Err(error) => error.report_if_important(),
176 }
177
178 sleep(RETRY_INTERVAL).await;
179 }
180 }
181}
182
183impl IntoDynInstance for TxSubmissionStatesSM {
184 type DynType = DynState;
185
186 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
187 DynState::from_typed(instance_id, self)
188 }
189}
190
191pub fn tx_submission_sm_decoder() -> Decoder {
192 let mut decoder_builder = Decoder::builder_system();
193 decoder_builder.with_decodable_type::<TxSubmissionStatesSM>();
194 decoder_builder.build()
195}