wasi_common/tokio/sched/
unix.rs

1use crate::{
2    sched::{
3        subscription::{RwEventFlags, Subscription},
4        Poll,
5    },
6    Error,
7};
8use std::future::Future;
9use std::pin::Pin;
10use std::task::{Context, Poll as FPoll};
11
12struct FirstReady<'a, T>(Vec<Pin<Box<dyn Future<Output = T> + Send + 'a>>>);
13
14impl<'a, T> FirstReady<'a, T> {
15    fn new() -> Self {
16        FirstReady(Vec::new())
17    }
18    fn push(&mut self, f: impl Future<Output = T> + Send + 'a) {
19        self.0.push(Box::pin(f));
20    }
21}
22
23impl<'a, T> Future for FirstReady<'a, T> {
24    type Output = T;
25    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> FPoll<T> {
26        let mut result = FPoll::Pending;
27        for f in self.as_mut().0.iter_mut() {
28            match f.as_mut().poll(cx) {
29                FPoll::Ready(r) => match result {
30                    // First ready gets to set the result. But, continue the loop so all futures
31                    // which are ready simultaneously (often on first poll) get to report their
32                    // readiness.
33                    FPoll::Pending => {
34                        result = FPoll::Ready(r);
35                    }
36                    _ => {}
37                },
38                _ => continue,
39            }
40        }
41        return result;
42    }
43}
44
45pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
46    if poll.is_empty() {
47        return Ok(());
48    }
49
50    let duration = poll
51        .earliest_clock_deadline()
52        .map(|sub| sub.duration_until());
53
54    let mut futures = FirstReady::new();
55    for s in poll.rw_subscriptions() {
56        match s {
57            Subscription::Read(f) => {
58                futures.push(async move {
59                    f.file
60                        .readable()
61                        .await
62                        .map_err(|e| e.context("readable future"))?;
63                    f.complete(
64                        f.file
65                            .num_ready_bytes()
66                            .map_err(|e| e.context("read num_ready_bytes"))?,
67                        RwEventFlags::empty(),
68                    );
69                    Ok::<(), Error>(())
70                });
71            }
72
73            Subscription::Write(f) => {
74                futures.push(async move {
75                    f.file
76                        .writable()
77                        .await
78                        .map_err(|e| e.context("writable future"))?;
79                    f.complete(0, RwEventFlags::empty());
80                    Ok(())
81                });
82            }
83            Subscription::MonotonicClock { .. } => unreachable!(),
84        }
85    }
86    if let Some(Some(remaining_duration)) = duration {
87        match tokio::time::timeout(remaining_duration, futures).await {
88            Ok(r) => r?,
89            Err(_deadline_elapsed) => {}
90        }
91    } else {
92        futures.await?;
93    }
94
95    Ok(())
96}