tower_buffer/
future.rs

1//! Future types
2
3use crate::{
4    error::{Closed, Error},
5    message,
6};
7use futures_core::ready;
8use pin_project::{pin_project, project};
9use std::{
10    future::Future,
11    pin::Pin,
12    task::{Context, Poll},
13};
14
15/// Future eventually completed with the response to the original request.
16#[pin_project]
17#[derive(Debug)]
18pub struct ResponseFuture<T> {
19    #[pin]
20    state: ResponseState<T>,
21}
22
23#[pin_project]
24#[derive(Debug)]
25enum ResponseState<T> {
26    Failed(Option<Error>),
27    Rx(#[pin] message::Rx<T>),
28    Poll(#[pin] T),
29}
30
31impl<T> ResponseFuture<T> {
32    pub(crate) fn new(rx: message::Rx<T>) -> Self {
33        ResponseFuture {
34            state: ResponseState::Rx(rx),
35        }
36    }
37
38    pub(crate) fn failed(err: Error) -> Self {
39        ResponseFuture {
40            state: ResponseState::Failed(Some(err)),
41        }
42    }
43}
44
45impl<F, T, E> Future for ResponseFuture<F>
46where
47    F: Future<Output = Result<T, E>>,
48    E: Into<Error>,
49{
50    type Output = Result<T, Error>;
51
52    #[project]
53    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54        let mut this = self.project();
55
56        loop {
57            #[project]
58            match this.state.as_mut().project() {
59                ResponseState::Failed(e) => {
60                    return Poll::Ready(Err(e.take().expect("polled after error")));
61                }
62                ResponseState::Rx(rx) => match ready!(rx.poll(cx)) {
63                    Ok(Ok(f)) => this.state.set(ResponseState::Poll(f)),
64                    Ok(Err(e)) => return Poll::Ready(Err(e.into())),
65                    Err(_) => return Poll::Ready(Err(Closed::new().into())),
66                },
67                ResponseState::Poll(fut) => return fut.poll(cx).map_err(Into::into),
68            }
69        }
70    }
71}