fedimint_server/consensus/
db.rs1use std::collections::BTreeMap;
2use std::fmt::Debug;
3
4use fedimint_core::core::{DynInput, DynModuleConsensusItem, DynOutput, ModuleInstanceId};
5use fedimint_core::db::{
6 CoreMigrationFn, DatabaseVersion, IDatabaseTransactionOpsCoreTyped, MigrationContext,
7 MODULE_GLOBAL_PREFIX,
8};
9use fedimint_core::encoding::{Decodable, Encodable};
10use fedimint_core::epoch::ConsensusItem;
11use fedimint_core::module::ModuleCommon;
12use fedimint_core::session_outcome::{AcceptedItem, SignedSessionOutcome};
13use fedimint_core::util::BoxStream;
14use fedimint_core::{apply, async_trait_maybe_send, impl_db_lookup, impl_db_record, TransactionId};
15use futures::StreamExt;
16use serde::Serialize;
17use strum_macros::EnumIter;
18
19#[repr(u8)]
20#[derive(Clone, EnumIter, Debug)]
21pub enum DbKeyPrefix {
22 AcceptedItem = 0x01,
23 AcceptedTransaction = 0x02,
24 SignedSessionOutcome = 0x04,
25 AlephUnits = 0x05,
26 ApiAnnouncements = 0x06,
28 ServerInfo = 0x07,
29 Module = MODULE_GLOBAL_PREFIX,
30}
31
32impl std::fmt::Display for DbKeyPrefix {
33 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
34 write!(f, "{self:?}")
35 }
36}
37
38#[derive(Clone, Debug, Encodable, Decodable)]
39pub struct AcceptedItemKey(pub u64);
40
41#[derive(Clone, Debug, Encodable, Decodable)]
42pub struct AcceptedItemPrefix;
43
44impl_db_record!(
45 key = AcceptedItemKey,
46 value = AcceptedItem,
47 db_prefix = DbKeyPrefix::AcceptedItem,
48 notify_on_modify = false,
49);
50impl_db_lookup!(key = AcceptedItemKey, query_prefix = AcceptedItemPrefix);
51
52#[derive(Debug, Encodable, Decodable, Serialize)]
53pub struct AcceptedTransactionKey(pub TransactionId);
54
55#[derive(Debug, Encodable, Decodable)]
56pub struct AcceptedTransactionKeyPrefix;
57
58impl_db_record!(
59 key = AcceptedTransactionKey,
60 value = Vec<ModuleInstanceId>,
61 db_prefix = DbKeyPrefix::AcceptedTransaction,
62 notify_on_modify = true,
63);
64impl_db_lookup!(
65 key = AcceptedTransactionKey,
66 query_prefix = AcceptedTransactionKeyPrefix
67);
68
69#[derive(Debug, Encodable, Decodable)]
70pub struct SignedSessionOutcomeKey(pub u64);
71
72#[derive(Debug, Encodable, Decodable)]
73pub struct SignedSessionOutcomePrefix;
74
75impl_db_record!(
76 key = SignedSessionOutcomeKey,
77 value = SignedSessionOutcome,
78 db_prefix = DbKeyPrefix::SignedSessionOutcome,
79 notify_on_modify = true,
80);
81impl_db_lookup!(
82 key = SignedSessionOutcomeKey,
83 query_prefix = SignedSessionOutcomePrefix
84);
85
86#[derive(Debug, Encodable, Decodable)]
87pub struct AlephUnitsKey(pub u64);
88
89#[derive(Debug, Encodable, Decodable)]
90pub struct AlephUnitsPrefix;
91
92impl_db_record!(
93 key = AlephUnitsKey,
94 value = Vec<u8>,
95 db_prefix = DbKeyPrefix::AlephUnits,
96 notify_on_modify = false,
97);
98impl_db_lookup!(key = AlephUnitsKey, query_prefix = AlephUnitsPrefix);
99
100pub fn get_global_database_migrations() -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
101 BTreeMap::new()
102}
103
104pub enum ModuleHistoryItem {
105 ConsensusItem(DynModuleConsensusItem),
106 Input(DynInput),
107 Output(DynOutput),
108}
109
110pub enum TypedModuleHistoryItem<M: ModuleCommon> {
111 ConsensusItem(M::ConsensusItem),
112 Input(M::Input),
113 Output(M::Output),
114}
115
116#[apply(async_trait_maybe_send!)]
117pub trait MigrationContextExt {
118 async fn get_module_history_stream(&mut self) -> BoxStream<ModuleHistoryItem>;
119
120 async fn get_typed_module_history_stream<M: ModuleCommon>(
121 &mut self,
122 ) -> BoxStream<TypedModuleHistoryItem<M>>;
123}
124
125#[apply(async_trait_maybe_send!)]
126impl MigrationContextExt for MigrationContext<'_> {
127 async fn get_module_history_stream(&mut self) -> BoxStream<ModuleHistoryItem> {
128 let module_instance_id = self
129 .module_instance_id()
130 .expect("module_instance_id must be set");
131
132 let active_session_items = self
136 .__global_dbtx()
137 .find_by_prefix(&AcceptedItemPrefix)
138 .await
139 .map(|(_, item)| item)
140 .collect::<Vec<_>>()
141 .await;
142
143 let stream = self
144 .__global_dbtx()
145 .find_by_prefix(&SignedSessionOutcomePrefix)
146 .await
147 .flat_map(|(_, signed_session_outcome): (_, SignedSessionOutcome)| {
149 futures::stream::iter(signed_session_outcome.session_outcome.items)
150 })
151 .chain(futures::stream::iter(active_session_items))
154 .flat_map(move |item| {
155 let history_items = match item.item {
156 ConsensusItem::Transaction(tx) => tx
157 .inputs
158 .into_iter()
159 .filter_map(|input| {
160 (input.module_instance_id() == module_instance_id)
161 .then_some(ModuleHistoryItem::Input(input))
162 })
163 .chain(tx.outputs.into_iter().filter_map(|output| {
164 (output.module_instance_id() == module_instance_id)
165 .then_some(ModuleHistoryItem::Output(output))
166 }))
167 .collect::<Vec<_>>(),
168 ConsensusItem::Module(mci) => {
169 if mci.module_instance_id() == module_instance_id {
170 vec![ModuleHistoryItem::ConsensusItem(mci)]
171 } else {
172 vec![]
173 }
174 }
175 ConsensusItem::Default { .. } => {
176 unreachable!("We never save unknown CIs on the server side")
177 }
178 };
179 futures::stream::iter(history_items)
180 });
181
182 Box::pin(stream)
183 }
184
185 async fn get_typed_module_history_stream<M: ModuleCommon>(
186 &mut self,
187 ) -> BoxStream<TypedModuleHistoryItem<M>> {
188 Box::pin(self.get_module_history_stream().await.map(|item| {
189 match item {
190 ModuleHistoryItem::ConsensusItem(ci) => TypedModuleHistoryItem::ConsensusItem(
191 ci.as_any()
192 .downcast_ref::<M::ConsensusItem>()
193 .expect("Wrong module type")
194 .clone(),
195 ),
196 ModuleHistoryItem::Input(input) => TypedModuleHistoryItem::Input(
197 input
198 .as_any()
199 .downcast_ref::<M::Input>()
200 .expect("Wrong module type")
201 .clone(),
202 ),
203 ModuleHistoryItem::Output(output) => TypedModuleHistoryItem::Output(
204 output
205 .as_any()
206 .downcast_ref::<M::Output>()
207 .expect("Wrong module type")
208 .clone(),
209 ),
210 }
211 }))
212 }
213}