1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3
4pub mod db;
5
6use std::collections::BTreeMap;
7use std::future;
8
9use async_trait::async_trait;
10use db::{
11 MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
12 MetaSubmissionsKey,
13};
14use fedimint_core::config::{
15 ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
16 TypedServerModuleConfig, TypedServerModuleConsensusConfig,
17};
18use fedimint_core::core::ModuleInstanceId;
19use fedimint_core::db::{
20 CoreMigrationFn, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
21 NonCommittable,
22};
23use fedimint_core::module::audit::Audit;
24use fedimint_core::module::{
25 api_endpoint, ApiAuth, ApiEndpoint, ApiError, ApiVersion, CoreConsensusVersion, InputMeta,
26 ModuleConsensusVersion, ModuleInit, PeerHandle, SupportedModuleApiVersions,
27 TransactionItemAmount, CORE_CONSENSUS_VERSION,
28};
29use fedimint_core::{push_db_pair_items, InPoint, NumPeers, OutPoint, PeerId};
30use fedimint_logging::LOG_MODULE_META;
31use fedimint_meta_common::config::{
32 MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigLocal, MetaConfigPrivate,
33};
34pub use fedimint_meta_common::config::{MetaGenParams, MetaGenParamsConsensus, MetaGenParamsLocal};
35use fedimint_meta_common::endpoint::{
36 GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SubmitRequest,
37 GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT, SUBMIT_ENDPOINT,
38};
39use fedimint_meta_common::{
40 MetaCommonInit, MetaConsensusItem, MetaConsensusValue, MetaInput, MetaInputError, MetaKey,
41 MetaModuleTypes, MetaOutput, MetaOutputError, MetaOutputOutcome, MetaValue,
42 MODULE_CONSENSUS_VERSION,
43};
44use fedimint_server::core::{
45 DynServerModule, ServerModule, ServerModuleInit, ServerModuleInitArgs,
46};
47use futures::StreamExt;
48use rand::{thread_rng, Rng};
49use strum::IntoEnumIterator;
50use tracing::{debug, info, trace};
51
52use crate::db::{
53 DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
54 MetaSubmissionsKeyPrefix,
55};
56
57#[derive(Debug, Clone)]
59pub struct MetaInit;
60
61impl ModuleInit for MetaInit {
63 type Common = MetaCommonInit;
64
65 async fn dump_database(
67 &self,
68 dbtx: &mut DatabaseTransaction<'_>,
69 prefix_names: Vec<String>,
70 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
71 let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
73 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
74 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
75 });
76
77 for table in filtered_prefixes {
78 match table {
79 DbKeyPrefix::Desired => {
80 push_db_pair_items!(
81 dbtx,
82 MetaDesiredKeyPrefix,
83 MetaDesiredKey,
84 MetaDesiredValue,
85 items,
86 "Meta Desired"
87 );
88 }
89 DbKeyPrefix::Consensus => {
90 push_db_pair_items!(
91 dbtx,
92 MetaConsensusKeyPrefix,
93 MetaConsensusKey,
94 MetaConsensusValue,
95 items,
96 "Meta Consensus"
97 );
98 }
99 DbKeyPrefix::Submissions => {
100 push_db_pair_items!(
101 dbtx,
102 MetaSubmissionsKeyPrefix,
103 MetaSubmissionsKey,
104 MetaSubmissionValue,
105 items,
106 "Meta Submissions"
107 );
108 }
109 }
110 }
111
112 Box::new(items.into_iter())
113 }
114}
115
116#[async_trait]
118impl ServerModuleInit for MetaInit {
119 type Params = MetaGenParams;
120
121 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
123 &[MODULE_CONSENSUS_VERSION]
124 }
125
126 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
127 SupportedModuleApiVersions::from_raw(
128 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
129 (
130 MODULE_CONSENSUS_VERSION.major,
131 MODULE_CONSENSUS_VERSION.minor,
132 ),
133 &[(0, 0)],
134 )
135 }
136
137 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<DynServerModule> {
139 Ok(Meta {
140 cfg: args.cfg().to_typed()?,
141 our_peer_id: args.our_peer_id(),
142 num_peers: args.num_peers(),
143 }
144 .into())
145 }
146
147 fn trusted_dealer_gen(
149 &self,
150 peers: &[PeerId],
151 params: &ConfigGenModuleParams,
152 ) -> BTreeMap<PeerId, ServerModuleConfig> {
153 let _params = self.parse_params(params).unwrap();
154 peers
156 .iter()
157 .map(|&peer| {
158 let config = MetaConfig {
159 local: MetaConfigLocal {},
160 private: MetaConfigPrivate,
161 consensus: MetaConfigConsensus {},
162 };
163 (peer, config.to_erased())
164 })
165 .collect()
166 }
167
168 async fn distributed_gen(
170 &self,
171 _peers: &PeerHandle,
172 params: &ConfigGenModuleParams,
173 ) -> anyhow::Result<ServerModuleConfig> {
174 let _params = self.parse_params(params).unwrap();
175
176 Ok(MetaConfig {
177 local: MetaConfigLocal {},
178 private: MetaConfigPrivate,
179 consensus: MetaConfigConsensus {},
180 }
181 .to_erased())
182 }
183
184 fn get_client_config(
186 &self,
187 config: &ServerModuleConsensusConfig,
188 ) -> anyhow::Result<MetaClientConfig> {
189 let _config = MetaConfigConsensus::from_erased(config)?;
190 Ok(MetaClientConfig {})
191 }
192
193 fn validate_config(
194 &self,
195 _identity: &PeerId,
196 _config: ServerModuleConfig,
197 ) -> anyhow::Result<()> {
198 Ok(())
199 }
200
201 fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
203 BTreeMap::new()
204 }
205}
206
207#[derive(Debug)]
209pub struct Meta {
210 pub cfg: MetaConfig,
211 pub our_peer_id: PeerId,
212 pub num_peers: NumPeers,
213}
214
215impl Meta {
216 async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
217 dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
218 .await
219 .map(|(k, v)| (k.0, v))
220 .collect()
221 .await
222 }
223
224 async fn get_submission(
225 dbtx: &mut DatabaseTransaction<'_>,
226 key: MetaKey,
227 peer_id: PeerId,
228 ) -> Option<MetaSubmissionValue> {
229 dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
230 }
231
232 async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
233 dbtx.get_value(&MetaConsensusKey(key))
234 .await
235 .map(|consensus_value| consensus_value.value)
236 }
237
238 async fn change_consensus(
239 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
240 key: MetaKey,
241 value: MetaValue,
242 matching_submissions: Vec<PeerId>,
243 ) {
244 let value_len = value.as_slice().len();
245 let revision = dbtx
246 .get_value(&MetaConsensusKey(key))
247 .await
248 .map(|cv| cv.revision);
249 let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
250 dbtx.insert_entry(
251 &MetaConsensusKey(key),
252 &MetaConsensusValue { revision, value },
253 )
254 .await;
255
256 info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
257
258 for peer_id in matching_submissions {
259 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
260 .await;
261 }
262 }
263}
264
265#[async_trait]
267impl ServerModule for Meta {
268 type Common = MetaModuleTypes;
270 type Init = MetaInit;
271
272 async fn consensus_proposal(
277 &self,
278 dbtx: &mut DatabaseTransaction<'_>,
279 ) -> Vec<MetaConsensusItem> {
280 let desired: Vec<_> = Self::get_desired(dbtx).await;
281
282 let mut to_submit = vec![];
283
284 for (
285 key,
286 MetaDesiredValue {
287 value: desired_value,
288 salt,
289 },
290 ) in desired
291 {
292 let consensus_value = &Self::get_consensus(dbtx, key).await;
293 let consensus_submission_value =
294 Self::get_submission(dbtx, key, self.our_peer_id).await;
295 if consensus_submission_value.as_ref()
296 == Some(&MetaSubmissionValue {
297 value: desired_value.clone(),
298 salt,
299 })
300 {
301 } else if consensus_value.as_ref() == Some(&desired_value) {
303 if consensus_submission_value.is_none() {
304 } else {
308 to_submit.push(MetaConsensusItem {
312 key,
313 value: desired_value,
314 salt,
315 });
316 }
317 } else {
318 to_submit.push(MetaConsensusItem {
319 key,
320 value: desired_value,
321 salt,
322 });
323 }
324 }
325
326 trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
327 to_submit
328 }
329
330 async fn process_consensus_item<'a, 'b>(
335 &'a self,
336 dbtx: &mut DatabaseTransaction<'b>,
337 MetaConsensusItem { key, value, salt }: MetaConsensusItem,
338 peer_id: PeerId,
339 ) -> anyhow::Result<()> {
340 trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
341
342 let new_value = MetaSubmissionValue { salt, value };
343 if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await {
345 if prev_value != new_value {
346 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
347 .await;
348 }
349 }
350 if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
352 debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
353 return Ok(());
354 }
355
356 dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
358 .await;
359
360 let matching_submissions: Vec<PeerId> = dbtx
362 .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
363 .await
364 .filter(|(_submission_key, submission_value)| {
365 future::ready(new_value.value == submission_value.value)
366 })
367 .map(|(submission_key, _)| submission_key.peer_id)
368 .collect()
369 .await;
370
371 let threshold = self.num_peers.threshold();
372 info!(target: LOG_MODULE_META,
373 %peer_id,
374 %key,
375 value_len = %new_value.value.as_slice().len(),
376 matching = %matching_submissions.len(),
377 %threshold, "Peer submitted a value");
378
379 if threshold <= matching_submissions.len() {
381 Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
382 }
383
384 Ok(())
385 }
386
387 async fn process_input<'a, 'b, 'c>(
388 &'a self,
389 _dbtx: &mut DatabaseTransaction<'c>,
390 _input: &'b MetaInput,
391 _in_point: InPoint,
392 ) -> Result<InputMeta, MetaInputError> {
393 Err(MetaInputError::NotSupported)
394 }
395
396 async fn process_output<'a, 'b>(
397 &'a self,
398 _dbtx: &mut DatabaseTransaction<'b>,
399 _output: &'a MetaOutput,
400 _out_point: OutPoint,
401 ) -> Result<TransactionItemAmount, MetaOutputError> {
402 Err(MetaOutputError::NotSupported)
403 }
404
405 async fn output_status(
406 &self,
407 _dbtx: &mut DatabaseTransaction<'_>,
408 _out_point: OutPoint,
409 ) -> Option<MetaOutputOutcome> {
410 None
411 }
412
413 async fn audit(
414 &self,
415 _dbtx: &mut DatabaseTransaction<'_>,
416 _audit: &mut Audit,
417 _module_instance_id: ModuleInstanceId,
418 ) {
419 }
420
421 fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
422 vec![
423 api_endpoint! {
424 SUBMIT_ENDPOINT,
425 ApiVersion::new(0, 0),
426 async |module: &Meta, context, request: SubmitRequest| -> () {
427
428 match context.request_auth() {
429 None => return Err(ApiError::bad_request("Missing password".to_string())),
430 Some(auth) => {
431 module.handle_submit_request(&mut context.dbtx(), &auth, &request).await?;
432 }
433 }
434
435 Ok(())
436 }
437 },
438 api_endpoint! {
439 GET_CONSENSUS_ENDPOINT,
440 ApiVersion::new(0, 0),
441 async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
442 module.handle_get_consensus_request(&mut context.dbtx().into_nc(), &request).await
443 }
444 },
445 api_endpoint! {
446 GET_CONSENSUS_REV_ENDPOINT,
447 ApiVersion::new(0, 0),
448 async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
449 module.handle_get_consensus_revision_request(&mut context.dbtx().into_nc(), &request).await
450 }
451 },
452 api_endpoint! {
453 GET_SUBMISSIONS_ENDPOINT,
454 ApiVersion::new(0, 0),
455 async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
456 match context.request_auth() {
457 None => return Err(ApiError::bad_request("Missing password".to_string())),
458 Some(auth) => {
459 module.handle_get_submissions_request(&mut context.dbtx().into_nc(),&auth, &request).await
460 }
461 }
462 }
463 },
464 ]
465 }
466}
467
468impl Meta {
469 async fn handle_submit_request(
470 &self,
471 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
472 _auth: &ApiAuth,
473 req: &SubmitRequest,
474 ) -> Result<(), ApiError> {
475 let salt = thread_rng().gen();
476
477 info!(target: LOG_MODULE_META,
478 key = %req.key,
479 peer_id = %self.our_peer_id,
480 value_len = %req.value.as_slice().len(),
481 "Our own guardian submitted a value");
482
483 dbtx.insert_entry(
484 &MetaDesiredKey(req.key),
485 &MetaDesiredValue {
486 value: req.value.clone(),
487 salt,
488 },
489 )
490 .await;
491
492 Ok(())
493 }
494
495 async fn handle_get_consensus_request(
496 &self,
497 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
498 req: &GetConsensusRequest,
499 ) -> Result<Option<MetaConsensusValue>, ApiError> {
500 Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
501 }
502
503 async fn handle_get_consensus_revision_request(
504 &self,
505 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
506 req: &GetConsensusRequest,
507 ) -> Result<Option<u64>, ApiError> {
508 Ok(dbtx
509 .get_value(&MetaConsensusKey(req.0))
510 .await
511 .map(|cv| cv.revision))
512 }
513
514 async fn handle_get_submissions_request(
515 &self,
516 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
517 _auth: &ApiAuth,
518 req: &GetSubmissionsRequest,
519 ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
520 Ok(dbtx
521 .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
522 .await
523 .collect::<Vec<_>>()
524 .await
525 .into_iter()
526 .map(|(k, v)| (k.peer_id, v.value))
527 .collect())
528 }
529}