pub mod engine;
pub mod err;
#[cfg(feature = "protocol-http")]
pub mod headers;
pub mod method;
pub mod opt;
mod conn;
pub use method::query::Response;
use method::BoxFuture;
use semver::Version;
use tokio::sync::watch;
use crate::api::conn::Router;
use crate::api::err::Error;
use crate::api::opt::Endpoint;
use semver::BuildMetadata;
use semver::VersionReq;
use std::fmt;
use std::fmt::Debug;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::OnceLock;
use self::opt::EndpointKind;
use self::opt::WaitFor;
/// Result alias used throughout this module; the error type is always `crate::Error`.
pub type Result<T> = std::result::Result<T, crate::Error>;
/// Both halves of a `watch` channel carrying an optional `WaitFor` value,
/// stored together as `(sender, receiver)`.
type Waiter = (watch::Sender<Option<WaitFor>>, watch::Receiver<Option<WaitFor>>);
/// Server compatibility bounds as `(version requirement, build metadata)`.
///
/// `check_server_version` parses the first string as a `semver::VersionReq`
/// and the second as `semver::BuildMetadata`. A server is rejected when its
/// version does not match the requirement, or when it reports non-empty build
/// metadata that orders below the metadata given here.
const SUPPORTED_VERSIONS: (&str, &str) = (">=1.0.0, <3.0.0", "20230701.55918b7c");
/// Remote servers reporting at least this version over a WebSocket endpoint
/// are reconnected with `supports_revision` enabled on the endpoint.
const REVISION_SUPPORTED_SERVER_VERSION: Version = Version::new(1, 2, 0);
/// Public marker trait for connection engines.
///
/// It declares no items of its own; everything comes from the supertrait
/// `conn::Connection`, which lives in the private `conn` module.
// NOTE(review): with the supertrait in a private module this presumably acts
// as a sealed trait (not implementable outside the crate) — confirm.
pub trait Connection: conn::Connection {}
/// A pending connection request that is turned into a future with `.await`.
///
/// `C` is the connection engine. `Response` selects what awaiting produces:
/// `Surreal<C>` yields a new client, while `()` stores the new router in the
/// shared `router` cell of this request instead.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Connect<C: Connection, Response> {
// Shared cell for the router; written by the `Connect<_, ()>` future on success.
router: Arc<OnceLock<Router>>,
// Zero-sized marker tying this request to the engine type `C`.
engine: PhantomData<C>,
// Endpoint to connect to, or the error to report when the request is awaited.
address: Result<Endpoint>,
// Forwarded unchanged to `C::connect`.
// NOTE(review): presumably a channel/buffer capacity — confirm against the engines.
capacity: usize,
// Watch channel on which the `Connect<_, ()>` future publishes `WaitFor::Connection`.
waiter: Arc<Waiter>,
// Zero-sized marker for the `Response` type parameter.
response_type: PhantomData<Response>,
}
impl<C, R> Connect<C, R>
where
C: Connection,
{
/// Replaces the `capacity` stored in this request and returns the request,
/// builder-style. The value is later passed to the engine's `connect` call.
pub const fn with_capacity(mut self, capacity: usize) -> Self {
self.capacity = capacity;
self
}
}
/// Awaiting a `Connect<Client, Surreal<Client>>` opens a connection and
/// resolves to a fresh `Surreal<Client>`.
impl<Client> IntoFuture for Connect<Client, Surreal<Client>>
where
    Client: Connection,
{
    type Output = Result<Surreal<Client>>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    /// Builds the boxed connection future.
    ///
    /// # Errors
    ///
    /// Resolves to an error when the stored address is an error, when either
    /// connection attempt fails, when the server version cannot be fetched,
    /// or when the server version is rejected by `check_server_version`.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            let capacity = self.capacity;
            let mut address = self.address?;
            let kind = EndpointKind::from(address.url.scheme());

            let mut client = Client::connect(address.clone(), capacity).await?;

            if kind.is_remote() {
                let mut server_version = client.version().await?;
                // Drop the pre-release tag so the checks below only see
                // major.minor.patch (plus build metadata).
                server_version.pre = Default::default();
                client.check_server_version(&server_version).await?;

                let revision_capable = server_version >= REVISION_SUPPORTED_SERVER_VERSION;
                if revision_capable && kind.is_ws() {
                    // Reconnect with revision support switched on for this endpoint.
                    address.supports_revision = true;
                    client = Client::connect(address, capacity).await?;
                }
            }

            // Announce the connection on the client's waiter; a failed send is ignored.
            client.waiter.0.send(Some(WaitFor::Connection)).ok();
            Ok(client)
        })
    }
}
/// Awaiting a `Connect<Client, ()>` opens a connection and installs its
/// router into the shared `router` cell carried by the request.
impl<Client> IntoFuture for Connect<Client, ()>
where
    Client: Connection,
{
    type Output = Result<()>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    /// Builds the boxed connection future.
    ///
    /// # Errors
    ///
    /// Resolves to `Error::AlreadyConnected` when the shared router cell is
    /// already populated (checked both before connecting and when storing the
    /// new router). Otherwise fails for the same reasons as connecting:
    /// an invalid stored address, a failed connection attempt, a failed
    /// version query, or an unsupported server version.
    ///
    /// # Panics
    ///
    /// Panics if the freshly created client's router is shared elsewhere or
    /// has not been set.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            if self.router.get().is_some() {
                return Err(Error::AlreadyConnected.into());
            }

            let capacity = self.capacity;
            let mut address = self.address?;
            let kind = EndpointKind::from(address.url.scheme());

            let mut client = Client::connect(address.clone(), capacity).await?;

            if kind.is_remote() {
                let mut server_version = client.version().await?;
                // Drop the pre-release tag so the checks below only see
                // major.minor.patch (plus build metadata).
                server_version.pre = Default::default();
                client.check_server_version(&server_version).await?;

                let revision_capable = server_version >= REVISION_SUPPORTED_SERVER_VERSION;
                if revision_capable && kind.is_ws() {
                    // Reconnect with revision support switched on for this endpoint.
                    address.supports_revision = true;
                    client = Client::connect(address, capacity).await?;
                }
            }

            // Take sole ownership of the new client's router so it can be
            // moved into the cell shared with the caller.
            let router = Arc::into_inner(client.router)
                .expect("new connection to have no references")
                .into_inner()
                .expect("router to be set");
            self.router.set(router).map_err(|_| Error::AlreadyConnected)?;

            // Announce the connection on the shared waiter; a failed send is ignored.
            self.waiter.0.send(Some(WaitFor::Connection)).ok();
            Ok(())
        })
    }
}
/// Crate-internal identifiers for optional features.
// NOTE(review): nothing in this part of the file uses this enum; presumably
// engines advertise which of these they support — confirm against the callers.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) enum ExtraFeatures {
Backup,
LiveQueries,
}
/// Client handle, generic over the connection engine `C`.
///
/// The router cell and the waiter are reference-counted, so clones of a
/// `Surreal` share the same router and the same waiter channel.
pub struct Surreal<C: Connection> {
// Shared cell holding the router once a connection has been established.
router: Arc<OnceLock<Router>>,
// Shared watch channel on which `WaitFor` events are published.
waiter: Arc<Waiter>,
// Zero-sized marker tying this handle to the engine type `C`.
engine: PhantomData<C>,
}
impl<C> Surreal<C>
where
    C: Connection,
{
    /// Assembles a client handle from an existing router cell and waiter.
    pub(crate) fn new_from_router_waiter(
        router: Arc<OnceLock<Router>>,
        waiter: Arc<Waiter>,
    ) -> Self {
        Self {
            engine: PhantomData,
            router,
            waiter,
        }
    }

    /// Validates a server version against `SUPPORTED_VERSIONS`.
    ///
    /// # Errors
    ///
    /// * `Error::VersionMismatch` when `version` does not satisfy the
    ///   supported version requirement.
    /// * `Error::BuildMetadataMismatch` when the version satisfies the
    ///   requirement but carries non-empty build metadata that orders below
    ///   the supported build metadata.
    ///
    /// # Panics
    ///
    /// Panics if the strings in `SUPPORTED_VERSIONS` are not valid semver
    /// requirement / build-metadata syntax.
    async fn check_server_version(&self, version: &Version) -> Result<()> {
        let (supported_range, supported_build) = SUPPORTED_VERSIONS;
        let requirement = VersionReq::parse(supported_range).expect("valid supported versions");
        let minimum_build =
            BuildMetadata::new(supported_build).expect("valid supported build metadata");

        if !requirement.matches(version) {
            return Err(Error::VersionMismatch {
                server_version: version.clone(),
                supported_versions: supported_range.to_owned(),
            }
            .into());
        }

        // Servers that report no build metadata skip the build comparison.
        let reported_build = &version.build;
        if !reported_build.is_empty() && reported_build < &minimum_build {
            return Err(Error::BuildMetadataMismatch {
                server_metadata: reported_build.clone(),
                supported_metadata: minimum_build,
            }
            .into());
        }

        Ok(())
    }
}
impl<C> Clone for Surreal<C>
where
C: Connection,
{
fn clone(&self) -> Self {
Self {
router: self.router.clone(),
waiter: self.waiter.clone(),
engine: self.engine,
}
}
}
/// Debug output lists the `router` and `engine` fields; the waiter is not printed.
impl<C> Debug for Surreal<C>
where
    C: Connection,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Surreal");
        out.field("router", &self.router);
        out.field("engine", &self.engine);
        out.finish()
    }
}
/// Convenience helpers for a `OnceLock` that stores a `Router`.
trait OnceLockExt {
    /// Creates a cell that already contains `value`.
    fn with_value(value: Router) -> OnceLock<Router> {
        let cell = OnceLock::new();
        // The cell was created just above and nothing else can see it yet,
        // so setting it cannot fail.
        if cell.set(value).is_err() {
            unreachable!("don't have exclusive access to `cell`");
        }
        cell
    }

    /// Borrows the stored router, or reports that none has been set yet.
    fn extract(&self) -> Result<&Router>;
}
impl OnceLockExt for OnceLock<Router> {
fn extract(&self) -> Result<&Router> {
let router = self.get().ok_or(Error::ConnectionUninitialised)?;
Ok(router)
}
}