fedimint_client/sm/
notifier.rs1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use fedimint_core::core::{ModuleInstanceId, OperationId};
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::util::broadcaststream::BroadcastStream;
7use fedimint_core::util::BoxStream;
8use futures::StreamExt;
9use tracing::{debug, error, trace};
10
11use crate::sm::executor::{
12 ActiveModuleOperationStateKeyPrefix, ActiveStateKey, InactiveModuleOperationStateKeyPrefix,
13 InactiveStateKey,
14};
15use crate::sm::{ActiveStateMeta, DynState, InactiveStateMeta, State};
16
17#[derive(Clone)]
24pub struct Notifier {
25 broadcast: tokio::sync::broadcast::Sender<DynState>,
27 db: Database,
29}
30
31impl Notifier {
32 pub fn new(db: Database) -> Self {
33 let (sender, _receiver) = tokio::sync::broadcast::channel(10_000);
34 Self {
35 broadcast: sender,
36 db,
37 }
38 }
39
40 pub fn notify(&self, state: DynState) {
42 let queue_len = self.broadcast.len();
43 trace!(?state, %queue_len, "Sending notification about state transition");
44 if let Err(e) = self.broadcast.send(state) {
46 debug!(
47 ?e,
48 %queue_len,
49 receivers=self.broadcast.receiver_count(),
50 "Could not send state transition notification, no active receivers"
51 );
52 }
53 }
54
55 pub fn module_notifier<S>(&self, module_instance: ModuleInstanceId) -> ModuleNotifier<S> {
58 ModuleNotifier {
59 broadcast: self.broadcast.clone(),
60 module_instance,
61 db: self.db.clone(),
62 _pd: PhantomData,
63 }
64 }
65
66 pub fn sender(&self) -> NotifierSender {
69 NotifierSender {
70 sender: self.broadcast.clone(),
71 }
72 }
73}
74
75pub struct NotifierSender {
80 sender: tokio::sync::broadcast::Sender<DynState>,
81}
82
83impl NotifierSender {
84 pub fn notify(&self, state: DynState) {
86 let _res = self.sender.send(state);
87 }
88}
89
90#[derive(Debug, Clone)]
93pub struct ModuleNotifier<S> {
94 broadcast: tokio::sync::broadcast::Sender<DynState>,
95 module_instance: ModuleInstanceId,
96 db: Database,
99 _pd: PhantomData<S>,
102}
103
104impl<S> ModuleNotifier<S>
105where
106 S: State,
107{
108 pub async fn subscribe(&self, operation_id: OperationId) -> BoxStream<'static, S> {
118 let to_typed_state = |state: DynState| {
119 state
120 .as_any()
121 .downcast_ref::<S>()
122 .expect("Tried to subscribe to wrong state type")
123 .clone()
124 };
125
126 let new_transitions = self.subscribe_all_operations();
129
130 let db_states = {
131 let mut dbtx = self.db.begin_transaction_nc().await;
132 let active_states = dbtx
133 .find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
134 operation_id,
135 module_instance: self.module_instance,
136 })
137 .await
138 .map(|(key, val): (ActiveStateKey, ActiveStateMeta)| {
139 (to_typed_state(key.state), val.created_at)
140 })
141 .collect::<Vec<(S, _)>>()
142 .await;
143
144 let inactive_states = dbtx
145 .find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
146 operation_id,
147 module_instance: self.module_instance,
148 })
149 .await
150 .map(|(key, val): (InactiveStateKey, InactiveStateMeta)| {
151 (to_typed_state(key.state), val.created_at)
152 })
153 .collect::<Vec<(S, _)>>()
154 .await;
155
156 let mut all_states_timed = active_states
159 .into_iter()
160 .chain(inactive_states)
161 .collect::<Vec<(S, _)>>();
162 all_states_timed.sort_by(|(_, t1), (_, t2)| t1.cmp(t2));
163 debug!(
164 operation_id = %operation_id.fmt_short(),
165 num = all_states_timed.len(),
166 "Returning state transitions from DB for notifier subscription",
167 );
168 all_states_timed
169 .into_iter()
170 .map(|(s, _)| s)
171 .collect::<Vec<S>>()
172 };
173
174 let new_transitions = new_transitions.filter_map({
175 let db_states: Arc<_> = Arc::new(db_states.clone());
176
177 move |state: S| {
178 let db_states = db_states.clone();
179 async move {
180 if state.operation_id() == operation_id {
181 trace!(operation_id = %operation_id.fmt_short(), ?state, "Received state transition notification");
182 if db_states.iter().any(|db_s| db_s == &state) {
190 debug!(operation_id = %operation_id.fmt_short(), ?state, "Ignoring duplicated event");
191 return None;
192 }
193 Some(state)
194 } else {
195 None
196 }
197 }
198 }
199 });
200 Box::pin(futures::stream::iter(db_states).chain(new_transitions))
201 }
202
203 pub fn subscribe_all_operations(&self) -> BoxStream<'static, S> {
205 let module_instance_id = self.module_instance;
206 Box::pin(
207 BroadcastStream::new(self.broadcast.subscribe())
208 .take_while(|res| {
209 let cont = if let Err(err) = res {
210 error!(?err, "ModuleNotifier stream stopped on error");
211 false
212 } else {
213 true
214 };
215 std::future::ready(cont)
216 })
217 .filter_map(move |res| async move {
218 let s = res.expect("We filtered out errors above");
219 if s.module_instance_id() == module_instance_id {
220 Some(
221 s.as_any()
222 .downcast_ref::<S>()
223 .expect("Tried to subscribe to wrong state type")
224 .clone(),
225 )
226 } else {
227 None
228 }
229 }),
230 )
231 }
232}