tokio_retry/
future.rs

1use std::cmp;
2use std::error;
3use std::fmt;
4use std::future::Future;
5use std::iter::{IntoIterator, Iterator};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use pin_project::pin_project;
10use tokio::time::{sleep_until, Duration, Instant, Sleep};
11
12use super::action::Action;
13use super::condition::Condition;
14
15#[pin_project(project = RetryStateProj)]
16enum RetryState<A>
17where
18    A: Action,
19{
20    Running(#[pin] A::Future),
21    Sleeping(#[pin] Sleep),
22}
23
24impl<A: Action> RetryState<A> {
25    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> RetryFuturePoll<A> {
26        match self.project() {
27            RetryStateProj::Running(future) => RetryFuturePoll::Running(future.poll(cx)),
28            RetryStateProj::Sleeping(future) => RetryFuturePoll::Sleeping(future.poll(cx)),
29        }
30    }
31}
32
33enum RetryFuturePoll<A>
34where
35    A: Action,
36{
37    Running(Poll<Result<A::Item, A::Error>>),
38    Sleeping(Poll<()>),
39}
40
41/// Future that drives multiple attempts at an action via a retry strategy.
42#[pin_project]
43pub struct Retry<I, A>
44where
45    I: Iterator<Item = Duration>,
46    A: Action,
47{
48    #[pin]
49    retry_if: RetryIf<I, A, fn(&A::Error) -> bool>,
50}
51
52impl<I, A> Retry<I, A>
53where
54    I: Iterator<Item = Duration>,
55    A: Action,
56{
57    pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(
58        strategy: T,
59        action: A,
60    ) -> Retry<I, A> {
61        Retry {
62            retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool),
63        }
64    }
65}
66
67impl<I, A> Future for Retry<I, A>
68where
69    I: Iterator<Item = Duration>,
70    A: Action,
71{
72    type Output = Result<A::Item, A::Error>;
73
74    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
75        let this = self.project();
76        this.retry_if.poll(cx)
77    }
78}
79
80/// Future that drives multiple attempts at an action via a retry strategy. Retries are only attempted if
81/// the `Error` returned by the future satisfies a given condition.
82#[pin_project]
83pub struct RetryIf<I, A, C>
84where
85    I: Iterator<Item = Duration>,
86    A: Action,
87    C: Condition<A::Error>,
88{
89    strategy: I,
90    #[pin]
91    state: RetryState<A>,
92    action: A,
93    condition: C,
94}
95
96impl<I, A, C> RetryIf<I, A, C>
97where
98    I: Iterator<Item = Duration>,
99    A: Action,
100    C: Condition<A::Error>,
101{
102    pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(
103        strategy: T,
104        mut action: A,
105        condition: C,
106    ) -> RetryIf<I, A, C> {
107        RetryIf {
108            strategy: strategy.into_iter(),
109            state: RetryState::Running(action.run()),
110            action: action,
111            condition: condition,
112        }
113    }
114
115    fn attempt(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<A::Item, A::Error>> {
116        let future = {
117            let mut this = self.as_mut().project();
118            this.action.run()
119        };
120        self.as_mut()
121            .project()
122            .state
123            .set(RetryState::Running(future));
124        self.poll(cx)
125    }
126
127    fn retry(
128        mut self: Pin<&mut Self>,
129        err: A::Error,
130        cx: &mut Context,
131    ) -> Result<Poll<Result<A::Item, A::Error>>, A::Error> {
132        match self.as_mut().project().strategy.next() {
133            None => Err(err),
134            Some(duration) => {
135                let deadline = Instant::now() + duration;
136                let future = sleep_until(deadline);
137                self.as_mut()
138                    .project()
139                    .state
140                    .set(RetryState::Sleeping(future));
141                Ok(self.poll(cx))
142            }
143        }
144    }
145}
146
147impl<I, A, C> Future for RetryIf<I, A, C>
148where
149    I: Iterator<Item = Duration>,
150    A: Action,
151    C: Condition<A::Error>,
152{
153    type Output = Result<A::Item, A::Error>;
154
155    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
156        match self.as_mut().project().state.poll(cx) {
157            RetryFuturePoll::Running(poll_result) => match poll_result {
158                Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)),
159                Poll::Pending => Poll::Pending,
160                Poll::Ready(Err(err)) => {
161                    if self.as_mut().project().condition.should_retry(&err) {
162                        match self.retry(err, cx) {
163                            Ok(poll) => poll,
164                            Err(err) => Poll::Ready(Err(err)),
165                        }
166                    } else {
167                        Poll::Ready(Err(err))
168                    }
169                }
170            },
171            RetryFuturePoll::Sleeping(poll_result) => match poll_result {
172                Poll::Pending => Poll::Pending,
173                Poll::Ready(_) => self.attempt(cx),
174            },
175        }
176    }
177}