fedimint_client/sm/
notifier.rs

1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use fedimint_core::core::{ModuleInstanceId, OperationId};
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::util::broadcaststream::BroadcastStream;
7use fedimint_core::util::BoxStream;
8use fedimint_logging::LOG_CLIENT;
9use futures::StreamExt;
10use tracing::{debug, error, trace};
11
12use crate::sm::executor::{
13    ActiveModuleOperationStateKeyPrefix, ActiveStateKey, InactiveModuleOperationStateKeyPrefix,
14    InactiveStateKey,
15};
16use crate::sm::{ActiveStateMeta, DynState, InactiveStateMeta, State};
17
18/// State transition notifier owned by the modularized client used to inform
19/// modules of state transitions.
20///
21/// To not lose any state transitions that happen before a module subscribes to
22/// the operation the notifier loads all belonging past state transitions from
23/// the DB. State transitions may be reported multiple times and out of order.
24#[derive(Clone)]
25pub struct Notifier {
26    /// Broadcast channel used to send state transitions to all subscribers
27    broadcast: tokio::sync::broadcast::Sender<DynState>,
28    /// Database used to load all states that happened before subscribing
29    db: Database,
30}
31
32impl Notifier {
33    pub fn new(db: Database) -> Self {
34        let (sender, _receiver) = tokio::sync::broadcast::channel(10_000);
35        Self {
36            broadcast: sender,
37            db,
38        }
39    }
40
41    /// Notify all subscribers of a state transition
42    pub fn notify(&self, state: DynState) {
43        let queue_len = self.broadcast.len();
44        trace!(?state, %queue_len, "Sending notification about state transition");
45        // FIXME: use more robust notification mechanism
46        if let Err(e) = self.broadcast.send(state) {
47            debug!(
48                ?e,
49                %queue_len,
50                receivers=self.broadcast.receiver_count(),
51                "Could not send state transition notification, no active receivers"
52            );
53        }
54    }
55
56    /// Create a new notifier for a specific module instance that can only
57    /// subscribe to the instance's state transitions
58    pub fn module_notifier<S>(&self, module_instance: ModuleInstanceId) -> ModuleNotifier<S> {
59        ModuleNotifier {
60            broadcast: self.broadcast.clone(),
61            module_instance,
62            db: self.db.clone(),
63            _pd: PhantomData,
64        }
65    }
66
67    /// Create a [`NotifierSender`] handle that lets the owner trigger
68    /// notifications without having to hold a full `Notifier`.
69    pub fn sender(&self) -> NotifierSender {
70        NotifierSender {
71            sender: self.broadcast.clone(),
72        }
73    }
74}
75
76/// Notifier send handle that can be shared to places where we don't need an
77/// entire [`Notifier`] but still need to trigger notifications. The main use
78/// case is triggering notifications when a DB transaction was committed
79/// successfully.
80pub struct NotifierSender {
81    sender: tokio::sync::broadcast::Sender<DynState>,
82}
83
84impl NotifierSender {
85    /// Notify all subscribers of a state transition
86    pub fn notify(&self, state: DynState) {
87        let _res = self.sender.send(state);
88    }
89}
90
91/// State transition notifier for a specific module instance that can only
92/// subscribe to transitions belonging to that module
93#[derive(Debug, Clone)]
94pub struct ModuleNotifier<S> {
95    broadcast: tokio::sync::broadcast::Sender<DynState>,
96    module_instance: ModuleInstanceId,
97    /// Database used to load all states that happened before subscribing, see
98    /// [`Notifier`]
99    db: Database,
100    /// `S` limits the type of state that can be subscribed to the one
101    /// associated with the module instance
102    _pd: PhantomData<S>,
103}
104
105impl<S> ModuleNotifier<S>
106where
107    S: State,
108{
109    // TODO: remove duplicates and order old transitions
110    /// Subscribe to state transitions belonging to an operation and module
111    /// (module context contained in struct).
112    ///
113    /// The returned stream will contain all past state transitions that
114    /// happened before the subscription and are read from the database, after
115    /// these the stream will contain all future state transitions. The states
116    /// loaded from the database are not returned in a specific order. There may
117    /// also be duplications.
118    pub async fn subscribe(&self, operation_id: OperationId) -> BoxStream<'static, S> {
119        let to_typed_state = |state: DynState| {
120            state
121                .as_any()
122                .downcast_ref::<S>()
123                .expect("Tried to subscribe to wrong state type")
124                .clone()
125        };
126
127        // It's important to start the subscription first and then query the database to
128        // not lose any transitions in the meantime.
129        let new_transitions = self.subscribe_all_operations();
130
131        let db_states = {
132            let mut dbtx = self.db.begin_transaction_nc().await;
133            let active_states = dbtx
134                .find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
135                    operation_id,
136                    module_instance: self.module_instance,
137                })
138                .await
139                .map(|(key, val): (ActiveStateKey, ActiveStateMeta)| {
140                    (to_typed_state(key.state), val.created_at)
141                })
142                .collect::<Vec<(S, _)>>()
143                .await;
144
145            let inactive_states = dbtx
146                .find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
147                    operation_id,
148                    module_instance: self.module_instance,
149                })
150                .await
151                .map(|(key, val): (InactiveStateKey, InactiveStateMeta)| {
152                    (to_typed_state(key.state), val.created_at)
153                })
154                .collect::<Vec<(S, _)>>()
155                .await;
156
157            // FIXME: don't rely on SystemTime for ordering and introduce a state transition
158            // index instead (dpc was right again xD)
159            let mut all_states_timed = active_states
160                .into_iter()
161                .chain(inactive_states)
162                .collect::<Vec<(S, _)>>();
163            all_states_timed.sort_by(|(_, t1), (_, t2)| t1.cmp(t2));
164            debug!(
165                operation_id = %operation_id.fmt_short(),
166                num = all_states_timed.len(),
167                "Returning state transitions from DB for notifier subscription",
168            );
169            all_states_timed
170                .into_iter()
171                .map(|(s, _)| s)
172                .collect::<Vec<S>>()
173        };
174
175        let new_transitions = new_transitions.filter_map({
176            let db_states: Arc<_> = Arc::new(db_states.clone());
177
178            move |state: S| {
179                let db_states = db_states.clone();
180                async move {
181                    if state.operation_id() == operation_id {
182                        trace!(operation_id = %operation_id.fmt_short(), ?state, "Received state transition notification");
183                        // Deduplicate events that might have both come from the DB and streamed,
184                        // due to subscribing to notifier before querying the DB.
185                        //
186                        // Note: linear search should be good enough in practice for many reasons.
187                        // Eg. states tend to have all the states in the DB, or all streamed "live",
188                        // so the overlap here should be minimal.
189                        // And we'll rewrite the whole thing anyway and use only db as a reference.
190                        if db_states.iter().any(|db_s| db_s == &state) {
191                            debug!(operation_id = %operation_id.fmt_short(), ?state, "Ignoring duplicated event");
192                            return None;
193                        }
194                        Some(state)
195                    } else {
196                        None
197                    }
198                }
199            }
200        });
201        Box::pin(futures::stream::iter(db_states).chain(new_transitions))
202    }
203
204    /// Subscribe to all state transitions belonging to the module instance.
205    pub fn subscribe_all_operations(&self) -> BoxStream<'static, S> {
206        let module_instance_id = self.module_instance;
207        Box::pin(
208            BroadcastStream::new(self.broadcast.subscribe())
209                .take_while(|res| {
210                    let cont = if let Err(err) = res {
211                        error!(target: LOG_CLIENT, ?err, "ModuleNotifier stream stopped on error");
212                        false
213                    } else {
214                        true
215                    };
216                    std::future::ready(cont)
217                })
218                .filter_map(move |res| async move {
219                    let s = res.expect("We filtered out errors above");
220                    if s.module_instance_id() == module_instance_id {
221                        Some(
222                            s.as_any()
223                                .downcast_ref::<S>()
224                                .expect("Tried to subscribe to wrong state type")
225                                .clone(),
226                        )
227                    } else {
228                        None
229                    }
230                }),
231        )
232    }
233}