1use ::kvdb::{DBTransaction, KeyValueDB};
20
21use crate::{error, Change, ColumnId, Database, Transaction};
22
23struct DbAdapter<D: KeyValueDB + 'static>(D);
24
25fn handle_err<T>(result: std::io::Result<T>) -> T {
26 match result {
27 Ok(r) => r,
28 Err(e) => {
29 panic!("Critical database error: {:?}", e);
30 },
31 }
32}
33
34pub fn as_database<D, H>(db: D) -> std::sync::Arc<dyn Database<H>>
36where
37 D: KeyValueDB + 'static,
38 H: Clone + AsRef<[u8]>,
39{
40 std::sync::Arc::new(DbAdapter(db))
41}
42
43impl<D: KeyValueDB> DbAdapter<D> {
44 fn read_counter(&self, col: ColumnId, key: &[u8]) -> error::Result<(Vec<u8>, Option<u32>)> {
46 let mut counter_key = key.to_vec();
48 counter_key.push(0);
49 Ok(match self.0.get(col, &counter_key).map_err(|e| error::DatabaseError(Box::new(e)))? {
50 Some(data) => {
51 let mut counter_data = [0; 4];
52 if data.len() != 4 {
53 return Err(error::DatabaseError(Box::new(std::io::Error::new(
54 std::io::ErrorKind::Other,
55 format!("Unexpected counter len {}", data.len()),
56 ))))
57 }
58 counter_data.copy_from_slice(&data);
59 let counter = u32::from_le_bytes(counter_data);
60 (counter_key, Some(counter))
61 },
62 None => (counter_key, None),
63 })
64 }
65}
66
67impl<D: KeyValueDB, H: Clone + AsRef<[u8]>> Database<H> for DbAdapter<D> {
68 fn commit(&self, transaction: Transaction<H>) -> error::Result<()> {
69 let mut tx = DBTransaction::new();
70 for change in transaction.0.into_iter() {
71 match change {
72 Change::Set(col, key, value) => tx.put_vec(col, &key, value),
73 Change::Remove(col, key) => tx.delete(col, &key),
74 Change::Store(col, key, value) => match self.read_counter(col, key.as_ref())? {
75 (counter_key, Some(mut counter)) => {
76 counter += 1;
77 tx.put(col, &counter_key, &counter.to_le_bytes());
78 },
79 (counter_key, None) => {
80 let d = 1u32.to_le_bytes();
81 tx.put(col, &counter_key, &d);
82 tx.put_vec(col, key.as_ref(), value);
83 },
84 },
85 Change::Reference(col, key) => {
86 if let (counter_key, Some(mut counter)) =
87 self.read_counter(col, key.as_ref())?
88 {
89 counter += 1;
90 tx.put(col, &counter_key, &counter.to_le_bytes());
91 }
92 },
93 Change::Release(col, key) => {
94 if let (counter_key, Some(mut counter)) =
95 self.read_counter(col, key.as_ref())?
96 {
97 counter -= 1;
98 if counter == 0 {
99 tx.delete(col, &counter_key);
100 tx.delete(col, key.as_ref());
101 } else {
102 tx.put(col, &counter_key, &counter.to_le_bytes());
103 }
104 }
105 },
106 }
107 }
108 self.0.write(tx).map_err(|e| error::DatabaseError(Box::new(e)))
109 }
110
111 fn get(&self, col: ColumnId, key: &[u8]) -> Option<Vec<u8>> {
112 handle_err(self.0.get(col, key))
113 }
114
115 fn contains(&self, col: ColumnId, key: &[u8]) -> bool {
116 handle_err(self.0.has_key(col, key))
117 }
118}