fedimint_meta_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3
4pub mod db;
5
6use std::collections::BTreeMap;
7use std::future;
8
9use async_trait::async_trait;
10use db::{
11    MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
12    MetaSubmissionsKey,
13};
14use fedimint_core::config::{
15    ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
16    TypedServerModuleConfig, TypedServerModuleConsensusConfig,
17};
18use fedimint_core::core::ModuleInstanceId;
19use fedimint_core::db::{
20    CoreMigrationFn, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
21    NonCommittable,
22};
23use fedimint_core::module::audit::Audit;
24use fedimint_core::module::{
25    api_endpoint, ApiAuth, ApiEndpoint, ApiError, ApiVersion, CoreConsensusVersion, InputMeta,
26    ModuleConsensusVersion, ModuleInit, PeerHandle, SupportedModuleApiVersions,
27    TransactionItemAmount, CORE_CONSENSUS_VERSION,
28};
29use fedimint_core::{push_db_pair_items, InPoint, NumPeers, OutPoint, PeerId};
30use fedimint_logging::LOG_MODULE_META;
31use fedimint_meta_common::config::{
32    MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigLocal, MetaConfigPrivate,
33};
34pub use fedimint_meta_common::config::{MetaGenParams, MetaGenParamsConsensus, MetaGenParamsLocal};
35use fedimint_meta_common::endpoint::{
36    GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SubmitRequest,
37    GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT, SUBMIT_ENDPOINT,
38};
39use fedimint_meta_common::{
40    MetaCommonInit, MetaConsensusItem, MetaConsensusValue, MetaInput, MetaInputError, MetaKey,
41    MetaModuleTypes, MetaOutput, MetaOutputError, MetaOutputOutcome, MetaValue,
42    MODULE_CONSENSUS_VERSION,
43};
44use fedimint_server::core::{
45    DynServerModule, ServerModule, ServerModuleInit, ServerModuleInitArgs,
46};
47use futures::StreamExt;
48use rand::{thread_rng, Rng};
49use strum::IntoEnumIterator;
50use tracing::{debug, info, trace};
51
52use crate::db::{
53    DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
54    MetaSubmissionsKeyPrefix,
55};
56
/// Initializer for the meta module.
///
/// A stateless unit type: it carries no data of its own and only serves as
/// the type the module-init traits in this file are implemented on.
#[derive(Debug, Clone)]
pub struct MetaInit;
60
61// TODO: Boilerplate-code
62impl ModuleInit for MetaInit {
63    type Common = MetaCommonInit;
64
65    /// Dumps all database items for debugging
66    async fn dump_database(
67        &self,
68        dbtx: &mut DatabaseTransaction<'_>,
69        prefix_names: Vec<String>,
70    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
71        // TODO: Boilerplate-code
72        let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
73        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
74            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
75        });
76
77        for table in filtered_prefixes {
78            match table {
79                DbKeyPrefix::Desired => {
80                    push_db_pair_items!(
81                        dbtx,
82                        MetaDesiredKeyPrefix,
83                        MetaDesiredKey,
84                        MetaDesiredValue,
85                        items,
86                        "Meta Desired"
87                    );
88                }
89                DbKeyPrefix::Consensus => {
90                    push_db_pair_items!(
91                        dbtx,
92                        MetaConsensusKeyPrefix,
93                        MetaConsensusKey,
94                        MetaConsensusValue,
95                        items,
96                        "Meta Consensus"
97                    );
98                }
99                DbKeyPrefix::Submissions => {
100                    push_db_pair_items!(
101                        dbtx,
102                        MetaSubmissionsKeyPrefix,
103                        MetaSubmissionsKey,
104                        MetaSubmissionValue,
105                        items,
106                        "Meta Submissions"
107                    );
108                }
109            }
110        }
111
112        Box::new(items.into_iter())
113    }
114}
115
116/// Implementation of server module non-consensus functions
117#[async_trait]
118impl ServerModuleInit for MetaInit {
119    type Params = MetaGenParams;
120
121    /// Returns the version of this module
122    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
123        &[MODULE_CONSENSUS_VERSION]
124    }
125
126    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
127        SupportedModuleApiVersions::from_raw(
128            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
129            (
130                MODULE_CONSENSUS_VERSION.major,
131                MODULE_CONSENSUS_VERSION.minor,
132            ),
133            &[(0, 0)],
134        )
135    }
136
137    /// Initialize the module
138    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<DynServerModule> {
139        Ok(Meta {
140            cfg: args.cfg().to_typed()?,
141            our_peer_id: args.our_peer_id(),
142            num_peers: args.num_peers(),
143        }
144        .into())
145    }
146
147    /// Generates configs for all peers in a trusted manner for testing
148    fn trusted_dealer_gen(
149        &self,
150        peers: &[PeerId],
151        params: &ConfigGenModuleParams,
152    ) -> BTreeMap<PeerId, ServerModuleConfig> {
153        let _params = self.parse_params(params).unwrap();
154        // Generate a config for each peer
155        peers
156            .iter()
157            .map(|&peer| {
158                let config = MetaConfig {
159                    local: MetaConfigLocal {},
160                    private: MetaConfigPrivate,
161                    consensus: MetaConfigConsensus {},
162                };
163                (peer, config.to_erased())
164            })
165            .collect()
166    }
167
168    /// Generates configs for all peers in an untrusted manner
169    async fn distributed_gen(
170        &self,
171        _peers: &PeerHandle,
172        params: &ConfigGenModuleParams,
173    ) -> anyhow::Result<ServerModuleConfig> {
174        let _params = self.parse_params(params).unwrap();
175
176        Ok(MetaConfig {
177            local: MetaConfigLocal {},
178            private: MetaConfigPrivate,
179            consensus: MetaConfigConsensus {},
180        }
181        .to_erased())
182    }
183
184    /// Converts the consensus config into the client config
185    fn get_client_config(
186        &self,
187        config: &ServerModuleConsensusConfig,
188    ) -> anyhow::Result<MetaClientConfig> {
189        let _config = MetaConfigConsensus::from_erased(config)?;
190        Ok(MetaClientConfig {})
191    }
192
193    fn validate_config(
194        &self,
195        _identity: &PeerId,
196        _config: ServerModuleConfig,
197    ) -> anyhow::Result<()> {
198        Ok(())
199    }
200
201    /// DB migrations to move from old to newer versions
202    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
203        BTreeMap::new()
204    }
205}
206
207/// Meta module
208#[derive(Debug)]
209pub struct Meta {
210    pub cfg: MetaConfig,
211    pub our_peer_id: PeerId,
212    pub num_peers: NumPeers,
213}
214
215impl Meta {
216    async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
217        dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
218            .await
219            .map(|(k, v)| (k.0, v))
220            .collect()
221            .await
222    }
223
224    async fn get_submission(
225        dbtx: &mut DatabaseTransaction<'_>,
226        key: MetaKey,
227        peer_id: PeerId,
228    ) -> Option<MetaSubmissionValue> {
229        dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
230    }
231
232    async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
233        dbtx.get_value(&MetaConsensusKey(key))
234            .await
235            .map(|consensus_value| consensus_value.value)
236    }
237
238    async fn change_consensus(
239        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
240        key: MetaKey,
241        value: MetaValue,
242        matching_submissions: Vec<PeerId>,
243    ) {
244        let value_len = value.as_slice().len();
245        let revision = dbtx
246            .get_value(&MetaConsensusKey(key))
247            .await
248            .map(|cv| cv.revision);
249        let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
250        dbtx.insert_entry(
251            &MetaConsensusKey(key),
252            &MetaConsensusValue { revision, value },
253        )
254        .await;
255
256        info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
257
258        for peer_id in matching_submissions {
259            dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
260                .await;
261        }
262    }
263}
264
265/// Implementation of consensus for the server module
266#[async_trait]
267impl ServerModule for Meta {
268    /// Define the consensus types
269    type Common = MetaModuleTypes;
270    type Init = MetaInit;
271
272    /// Check the difference between what's desired vs submitted and consensus.
273    ///
274    /// Returns:
275    /// Items to submit as our proposal.
276    async fn consensus_proposal(
277        &self,
278        dbtx: &mut DatabaseTransaction<'_>,
279    ) -> Vec<MetaConsensusItem> {
280        let desired: Vec<_> = Self::get_desired(dbtx).await;
281
282        let mut to_submit = vec![];
283
284        for (
285            key,
286            MetaDesiredValue {
287                value: desired_value,
288                salt,
289            },
290        ) in desired
291        {
292            let consensus_value = &Self::get_consensus(dbtx, key).await;
293            let consensus_submission_value =
294                Self::get_submission(dbtx, key, self.our_peer_id).await;
295            if consensus_submission_value.as_ref()
296                == Some(&MetaSubmissionValue {
297                    value: desired_value.clone(),
298                    salt,
299                })
300            {
301                // our submission is already registered, nothing to do
302            } else if consensus_value.as_ref() == Some(&desired_value) {
303                if consensus_submission_value.is_none() {
304                    // our desired value is equal to consensus and cleared our
305                    // submission (as it is equal the
306                    // consensus) so we don't need to propose it
307                } else {
308                    // we want to submit the same value as the current consensus, usually
309                    // to clear the previous submission that did not became the consensus (we were
310                    // outvoted)
311                    to_submit.push(MetaConsensusItem {
312                        key,
313                        value: desired_value,
314                        salt,
315                    });
316                }
317            } else {
318                to_submit.push(MetaConsensusItem {
319                    key,
320                    value: desired_value,
321                    salt,
322                });
323            }
324        }
325
326        trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
327        to_submit
328    }
329
330    /// BUG: This implementation fails to return an `Err` on redundant consensus
331    /// items. If you are using this code as a template,
332    /// make sure to read the [`ServerModule::process_consensus_item`]
333    /// documentation,
334    async fn process_consensus_item<'a, 'b>(
335        &'a self,
336        dbtx: &mut DatabaseTransaction<'b>,
337        MetaConsensusItem { key, value, salt }: MetaConsensusItem,
338        peer_id: PeerId,
339    ) -> anyhow::Result<()> {
340        trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
341
342        let new_value = MetaSubmissionValue { salt, value };
343        // first of all: any new submission overrides previous submission
344        if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await {
345            if prev_value != new_value {
346                dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
347                    .await;
348            }
349        }
350        // then: if the submission is equal to the current consensus, it's ignored
351        if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
352            debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
353            return Ok(());
354        }
355
356        // otherwise, new submission is recorded
357        dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
358            .await;
359
360        // we check how many peers submitted the same value (including this peer)
361        let matching_submissions: Vec<PeerId> = dbtx
362            .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
363            .await
364            .filter(|(_submission_key, submission_value)| {
365                future::ready(new_value.value == submission_value.value)
366            })
367            .map(|(submission_key, _)| submission_key.peer_id)
368            .collect()
369            .await;
370
371        let threshold = self.num_peers.threshold();
372        info!(target: LOG_MODULE_META,
373             %peer_id,
374             %key,
375            value_len = %new_value.value.as_slice().len(),
376             matching = %matching_submissions.len(),
377            %threshold, "Peer submitted a value");
378
379        // if threshold or more, change the consensus value
380        if threshold <= matching_submissions.len() {
381            Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
382        }
383
384        Ok(())
385    }
386
387    async fn process_input<'a, 'b, 'c>(
388        &'a self,
389        _dbtx: &mut DatabaseTransaction<'c>,
390        _input: &'b MetaInput,
391        _in_point: InPoint,
392    ) -> Result<InputMeta, MetaInputError> {
393        Err(MetaInputError::NotSupported)
394    }
395
396    async fn process_output<'a, 'b>(
397        &'a self,
398        _dbtx: &mut DatabaseTransaction<'b>,
399        _output: &'a MetaOutput,
400        _out_point: OutPoint,
401    ) -> Result<TransactionItemAmount, MetaOutputError> {
402        Err(MetaOutputError::NotSupported)
403    }
404
405    async fn output_status(
406        &self,
407        _dbtx: &mut DatabaseTransaction<'_>,
408        _out_point: OutPoint,
409    ) -> Option<MetaOutputOutcome> {
410        None
411    }
412
413    async fn audit(
414        &self,
415        _dbtx: &mut DatabaseTransaction<'_>,
416        _audit: &mut Audit,
417        _module_instance_id: ModuleInstanceId,
418    ) {
419    }
420
421    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
422        vec![
423            api_endpoint! {
424                SUBMIT_ENDPOINT,
425                ApiVersion::new(0, 0),
426                async |module: &Meta, context, request: SubmitRequest| -> () {
427
428                    match context.request_auth() {
429                        None => return Err(ApiError::bad_request("Missing password".to_string())),
430                        Some(auth) => {
431                            module.handle_submit_request(&mut context.dbtx(), &auth, &request).await?;
432                        }
433                    }
434
435                    Ok(())
436                }
437            },
438            api_endpoint! {
439                GET_CONSENSUS_ENDPOINT,
440                ApiVersion::new(0, 0),
441                async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
442                    module.handle_get_consensus_request(&mut context.dbtx().into_nc(), &request).await
443                }
444            },
445            api_endpoint! {
446                GET_CONSENSUS_REV_ENDPOINT,
447                ApiVersion::new(0, 0),
448                async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
449                    module.handle_get_consensus_revision_request(&mut context.dbtx().into_nc(), &request).await
450                }
451            },
452            api_endpoint! {
453                GET_SUBMISSIONS_ENDPOINT,
454                ApiVersion::new(0, 0),
455                async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
456                    match context.request_auth() {
457                        None => return Err(ApiError::bad_request("Missing password".to_string())),
458                        Some(auth) => {
459                            module.handle_get_submissions_request(&mut context.dbtx().into_nc(),&auth, &request).await
460                        }
461                    }
462                }
463            },
464        ]
465    }
466}
467
468impl Meta {
469    async fn handle_submit_request(
470        &self,
471        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
472        _auth: &ApiAuth,
473        req: &SubmitRequest,
474    ) -> Result<(), ApiError> {
475        let salt = thread_rng().gen();
476
477        info!(target: LOG_MODULE_META,
478             key = %req.key,
479             peer_id = %self.our_peer_id,
480             value_len = %req.value.as_slice().len(),
481             "Our own guardian submitted a value");
482
483        dbtx.insert_entry(
484            &MetaDesiredKey(req.key),
485            &MetaDesiredValue {
486                value: req.value.clone(),
487                salt,
488            },
489        )
490        .await;
491
492        Ok(())
493    }
494
495    async fn handle_get_consensus_request(
496        &self,
497        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
498        req: &GetConsensusRequest,
499    ) -> Result<Option<MetaConsensusValue>, ApiError> {
500        Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
501    }
502
503    async fn handle_get_consensus_revision_request(
504        &self,
505        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
506        req: &GetConsensusRequest,
507    ) -> Result<Option<u64>, ApiError> {
508        Ok(dbtx
509            .get_value(&MetaConsensusKey(req.0))
510            .await
511            .map(|cv| cv.revision))
512    }
513
514    async fn handle_get_submissions_request(
515        &self,
516        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
517        _auth: &ApiAuth,
518        req: &GetSubmissionsRequest,
519    ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
520        Ok(dbtx
521            .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
522            .await
523            .collect::<Vec<_>>()
524            .await
525            .into_iter()
526            .map(|(k, v)| (k.peer_id, v.value))
527            .collect())
528    }
529}