1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll as FPoll};
use wasi_common::{
sched::{
subscription::{RwEventFlags, Subscription},
Poll,
},
Context as _, Error,
};
/// A collection of boxed futures that resolves as soon as any member resolves.
///
/// Each call to `poll` polls *every* member in insertion order. The output of the first member
/// found ready becomes the output of the set; outputs of other members that are ready during
/// the same pass are dropped, but those members still get to run.
///
/// An empty set never resolves: with no members to poll, `poll` returns `Pending` without any
/// waker having been registered.
struct FirstReady<'a, T>(Vec<Pin<Box<dyn Future<Output = T> + Send + 'a>>>);

impl<'a, T> FirstReady<'a, T> {
    /// Creates a set with no member futures.
    fn new() -> Self {
        Self(Vec::new())
    }

    /// Boxes and pins `f`, appending it after the members already in the set.
    fn push(&mut self, f: impl Future<Output = T> + Send + 'a) {
        let member: Pin<Box<dyn Future<Output = T> + Send + 'a>> = Box::pin(f);
        self.0.push(member);
    }
}

impl<'a, T> Future for FirstReady<'a, T> {
    type Output = T;

    /// Polls all members once and reports the first ready output, if any.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> FPoll<T> {
        // The set only holds `Pin<Box<_>>` handles, so it is `Unpin` and can be borrowed mutably.
        let members = &mut self.get_mut().0;

        let mut winner: Option<T> = None;
        for member in members.iter_mut() {
            if let FPoll::Ready(output) = member.as_mut().poll(cx) {
                // Only the first ready member sets the result. The loop deliberately keeps
                // going so that every member that is ready in this pass is driven as well.
                if winner.is_none() {
                    winner = Some(output);
                }
            }
        }

        match winner {
            Some(output) => FPoll::Ready(output),
            None => FPoll::Pending,
        }
    }
}
/// Waits until at least one subscription registered in `poll` can make progress.
///
/// Behaviour, in order:
///
/// * An empty `poll` returns `Ok(())` immediately.
/// * Every read/write subscription is turned into a future and raced through `FirstReady`:
///   - a read subscription waits for `readable()`, then completes with the value of
///     `num_ready_bytes()` and empty flags;
///   - a write subscription waits for `writable()`, then completes with `0` and empty flags.
/// * If `poll` has a clock deadline, the race is bounded by the time remaining until the
///   earliest one; hitting that bound is not an error.
/// * Without any clock deadline the race is awaited with no bound.
///
/// # Errors
///
/// Returns the error produced by the winning read/write future, i.e. a failure of the
/// readiness wait or of `num_ready_bytes()`, with context attached.
pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
    if poll.is_empty() {
        return Ok(());
    }

    // Outer `Option`: whether any clock subscription exists.
    // Inner `Option`: the value returned by `duration_until()` for the earliest one.
    let clock_timeout = poll
        .earliest_clock_deadline()
        .map(|sub| sub.duration_until());

    let mut futures = FirstReady::new();
    for s in poll.rw_subscriptions() {
        match s {
            Subscription::Read(f) => {
                futures.push(async move {
                    f.file.readable().await.context("readable future")?;
                    let ready_bytes = f
                        .file
                        .num_ready_bytes()
                        .await
                        .context("read num_ready_bytes")?;
                    f.complete(ready_bytes, RwEventFlags::empty());
                    Ok::<(), Error>(())
                });
            }
            Subscription::Write(f) => {
                futures.push(async move {
                    f.file.writable().await.context("writable future")?;
                    f.complete(0, RwEventFlags::empty());
                    Ok::<(), Error>(())
                });
            }
            // NOTE(review): relies on `rw_subscriptions()` yielding only read/write entries.
            Subscription::MonotonicClock { .. } => unreachable!(),
        }
    }

    match clock_timeout {
        Some(remaining) => {
            // NOTE(review): `duration_until()` returning `None` is taken to mean the deadline
            // has already passed - confirm against wasi-common. Previously that case fell
            // through to the unbounded wait below, which blocks until an fd becomes ready and
            // never returns when there are no read/write subscriptions at all. A zero timeout
            // still polls the futures once (tokio polls the inner future before checking the
            // timer), so subscriptions that are already ready get reported.
            let remaining = remaining.unwrap_or_default();
            match tokio::time::timeout(remaining, futures).await {
                Ok(r) => r?,
                // The clock deadline fired first; that is a normal outcome.
                Err(_deadline_elapsed) => {}
            }
        }
        // No clock subscription: wait for the first read/write future with no bound.
        None => futures.await?,
    }

    Ok(())
}