fedimint_client/sm/
notifier.rs1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use fedimint_core::core::{ModuleInstanceId, OperationId};
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::util::broadcaststream::BroadcastStream;
7use fedimint_core::util::BoxStream;
8use fedimint_logging::LOG_CLIENT;
9use futures::StreamExt;
10use tracing::{debug, error, trace};
11
12use crate::sm::executor::{
13 ActiveModuleOperationStateKeyPrefix, ActiveStateKey, InactiveModuleOperationStateKeyPrefix,
14 InactiveStateKey,
15};
16use crate::sm::{ActiveStateMeta, DynState, InactiveStateMeta, State};
17
18#[derive(Clone)]
25pub struct Notifier {
26 broadcast: tokio::sync::broadcast::Sender<DynState>,
28 db: Database,
30}
31
32impl Notifier {
33 pub fn new(db: Database) -> Self {
34 let (sender, _receiver) = tokio::sync::broadcast::channel(10_000);
35 Self {
36 broadcast: sender,
37 db,
38 }
39 }
40
41 pub fn notify(&self, state: DynState) {
43 let queue_len = self.broadcast.len();
44 trace!(?state, %queue_len, "Sending notification about state transition");
45 if let Err(e) = self.broadcast.send(state) {
47 debug!(
48 ?e,
49 %queue_len,
50 receivers=self.broadcast.receiver_count(),
51 "Could not send state transition notification, no active receivers"
52 );
53 }
54 }
55
56 pub fn module_notifier<S>(&self, module_instance: ModuleInstanceId) -> ModuleNotifier<S> {
59 ModuleNotifier {
60 broadcast: self.broadcast.clone(),
61 module_instance,
62 db: self.db.clone(),
63 _pd: PhantomData,
64 }
65 }
66
67 pub fn sender(&self) -> NotifierSender {
70 NotifierSender {
71 sender: self.broadcast.clone(),
72 }
73 }
74}
75
76pub struct NotifierSender {
81 sender: tokio::sync::broadcast::Sender<DynState>,
82}
83
84impl NotifierSender {
85 pub fn notify(&self, state: DynState) {
87 let _res = self.sender.send(state);
88 }
89}
90
91#[derive(Debug, Clone)]
94pub struct ModuleNotifier<S> {
95 broadcast: tokio::sync::broadcast::Sender<DynState>,
96 module_instance: ModuleInstanceId,
97 db: Database,
100 _pd: PhantomData<S>,
103}
104
105impl<S> ModuleNotifier<S>
106where
107 S: State,
108{
109 pub async fn subscribe(&self, operation_id: OperationId) -> BoxStream<'static, S> {
119 let to_typed_state = |state: DynState| {
120 state
121 .as_any()
122 .downcast_ref::<S>()
123 .expect("Tried to subscribe to wrong state type")
124 .clone()
125 };
126
127 let new_transitions = self.subscribe_all_operations();
130
131 let db_states = {
132 let mut dbtx = self.db.begin_transaction_nc().await;
133 let active_states = dbtx
134 .find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
135 operation_id,
136 module_instance: self.module_instance,
137 })
138 .await
139 .map(|(key, val): (ActiveStateKey, ActiveStateMeta)| {
140 (to_typed_state(key.state), val.created_at)
141 })
142 .collect::<Vec<(S, _)>>()
143 .await;
144
145 let inactive_states = dbtx
146 .find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
147 operation_id,
148 module_instance: self.module_instance,
149 })
150 .await
151 .map(|(key, val): (InactiveStateKey, InactiveStateMeta)| {
152 (to_typed_state(key.state), val.created_at)
153 })
154 .collect::<Vec<(S, _)>>()
155 .await;
156
157 let mut all_states_timed = active_states
160 .into_iter()
161 .chain(inactive_states)
162 .collect::<Vec<(S, _)>>();
163 all_states_timed.sort_by(|(_, t1), (_, t2)| t1.cmp(t2));
164 debug!(
165 operation_id = %operation_id.fmt_short(),
166 num = all_states_timed.len(),
167 "Returning state transitions from DB for notifier subscription",
168 );
169 all_states_timed
170 .into_iter()
171 .map(|(s, _)| s)
172 .collect::<Vec<S>>()
173 };
174
175 let new_transitions = new_transitions.filter_map({
176 let db_states: Arc<_> = Arc::new(db_states.clone());
177
178 move |state: S| {
179 let db_states = db_states.clone();
180 async move {
181 if state.operation_id() == operation_id {
182 trace!(operation_id = %operation_id.fmt_short(), ?state, "Received state transition notification");
183 if db_states.iter().any(|db_s| db_s == &state) {
191 debug!(operation_id = %operation_id.fmt_short(), ?state, "Ignoring duplicated event");
192 return None;
193 }
194 Some(state)
195 } else {
196 None
197 }
198 }
199 }
200 });
201 Box::pin(futures::stream::iter(db_states).chain(new_transitions))
202 }
203
204 pub fn subscribe_all_operations(&self) -> BoxStream<'static, S> {
206 let module_instance_id = self.module_instance;
207 Box::pin(
208 BroadcastStream::new(self.broadcast.subscribe())
209 .take_while(|res| {
210 let cont = if let Err(err) = res {
211 error!(target: LOG_CLIENT, ?err, "ModuleNotifier stream stopped on error");
212 false
213 } else {
214 true
215 };
216 std::future::ready(cont)
217 })
218 .filter_map(move |res| async move {
219 let s = res.expect("We filtered out errors above");
220 if s.module_instance_id() == module_instance_id {
221 Some(
222 s.as_any()
223 .downcast_ref::<S>()
224 .expect("Tried to subscribe to wrong state type")
225 .clone(),
226 )
227 } else {
228 None
229 }
230 }),
231 )
232 }
233}