tokio_proto/simple/pipeline/
client.rsuse BindClient;
use super::Pipeline;
use super::lift::{LiftBind, LiftTransport};
use simple::LiftProto;
use streaming::{self, Message};
use streaming::pipeline::StreamingPipeline;
use tokio_core::reactor::Handle;
use tokio_service::Service;
use futures::{stream, Stream, Sink, Future, Poll, IntoFuture};
use std::{fmt, io};
type MyStream<E> = stream::Empty<(), E>;
pub trait ClientProto<T: 'static>: 'static {
type Request: 'static;
type Response: 'static;
type Transport: 'static +
Stream<Item = Self::Response, Error = io::Error> +
Sink<SinkItem = Self::Request, SinkError = io::Error>;
type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
fn bind_transport(&self, io: T) -> Self::BindTransport;
}
impl<T: 'static, P: ClientProto<T>> BindClient<Pipeline, T> for P {
type ServiceRequest = P::Request;
type ServiceResponse = P::Response;
type ServiceError = io::Error;
type BindClient = ClientService<T, P>;
fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
ClientService {
inner: BindClient::<StreamingPipeline<MyStream<io::Error>>, T>::bind_client(
LiftProto::from_ref(self), handle, io
)
}
}
}
impl<T, P> streaming::pipeline::ClientProto<T> for LiftProto<P> where
T: 'static, P: ClientProto<T>
{
type Request = P::Request;
type RequestBody = ();
type Response = P::Response;
type ResponseBody = ();
type Error = io::Error;
type Transport = LiftTransport<P::Transport, io::Error>;
type BindTransport = LiftBind<T, <P::BindTransport as IntoFuture>::Future, io::Error>;
fn bind_transport(&self, io: T) -> Self::BindTransport {
LiftBind::lift(ClientProto::bind_transport(self.lower(), io).into_future())
}
}
pub struct ClientService<T, P> where T: 'static, P: ClientProto<T> {
inner: <LiftProto<P> as BindClient<StreamingPipeline<MyStream<io::Error>>, T>>::BindClient
}
impl<T, P> Clone for ClientService<T, P> where T: 'static, P: ClientProto<T> {
fn clone(&self) -> Self {
ClientService {
inner: self.inner.clone(),
}
}
}
impl<T, P> Service for ClientService<T, P> where T: 'static, P: ClientProto<T> {
type Request = P::Request;
type Response = P::Response;
type Error = io::Error;
type Future = ClientFuture<T, P>;
fn call(&self, req: P::Request) -> Self::Future {
ClientFuture {
inner: self.inner.call(Message::WithoutBody(req))
}
}
}
impl<T, P> fmt::Debug for ClientService<T, P>
where T: 'static + fmt::Debug,
P: ClientProto<T> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ClientService {{ ... }}")
}
}
pub struct ClientFuture<T, P> where T: 'static, P: ClientProto<T> {
inner: <<LiftProto<P> as BindClient<StreamingPipeline<MyStream<io::Error>>, T>>::BindClient
as Service>::Future
}
impl<T, P> Future for ClientFuture<T, P> where P: ClientProto<T> {
type Item = P::Response;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match try_ready!(self.inner.poll()) {
Message::WithoutBody(msg) => Ok(msg.into()),
Message::WithBody(..) => panic!("bodies not supported"),
}
}
}
impl<T, P> fmt::Debug for ClientFuture<T, P>
where T: 'static + fmt::Debug,
P: ClientProto<T> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ClientFuture {{ ... }}")
}
}