use crate::{AsyncBincodeReader, AsyncBincodeWriter};
use crate::{AsyncDestination, SyncDestination};
use futures_core::Stream;
use futures_sink::Sink;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io};
use tokio::io::AsyncRead;
/// A bidirectional bincode wrapper around a single underlying I/O object `S`.
///
/// Internally this is an [`AsyncBincodeReader`] (yielding values of type `R`) layered on top
/// of an [`AsyncBincodeWriter`] (accepting values of type `W`), which in turn owns `S`.
/// `D` is the writer's destination marker (e.g. `SyncDestination` or `AsyncDestination`).
#[derive(Debug)]
pub struct AsyncBincodeStream<S, R, W, D> {
// Reader -> `InternalAsyncWriter` -> writer -> `S`: one nested value owns the whole stack.
stream: AsyncBincodeReader<InternalAsyncWriter<S, W, D>, R>,
}
/// Newtype around [`AsyncBincodeWriter`] that serves as the reader's inner I/O object inside
/// `AsyncBincodeStream`.
///
/// It is public only because it appears in public trait bounds; it is hidden from the docs
/// and is not meant to be used directly.
#[doc(hidden)]
pub struct InternalAsyncWriter<S, T, D>(AsyncBincodeWriter<S, T, D>);
/// Debug output for the wrapper is delegated entirely to the innermost I/O object `S`.
impl<S: fmt::Debug, T, D> fmt::Debug for InternalAsyncWriter<S, T, D> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The wrapper layers add nothing worth printing; show only the wrapped `S`.
        let inner: &S = self.0.get_ref();
        fmt::Debug::fmt(inner, f)
    }
}
/// A default stream wraps `S::default()` and uses the `SyncDestination` marker.
impl<S, R, W> Default for AsyncBincodeStream<S, R, W, SyncDestination>
where
    S: Default,
{
    fn default() -> Self {
        // Goes through the `From<S>` conversion so construction lives in one place.
        let inner = S::default();
        inner.into()
    }
}
impl<S, R, W, D> AsyncBincodeStream<S, R, W, D> {
    /// Returns a shared reference to the underlying I/O object `S`.
    ///
    /// The reference is obtained by walking through the reader and the wrapped writer.
    pub fn get_ref(&self) -> &S {
        // The inner `get_ref()` already yields `&S`; no extra borrow is needed.
        self.stream.get_ref().0.get_ref()
    }

    /// Returns a mutable reference to the underlying I/O object `S`.
    pub fn get_mut(&mut self) -> &mut S {
        self.stream.get_mut().0.get_mut()
    }

    /// Consumes the stream and returns the underlying I/O object `S`.
    ///
    /// Only `S` is returned; the reader and writer wrappers around it are dropped.
    pub fn into_inner(self) -> S {
        self.stream.into_inner().0.into_inner()
    }
}
/// Wraps an I/O object in a stream that uses the `SyncDestination` marker.
impl<S, R, W> From<S> for AsyncBincodeStream<S, R, W, SyncDestination> {
    fn from(stream: S) -> Self {
        // Build the stack inside-out: writer around `S`, then the reader around the writer.
        let writer = AsyncBincodeWriter::from(stream);
        let reader = AsyncBincodeReader::from(InternalAsyncWriter(writer));
        Self { stream: reader }
    }
}
impl<S, R, W, D> AsyncBincodeStream<S, R, W, D> {
    /// Converts this stream into one whose writer uses the `AsyncDestination` marker.
    ///
    /// The stream is torn down to its inner `S` and rebuilt, so only `S` itself is carried
    /// over into the returned value.
    pub fn for_async(self) -> AsyncBincodeStream<S, R, W, AsyncDestination> {
        let inner = self.into_inner();
        let writer = AsyncBincodeWriter::from(inner).for_async();
        let reader = AsyncBincodeReader::from(InternalAsyncWriter(writer));
        AsyncBincodeStream { stream: reader }
    }

