deno_kv 0.85.0

Implementation of the Deno database API
Documentation
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

use std::cell::RefCell;
use std::rc::Rc;

use crate::remote::RemoteDbHandlerPermissions;
use crate::sqlite::SqliteDbHandler;
use crate::sqlite::SqliteDbHandlerPermissions;
use crate::AtomicWrite;
use crate::Database;
use crate::DatabaseHandler;
use crate::QueueMessageHandle;
use crate::ReadRange;
use crate::SnapshotReadOptions;
use async_trait::async_trait;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::OpState;
use denokv_proto::CommitResult;
use denokv_proto::ReadRangeOutput;
use denokv_proto::WatchStream;

pub struct MultiBackendDbHandler {
  backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
}

impl MultiBackendDbHandler {
  pub fn new(
    backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
  ) -> Self {
    Self { backends }
  }

  pub fn remote_or_sqlite<
    P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static,
  >(
    default_storage_dir: Option<std::path::PathBuf>,
    versionstamp_rng_seed: Option<u64>,
    http_options: crate::remote::HttpOptions,
  ) -> Self {
    Self::new(vec![
      (
        &["https://", "http://"],
        Box::new(crate::remote::RemoteDbHandler::<P>::new(http_options)),
      ),
      (
        &[""],
        Box::new(SqliteDbHandler::<P>::new(
          default_storage_dir,
          versionstamp_rng_seed,
        )),
      ),
    ])
  }
}

#[async_trait(?Send)]
impl DatabaseHandler for MultiBackendDbHandler {
  type DB = RcDynamicDb;

  async fn open(
    &self,
    state: Rc<RefCell<OpState>>,
    path: Option<String>,
  ) -> Result<Self::DB, AnyError> {
    for (prefixes, handler) in &self.backends {
      for &prefix in *prefixes {
        if prefix.is_empty() {
          return handler.dyn_open(state.clone(), path.clone()).await;
        }
        let Some(path) = &path else {
          continue;
        };
        if path.starts_with(prefix) {
          return handler.dyn_open(state.clone(), Some(path.clone())).await;
        }
      }
    }
    Err(type_error(format!(
      "No backend supports the given path: {:?}",
      path
    )))
  }
}

#[async_trait(?Send)]
pub trait DynamicDbHandler {
  async fn dyn_open(
    &self,
    state: Rc<RefCell<OpState>>,
    path: Option<String>,
  ) -> Result<RcDynamicDb, AnyError>;
}

#[async_trait(?Send)]
impl DatabaseHandler for Box<dyn DynamicDbHandler> {
  type DB = RcDynamicDb;

  async fn open(
    &self,
    state: Rc<RefCell<OpState>>,
    path: Option<String>,
  ) -> Result<Self::DB, AnyError> {
    (**self).dyn_open(state, path).await
  }
}

#[async_trait(?Send)]
impl<T, DB> DynamicDbHandler for T
where
  T: DatabaseHandler<DB = DB>,
  DB: Database + 'static,
{
  async fn dyn_open(
    &self,
    state: Rc<RefCell<OpState>>,
    path: Option<String>,
  ) -> Result<RcDynamicDb, AnyError> {
    Ok(RcDynamicDb(Rc::new(self.open(state, path).await?)))
  }
}

#[async_trait(?Send)]
pub trait DynamicDb {
  async fn dyn_snapshot_read(
    &self,
    requests: Vec<ReadRange>,
    options: SnapshotReadOptions,
  ) -> Result<Vec<ReadRangeOutput>, AnyError>;

  async fn dyn_atomic_write(
    &self,
    write: AtomicWrite,
  ) -> Result<Option<CommitResult>, AnyError>;

  async fn dyn_dequeue_next_message(
    &self,
  ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;

  fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream;

  fn dyn_close(&self);
}

#[derive(Clone)]
pub struct RcDynamicDb(Rc<dyn DynamicDb>);

#[async_trait(?Send)]
impl Database for RcDynamicDb {
  type QMH = Box<dyn QueueMessageHandle>;

  async fn snapshot_read(
    &self,
    requests: Vec<ReadRange>,
    options: SnapshotReadOptions,
  ) -> Result<Vec<ReadRangeOutput>, AnyError> {
    (*self.0).dyn_snapshot_read(requests, options).await
  }

  async fn atomic_write(
    &self,
    write: AtomicWrite,
  ) -> Result<Option<CommitResult>, AnyError> {
    (*self.0).dyn_atomic_write(write).await
  }

  async fn dequeue_next_message(
    &self,
  ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
    (*self.0).dyn_dequeue_next_message().await
  }

  fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
    (*self.0).dyn_watch(keys)
  }

  fn close(&self) {
    (*self.0).dyn_close()
  }
}

#[async_trait(?Send)]
impl<T, QMH> DynamicDb for T
where
  T: Database<QMH = QMH>,
  QMH: QueueMessageHandle + 'static,
{
  async fn dyn_snapshot_read(
    &self,
    requests: Vec<ReadRange>,
    options: SnapshotReadOptions,
  ) -> Result<Vec<ReadRangeOutput>, AnyError> {
    Ok(self.snapshot_read(requests, options).await?)
  }

  async fn dyn_atomic_write(
    &self,
    write: AtomicWrite,
  ) -> Result<Option<CommitResult>, AnyError> {
    Ok(self.atomic_write(write).await?)
  }

  async fn dyn_dequeue_next_message(
    &self,
  ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
    Ok(
      self
        .dequeue_next_message()
        .await?
        .map(|x| Box::new(x) as Box<dyn QueueMessageHandle>),
    )
  }

  fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
    self.watch(keys)
  }

  fn dyn_close(&self) {
    self.close()
  }
}