1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll as FPoll};
use wasi_common::{
    sched::{
        subscription::{RwEventFlags, Subscription},
        Poll,
    },
    Error,
};

struct FirstReady<'a, T>(Vec<Pin<Box<dyn Future<Output = T> + Send + 'a>>>);

impl<'a, T> FirstReady<'a, T> {
    fn new() -> Self {
        FirstReady(Vec::new())
    }
    fn push(&mut self, f: impl Future<Output = T> + Send + 'a) {
        self.0.push(Box::pin(f));
    }
}

impl<'a, T> Future for FirstReady<'a, T> {
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> FPoll<T> {
        let mut result = FPoll::Pending;
        for f in self.as_mut().0.iter_mut() {
            match f.as_mut().poll(cx) {
                FPoll::Ready(r) => match result {
                    // First ready gets to set the result. But, continue the loop so all futures
                    // which are ready simultaneously (often on first poll) get to report their
                    // readiness.
                    FPoll::Pending => {
                        result = FPoll::Ready(r);
                    }
                    _ => {}
                },
                _ => continue,
            }
        }
        return result;
    }
}

pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
    if poll.is_empty() {
        return Ok(());
    }

    let duration = poll
        .earliest_clock_deadline()
        .map(|sub| sub.duration_until());

    let mut futures = FirstReady::new();
    for s in poll.rw_subscriptions() {
        match s {
            Subscription::Read(f) => {
                futures.push(async move {
                    f.file
                        .readable()
                        .await
                        .map_err(|e| e.context("readable future"))?;
                    f.complete(
                        f.file
                            .num_ready_bytes()
                            .map_err(|e| e.context("read num_ready_bytes"))?,
                        RwEventFlags::empty(),
                    );
                    Ok::<(), Error>(())
                });
            }

            Subscription::Write(f) => {
                futures.push(async move {
                    f.file
                        .writable()
                        .await
                        .map_err(|e| e.context("writable future"))?;
                    f.complete(0, RwEventFlags::empty());
                    Ok(())
                });
            }
            Subscription::MonotonicClock { .. } => unreachable!(),
        }
    }
    if let Some(Some(remaining_duration)) = duration {
        match tokio::time::timeout(remaining_duration, futures).await {
            Ok(r) => r?,
            Err(_deadline_elapsed) => {}
        }
    } else {
        futures.await?;
    }

    Ok(())
}