1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use futures::{
stream::{Fuse, FusedStream},
Stream, StreamExt,
};
use pin_project::pin_project;
use snafu::{Backtrace, ResultExt, Snafu};
use std::{
collections::{hash_map::Entry, HashMap},
hash::Hash,
pin::Pin,
task::{Context, Poll},
};
use time::delay_queue::Expired;
use tokio::time::{
self,
delay_queue::{self, DelayQueue},
Instant,
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("timer failure: {}", source))]
TimerError {
source: time::Error,
backtrace: Backtrace,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct ScheduleRequest<T> {
pub message: T,
pub run_at: Instant,
}
struct ScheduledEntry {
run_at: Instant,
queue_key: delay_queue::Key,
}
#[pin_project(project = SchedulerProj)]
struct Scheduler<T, R> {
queue: DelayQueue<T>,
scheduled: HashMap<T, ScheduledEntry>,
#[pin]
requests: Fuse<R>,
}
impl<T, R: Stream> Scheduler<T, R> {
fn new(requests: R) -> Self {
Self {
queue: DelayQueue::new(),
scheduled: HashMap::new(),
requests: requests.fuse(),
}
}
}
impl<T: Hash + Eq + Clone, R> SchedulerProj<'_, T, R> {
fn schedule_message(&mut self, request: ScheduleRequest<T>) {
match self.scheduled.entry(request.message) {
Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => {
let entry = old_entry.get_mut();
self.queue.reset_at(&entry.queue_key, request.run_at);
entry.run_at = request.run_at;
}
Entry::Occupied(_old_entry) => {
}
Entry::Vacant(entry) => {
let message = entry.key().clone();
entry.insert(ScheduledEntry {
run_at: request.run_at,
queue_key: self.queue.insert_at(message, request.run_at),
});
}
}
}
fn poll_pop_queue_message(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<delay_queue::Expired<T>, time::Error>>> {
let message = self.queue.poll_expired(cx);
if let Poll::Ready(Some(Ok(message))) = &message {
self.scheduled.remove(message.get_ref()).expect(
"Expired message was popped from the Scheduler queue, but was not in the metadata map",
);
}
message
}
}
impl<T, R> Stream for Scheduler<T, R>
where
T: Eq + Hash + Clone,
R: Stream<Item = ScheduleRequest<T>>,
{
type Item = Result<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
while let Poll::Ready(Some(request)) = this.requests.as_mut().poll_next(cx) {
this.schedule_message(request);
}
match this.poll_pop_queue_message(cx) {
Poll::Ready(Some(expired)) => {
Poll::Ready(Some(expired.map(Expired::into_inner).context(TimerError)))
}
Poll::Ready(None) => {
if this.requests.is_terminated() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
Poll::Pending => Poll::Pending,
}
}
}
pub fn scheduler<T: Eq + Hash + Clone>(
requests: impl Stream<Item = ScheduleRequest<T>>,
) -> impl Stream<Item = Result<T>> {
Scheduler::new(requests)
}