use std::io::Cursor;
use async_trait::async_trait;
use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
use crate::db::AlephUnitsKey;
use crate::LOG_CONSENSUS;
pub async fn load_session(db: Database) -> (Cursor<Vec<u8>>, UnitSaver) {
let mut buffer = vec![];
let mut units_index = 0;
let mut dbtx = db.begin_transaction().await;
while let Some(bytes) = dbtx.get_value(&AlephUnitsKey(units_index)).await {
buffer.extend(bytes);
units_index += 1;
}
std::mem::drop(dbtx);
if !buffer.is_empty() {
tracing::info!(target: LOG_CONSENSUS, buffer_len = %buffer.len(), "Recovering from an in-session-shutdown");
}
let unit_loader = Cursor::new(buffer);
let unit_saver = UnitSaver::new(db, units_index);
(unit_loader, unit_saver)
}
pub struct UnitSaver {
db: Database,
units_index: u64,
}
impl UnitSaver {
fn new(db: Database, units_index: u64) -> Self {
Self { db, units_index }
}
}
#[async_trait]
impl aleph_bft::BackupWriter for UnitSaver {
async fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
let mut dbtx = self.db.begin_transaction().await;
dbtx.insert_new_entry(&AlephUnitsKey(self.units_index), &data.to_owned())
.await;
dbtx.commit_tx_result()
.await
.expect("This is the only place where we write to this key");
self.units_index += 1;
Ok(())
}
}