use crate::{
map::{ChainedMapper, MapService, Mapper},
Service, ServiceConnection,
};
use futures_lite::Stream;
use futures_sink::Sink;
use pin_project::pin_project;
use std::{
fmt::Debug,
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
pub type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;
#[derive(Debug)]
pub struct RpcClient<S, C, SInner = S> {
pub(crate) source: C,
pub(crate) map: Arc<dyn MapService<S, SInner>>,
}
impl<S, C: Clone, SInner> Clone for RpcClient<S, C, SInner> {
fn clone(&self) -> Self {
Self {
source: self.source.clone(),
map: Arc::clone(&self.map),
}
}
}
#[pin_project]
#[derive(Debug)]
pub struct UpdateSink<S, C, T, SInner = S>(
#[pin] pub C::SendSink,
pub PhantomData<T>,
pub Arc<dyn MapService<S, SInner>>,
)
where
S: Service,
SInner: Service,
C: ServiceConnection<S>,
T: Into<SInner::Req>;
impl<S, C, T, SInner> Sink<T> for UpdateSink<S, C, T, SInner>
where
S: Service,
SInner: Service,
C: ServiceConnection<S>,
T: Into<SInner::Req>,
{
type Error = C::SendError;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().0.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
let req = self.2.req_into_outer(item.into());
self.project().0.start_send(req)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().0.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().0.poll_close(cx)
}
}
impl<S, C> RpcClient<S, C, S>
where
S: Service,
C: ServiceConnection<S>,
{
pub fn new(source: C) -> Self {
Self {
source,
map: Arc::new(Mapper::new()),
}
}
}
impl<S, C, SInner> RpcClient<S, C, SInner>
where
S: Service,
C: ServiceConnection<S>,
SInner: Service,
{
pub fn into_inner(self) -> C {
self.source
}
pub fn map<SNext>(self) -> RpcClient<S, C, SNext>
where
SNext: Service,
SNext::Req: Into<SInner::Req> + TryFrom<SInner::Req>,
SNext::Res: Into<SInner::Res> + TryFrom<SInner::Res>,
{
let map = ChainedMapper::new(self.map);
RpcClient {
source: self.source,
map: Arc::new(map),
}
}
}
impl<S, C, SInner> AsRef<C> for RpcClient<S, C, SInner>
where
S: Service,
C: ServiceConnection<S>,
SInner: Service,
{
fn as_ref(&self) -> &C {
&self.source
}
}
#[pin_project]
pub(crate) struct DeferDrop<S: Stream, X>(#[pin] pub S, pub X);
impl<S: Stream, X> Stream for DeferDrop<S, X> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().0.poll_next(cx)
}
}