use crate::error::BodyError as Error;
use alloc::borrow::ToOwned;
use alloc::boxed::Box;
use alloc::string::{String, ToString};
use alloc::sync::Arc;
use core::convert::Infallible;
use core::ops::{Deref, DerefMut};
use bytes::Bytes;
use futures::{Stream, TryStream, TryStreamExt};
use http_body::{Body, Frame, SizeHint};
use http_body_util::{BodyExt, BodyStream, Collected, Empty, Full, StreamBody};
use product_os_http::{Request, Response};
#[cfg(feature = "incoming")]
use hyper::body::Incoming;
#[cfg(feature = "box_body")]
use http_body_util::combinators::BoxBody;
#[cfg(feature = "std")]
use std::{pin::Pin, task::Poll};
use crate::BodyError;
/// The concrete body representations that a `BodyBytes` can wrap.
///
/// Each variant holds one body type; `BodyBytes` dispatches on the variant
/// in its `Body` implementation and in its byte-extraction helpers.
#[derive(Debug)]
enum Internal {
/// A body stored as an `http_body_util::Collected` value.
Collected(Collected<Bytes>),
/// A body that carries no data.
Empty(Empty<Bytes>),
/// A body made of a single in-memory buffer.
Full(Full<Bytes>),
/// A body backed by an owned `String`.
String(String),
/// A type-erased body; only compiled with the `box_body` feature.
#[cfg(feature = "box_body")]
BoxBody(BoxBody<Bytes, Error>),
/// A hyper `Incoming` body; only compiled with the `incoming` feature.
#[cfg(feature = "incoming")]
Incoming(Incoming),
}
/// An HTTP body whose data frames are `Bytes`.
///
/// Wraps one of several concrete body types (see `Internal`) behind a single
/// type that implements `http_body::Body` with `Data = Bytes`.
#[derive(Debug)]
pub struct BodyBytes {
// The wrapped body; kept private so the variant set can change freely.
inner: Internal,
}
impl BodyBytes {
pub fn empty() -> Self {
Self::from(Empty::new())
}
pub fn new(bytes: Bytes) -> Self {
Self {
inner: Internal::Full(Full::new(bytes)),
}
}
pub async fn convert_to_bytes(self) -> Result<Bytes, Error> {
match self.inner {
Internal::String(s) => {
Ok(Bytes::from(s.to_owned()))
},
Internal::Empty(e) => {
let bytes = e.collect().await.unwrap().to_bytes();
Ok(bytes)
},
Internal::Full(f) => {
let bytes = f.collect().await.unwrap().to_bytes();
Ok(bytes)
}
Internal::Collected(c) => {
let bytes = c.collect().await.unwrap().to_bytes();
Ok(bytes)
},
#[cfg(feature = "box_body")]
Internal::BoxBody(b) => {
let bytes = b.collect().await.unwrap().to_bytes();
Ok(bytes)
},
#[cfg(feature = "incoming")]
Internal::Incoming(i) => {
let bytes = i.collect().await.unwrap().to_bytes();
Ok(bytes)
},
}
}
pub async fn as_bytes(&mut self) -> Result<Bytes, Error> {
match &mut self.inner {
Internal::String(s) => {
Ok(Bytes::from(s.to_owned()))
},
Internal::Empty(e) => {
let bytes = e.collect().await.unwrap().to_bytes();
self.inner = Internal::Empty(Empty::new());
Ok(bytes)
},
Internal::Full(f) => {
let bytes = f.collect().await.unwrap().to_bytes();
self.inner = Internal::Full(Full::new(bytes.to_owned()));
Ok(bytes)
}
Internal::Collected(c) => {
let bytes = c.collect().await.unwrap().to_bytes();
self.inner = Internal::Collected(Full::new(bytes.to_owned()).collect().await.unwrap());
Ok(bytes)
},
#[cfg(feature = "box_body")]
Internal::BoxBody(b) => {
Err(Error::DecodeToBytes(String::from("Box body not yet implemented")))
},
#[cfg(feature = "incoming")]
Internal::Incoming(i) => {
Err(Error::DecodeToBytes(String::from("Incoming not yet implemented")))
},
}
}
#[cfg(feature = "box_body")]
pub fn from_stream<S>(stream: S) -> Self
where
S: TryStream + Send + Sync + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<Error>,
{
Self {
inner: Internal::BoxBody(BoxBody::new(StreamBody::new(
stream
.map_ok(Into::into)
.map_ok(Frame::data)
.map_err(Into::into),
))),
}
}
}
impl Body for BodyBytes {
type Data = Bytes;
type Error = Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match &mut self.inner {
Internal::Collected(body) => Pin::new(body).poll_frame(cx).map_err(|e| match e {}),
Internal::Empty(body) => Pin::new(body).poll_frame(cx).map_err(|e| match e {}),
Internal::Full(body) => Pin::new(body).poll_frame(cx).map_err(|e| match e {}),
Internal::String(body) => Pin::new(body).poll_frame(cx).map_err(|e| match e {}),
#[cfg(feature = "box_body")]
Internal::BoxBody(body) => Pin::new(body).poll_frame(cx),
#[cfg(feature = "incoming")]
Internal::Incoming(body) => Pin::new(body).poll_frame(cx).map_err(|e| match e { _ => { BodyError::Network(e.to_string()) }}),
}
}
fn is_end_stream(&self) -> bool {
match &self.inner {
Internal::Collected(body) => body.is_end_stream(),
Internal::Empty(body) => body.is_end_stream(),
Internal::Full(body) => body.is_end_stream(),
Internal::String(body) => body.is_end_stream(),
#[cfg(feature = "box_body")]
Internal::BoxBody(body) => body.is_end_stream(),
#[cfg(feature = "incoming")]
Internal::Incoming(body) => body.is_end_stream(),
}
}
fn size_hint(&self) -> SizeHint {
match &self.inner {
Internal::Collected(body) => body.size_hint(),
Internal::Empty(body) => body.size_hint(),
Internal::Full(body) => body.size_hint(),
Internal::String(body) => body.size_hint(),
#[cfg(feature = "box_body")]
Internal::BoxBody(body) => body.size_hint(),
#[cfg(feature = "incoming")]
Internal::Incoming(body) => body.size_hint(),
}
}
}
impl From<Collected<Bytes>> for BodyBytes {
    /// Wraps an already-collected body.
    fn from(value: Collected<Bytes>) -> Self {
        let inner = Internal::Collected(value);
        Self { inner }
    }
}
impl From<Empty<Bytes>> for BodyBytes {
    /// Wraps an empty body.
    fn from(value: Empty<Bytes>) -> Self {
        let inner = Internal::Empty(value);
        Self { inner }
    }
}
impl From<Full<Bytes>> for BodyBytes {
    /// Wraps a single-buffer body.
    fn from(value: Full<Bytes>) -> Self {
        let inner = Internal::Full(value);
        Self { inner }
    }
}
#[cfg(feature = "box_body")]
impl From<BoxBody<Bytes, Error>> for BodyBytes {
    /// Wraps a type-erased body (requires the `box_body` feature).
    fn from(value: BoxBody<Bytes, Error>) -> Self {
        let inner = Internal::BoxBody(value);
        Self { inner }
    }
}
#[cfg(feature = "incoming")]
impl From<Incoming> for BodyBytes {
    /// Wraps a hyper `Incoming` body (requires the `incoming` feature).
    fn from(value: Incoming) -> Self {
        let inner = Internal::Incoming(value);
        Self { inner }
    }
}
impl From<String> for BodyBytes {
    /// Wraps an owned string as the body.
    fn from(value: String) -> Self {
        let inner = Internal::String(value);
        Self { inner }
    }
}
impl From<&'static str> for BodyBytes {
    /// Wraps a static string slice as a single-buffer body, using
    /// `Bytes::from_static` on the slice's UTF-8 bytes.
    fn from(value: &'static str) -> Self {
        let payload = Bytes::from_static(value.as_bytes());
        let inner = Internal::Full(Full::new(payload));
        Self { inner }
    }
}
impl From<&'static [u8]> for BodyBytes {
    /// Wraps a static byte slice as a single-buffer body, using
    /// `Bytes::from_static`.
    fn from(value: &'static [u8]) -> Self {
        let payload = Bytes::from_static(value);
        let inner = Internal::Full(Full::new(payload));
        Self { inner }
    }
}
impl<T> From<Request<T>> for BodyBytes
where
    T: Into<BodyBytes>,
{
    /// Takes the body out of the request and converts it; the rest of the
    /// request is dropped.
    fn from(value: Request<T>) -> Self {
        let body = value.into_body();
        body.into()
    }
}
impl<T> From<Response<T>> for BodyBytes
where
    T: Into<BodyBytes>,
{
    /// Takes the body out of the response and converts it; the rest of the
    /// response is dropped.
    fn from(value: Response<T>) -> Self {
        let body = value.into_body();
        body.into()
    }
}
#[cfg(feature = "box_body")]
impl<S> From<StreamBody<S>> for BodyBytes
where
    S: Stream<Item = Result<Frame<Bytes>, Error>> + Send + Sync + 'static,
{
    /// Type-erases a frame stream body into a boxed body (requires the
    /// `box_body` feature).
    fn from(value: StreamBody<S>) -> Self {
        let boxed = BoxBody::new(value);
        Self {
            inner: Internal::BoxBody(boxed),
        }
    }
}
impl Clone for BodyBytes {
    /// Duplicates the body when the wrapped variant can be copied.
    ///
    /// The empty, full and string variants are cloned directly.
    ///
    /// # Panics
    ///
    /// Panics for the collected variant, and for the boxed and incoming
    /// variants when their features are enabled: none of them is cloned here.
    fn clone(&self) -> Self {
        let inner = match &self.inner {
            Internal::Empty(empty) => Internal::Empty(empty.clone()),
            Internal::Full(full) => Internal::Full(full.clone()),
            Internal::String(text) => Internal::String(text.clone()),
            #[cfg(feature = "box_body")]
            Internal::BoxBody(_) => panic!("Clone not yet implemented"),
            #[cfg(feature = "incoming")]
            Internal::Incoming(_) => panic!("Clone not yet implemented"),
            Internal::Collected(_) => panic!("Unable to handle clone"),
        };
        Self { inner }
    }
}