    /// Converts this stream into one whose writer uses the `SyncDestination` marker.
    ///
    /// As with `for_async`, the stream is rebuilt from its inner `S`.
    pub fn for_sync(self) -> AsyncBincodeStream<S, R, W, SyncDestination> {
        let inner = self.into_inner();
        inner.into()
    }
}
impl<R, W, D> AsyncBincodeStream<tokio::net::TcpStream, R, W, D> {
/// Splits a TCP-backed stream into a reader over the socket's read half and a writer over
/// its write half.
///
/// The halves come from calling `split()` on the `TcpStream` that `self` still owns.
/// The reader's `buffer` and the writer's `buffer` and `written` fields are transferred to
/// the returned values, so state already held by this wrapper is carried over.
pub fn tcp_split(
&mut self,
) -> (
AsyncBincodeReader<tokio::net::tcp::ReadHalf, R>,
AsyncBincodeWriter<tokio::net::tcp::WriteHalf, W, D>,
) {
// Take the reader's buffered bytes first so they can be handed to the new reader.
// NOTE(review): `split()` presumably leaves the old buffer empty; confirm against the
// buffer type.
let rbuff = self.stream.buffer.split();
// Reach through the reader to the wrapped writer.
let writer = &mut self.stream.get_mut().0;
// Take the writer's pending bytes (everything from index 0 onward) and its progress
// counter.
let wbuff = writer.buffer.split_off(0);
let wsize = writer.written;
// Only now split the socket: the halves keep the stream borrowed from here on.
let (r, w) = writer.get_mut().split();
// Rebuild the reader on the read half and restore its buffer.
let mut reader = AsyncBincodeReader::from(r);
reader.buffer = rbuff;
// Rebuild the writer on the write half; the annotation pins its destination marker to
// this stream's `D`. (`make_for` is defined elsewhere and presumably only changes the
// marker type; confirm.)
let mut writer: AsyncBincodeWriter<_, _, D> = AsyncBincodeWriter::from(w).make_for();
writer.buffer = wbuff;
writer.written = wsize;
(reader, writer)
}
}
/// Reads are forwarded straight to the underlying I/O object, bypassing the writer layer.
impl<S, T, D> AsyncRead for InternalAsyncWriter<S, T, D>
where
    S: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Un-pin the wrapper, step down to the inner `S`, and re-pin it for the read.
        let this: &mut Self = Pin::get_mut(self);
        let inner: &mut S = this.0.get_mut();
        Pin::new(inner).poll_read(cx, buf)
    }
}
impl<S, T, D> Deref for InternalAsyncWriter<S, T, D> {
type Target = AsyncBincodeWriter<S, T, D>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<S, T, D> DerefMut for InternalAsyncWriter<S, T, D> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// The receiving side: items are produced by the inner [`AsyncBincodeReader`].
impl<S, R, W, D> Stream for AsyncBincodeStream<S, R, W, D>
where
    S: Unpin,
    AsyncBincodeReader<InternalAsyncWriter<S, W, D>, R>: Stream<Item = Result<R, bincode::Error>>,
{
    type Item = Result<R, bincode::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Un-pin the wrapper, then re-pin just the reader field and poll it.
        let this = Pin::get_mut(self);
        let reader = Pin::new(&mut this.stream);
        reader.poll_next(cx)
    }
}
/// The sending side: every `Sink` operation is forwarded to the inner
/// [`AsyncBincodeWriter`], which sits underneath the reader.
impl<S, R, W, D> Sink<W> for AsyncBincodeStream<S, R, W, D>
where
    S: Unpin,
    AsyncBincodeWriter<S, W, D>: Sink<W, Error = bincode::Error>,
{
    type Error = bincode::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Reader -> `InternalAsyncWriter` -> writer; `.0` projects to the writer itself.
        let writer = &mut Pin::get_mut(self).stream.get_mut().0;
        Pin::new(writer).poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: W) -> Result<(), Self::Error> {
        let writer = &mut Pin::get_mut(self).stream.get_mut().0;
        Pin::new(writer).start_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let writer = &mut Pin::get_mut(self).stream.get_mut().0;
        Pin::new(writer).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let writer = &mut Pin::get_mut(self).stream.get_mut().0;
        Pin::new(writer).poll_close(cx)
    }
}