1use std::cmp;
2use std::error;
3use std::fmt;
4use std::future::Future;
5use std::iter::{IntoIterator, Iterator};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use pin_project::pin_project;
10use tokio::time::{sleep_until, Duration, Instant, Sleep};
11
12use super::action::Action;
13use super::condition::Condition;
14
15#[pin_project(project = RetryStateProj)]
16enum RetryState<A>
17where
18 A: Action,
19{
20 Running(#[pin] A::Future),
21 Sleeping(#[pin] Sleep),
22}
23
24impl<A: Action> RetryState<A> {
25 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> RetryFuturePoll<A> {
26 match self.project() {
27 RetryStateProj::Running(future) => RetryFuturePoll::Running(future.poll(cx)),
28 RetryStateProj::Sleeping(future) => RetryFuturePoll::Sleeping(future.poll(cx)),
29 }
30 }
31}
32
33enum RetryFuturePoll<A>
34where
35 A: Action,
36{
37 Running(Poll<Result<A::Item, A::Error>>),
38 Sleeping(Poll<()>),
39}
40
41#[pin_project]
43pub struct Retry<I, A>
44where
45 I: Iterator<Item = Duration>,
46 A: Action,
47{
48 #[pin]
49 retry_if: RetryIf<I, A, fn(&A::Error) -> bool>,
50}
51
52impl<I, A> Retry<I, A>
53where
54 I: Iterator<Item = Duration>,
55 A: Action,
56{
57 pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(
58 strategy: T,
59 action: A,
60 ) -> Retry<I, A> {
61 Retry {
62 retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool),
63 }
64 }
65}
66
67impl<I, A> Future for Retry<I, A>
68where
69 I: Iterator<Item = Duration>,
70 A: Action,
71{
72 type Output = Result<A::Item, A::Error>;
73
74 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
75 let this = self.project();
76 this.retry_if.poll(cx)
77 }
78}
79
80#[pin_project]
83pub struct RetryIf<I, A, C>
84where
85 I: Iterator<Item = Duration>,
86 A: Action,
87 C: Condition<A::Error>,
88{
89 strategy: I,
90 #[pin]
91 state: RetryState<A>,
92 action: A,
93 condition: C,
94}
95
96impl<I, A, C> RetryIf<I, A, C>
97where
98 I: Iterator<Item = Duration>,
99 A: Action,
100 C: Condition<A::Error>,
101{
102 pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(
103 strategy: T,
104 mut action: A,
105 condition: C,
106 ) -> RetryIf<I, A, C> {
107 RetryIf {
108 strategy: strategy.into_iter(),
109 state: RetryState::Running(action.run()),
110 action: action,
111 condition: condition,
112 }
113 }
114
115 fn attempt(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<A::Item, A::Error>> {
116 let future = {
117 let mut this = self.as_mut().project();
118 this.action.run()
119 };
120 self.as_mut()
121 .project()
122 .state
123 .set(RetryState::Running(future));
124 self.poll(cx)
125 }
126
127 fn retry(
128 mut self: Pin<&mut Self>,
129 err: A::Error,
130 cx: &mut Context,
131 ) -> Result<Poll<Result<A::Item, A::Error>>, A::Error> {
132 match self.as_mut().project().strategy.next() {
133 None => Err(err),
134 Some(duration) => {
135 let deadline = Instant::now() + duration;
136 let future = sleep_until(deadline);
137 self.as_mut()
138 .project()
139 .state
140 .set(RetryState::Sleeping(future));
141 Ok(self.poll(cx))
142 }
143 }
144 }
145}
146
147impl<I, A, C> Future for RetryIf<I, A, C>
148where
149 I: Iterator<Item = Duration>,
150 A: Action,
151 C: Condition<A::Error>,
152{
153 type Output = Result<A::Item, A::Error>;
154
155 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
156 match self.as_mut().project().state.poll(cx) {
157 RetryFuturePoll::Running(poll_result) => match poll_result {
158 Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)),
159 Poll::Pending => Poll::Pending,
160 Poll::Ready(Err(err)) => {
161 if self.as_mut().project().condition.should_retry(&err) {
162 match self.retry(err, cx) {
163 Ok(poll) => poll,
164 Err(err) => Poll::Ready(Err(err)),
165 }
166 } else {
167 Poll::Ready(Err(err))
168 }
169 }
170 },
171 RetryFuturePoll::Sleeping(poll_result) => match poll_result {
172 Poll::Pending => Poll::Pending,
173 Poll::Ready(_) => self.attempt(cx),
174 },
175 }
176 }
177}