1use crate::{
4 error::{Closed, Error},
5 message,
6};
7use futures_core::ready;
8use pin_project::{pin_project, project};
9use std::{
10 future::Future,
11 pin::Pin,
12 task::{Context, Poll},
13};
14
15#[pin_project]
17#[derive(Debug)]
18pub struct ResponseFuture<T> {
19 #[pin]
20 state: ResponseState<T>,
21}
22
23#[pin_project]
24#[derive(Debug)]
25enum ResponseState<T> {
26 Failed(Option<Error>),
27 Rx(#[pin] message::Rx<T>),
28 Poll(#[pin] T),
29}
30
31impl<T> ResponseFuture<T> {
32 pub(crate) fn new(rx: message::Rx<T>) -> Self {
33 ResponseFuture {
34 state: ResponseState::Rx(rx),
35 }
36 }
37
38 pub(crate) fn failed(err: Error) -> Self {
39 ResponseFuture {
40 state: ResponseState::Failed(Some(err)),
41 }
42 }
43}
44
45impl<F, T, E> Future for ResponseFuture<F>
46where
47 F: Future<Output = Result<T, E>>,
48 E: Into<Error>,
49{
50 type Output = Result<T, Error>;
51
52 #[project]
53 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54 let mut this = self.project();
55
56 loop {
57 #[project]
58 match this.state.as_mut().project() {
59 ResponseState::Failed(e) => {
60 return Poll::Ready(Err(e.take().expect("polled after error")));
61 }
62 ResponseState::Rx(rx) => match ready!(rx.poll(cx)) {
63 Ok(Ok(f)) => this.state.set(ResponseState::Poll(f)),
64 Ok(Err(e)) => return Poll::Ready(Err(e.into())),
65 Err(_) => return Poll::Ready(Err(Closed::new().into())),
66 },
67 ResponseState::Poll(fut) => return fut.poll(cx).map_err(Into::into),
68 }
69 }
70 }
71}