use std::io;
use std::io::{Read, Result};
use crate::fixed::FixedInt;
use crate::varint::{VarInt, VarIntMaxSize, MSB};
#[cfg(feature = "tokio_async")]
use tokio::io::{AsyncRead, AsyncReadExt};
#[cfg(feature = "futures_async")]
use futures_util::{io::AsyncRead, io::AsyncReadExt};
pub trait VarIntReader {
fn read_varint<VI: VarInt>(&mut self) -> Result<VI>;
}
#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait(?Send)]
pub trait VarIntAsyncReader {
async fn read_varint_async<VI: VarInt>(&mut self) -> Result<VI>;
}
#[derive(Default)]
pub struct VarIntProcessor {
buf: [u8; 10],
maxsize: usize,
i: usize,
}
impl VarIntProcessor {
fn new<VI: VarIntMaxSize>() -> VarIntProcessor {
VarIntProcessor {
maxsize: VI::varint_max_size(),
..VarIntProcessor::default()
}
}
fn push(&mut self, b: u8) -> Result<()> {
if self.i >= self.maxsize {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Unterminated varint",
));
}
self.buf[self.i] = b;
self.i += 1;
Ok(())
}
fn finished(&self) -> bool {
self.i > 0 && (self.buf[self.i - 1] & MSB == 0)
}
fn decode<VI: VarInt>(&self) -> Option<VI> {
Some(VI::decode_var(&self.buf[0..self.i])?.0)
}
}
#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait(?Send)]
impl<AR: AsyncRead + Unpin> VarIntAsyncReader for AR {
async fn read_varint_async<VI: VarInt>(&mut self) -> Result<VI> {
let mut buf = [0_u8; 1];
let mut p = VarIntProcessor::new::<VI>();
while !p.finished() {
let read = self.read(&mut buf).await?;
if read == 0 && p.i == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"));
}
if read == 0 {
break;
}
p.push(buf[0])?;
}
p.decode()
.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"))
}
}
impl<R: Read> VarIntReader for R {
fn read_varint<VI: VarInt>(&mut self) -> Result<VI> {
let mut buf = [0_u8; 1];
let mut p = VarIntProcessor::new::<VI>();
while !p.finished() {
let read = self.read(&mut buf)?;
if read == 0 && p.i == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"));
}
if read == 0 {
break;
}
p.push(buf[0])?;
}
p.decode()
.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"))
}
}
pub trait FixedIntReader {
fn read_fixedint<FI: FixedInt>(&mut self) -> Result<FI>;
}
#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait(?Send)]
pub trait FixedIntAsyncReader {
async fn read_fixedint_async<FI: FixedInt>(&mut self) -> Result<FI>;
}
#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait(?Send)]
impl<AR: AsyncRead + Unpin> FixedIntAsyncReader for AR {
async fn read_fixedint_async<FI: FixedInt>(&mut self) -> Result<FI> {
let mut buf = [0_u8; 8];
self.read_exact(&mut buf[0..std::mem::size_of::<FI>()])
.await?;
Ok(FI::decode_fixed(&buf[0..std::mem::size_of::<FI>()]).unwrap())
}
}
impl<R: Read> FixedIntReader for R {
fn read_fixedint<FI: FixedInt>(&mut self) -> Result<FI> {
let mut buf = [0_u8; 8];
self.read_exact(&mut buf[0..std::mem::size_of::<FI>()])?;
Ok(FI::decode_fixed(&buf[0..std::mem::size_of::<FI>()]).unwrap())
}
}