wasi_common/tokio/sched/
unix.rs1use crate::{
2 sched::{
3 subscription::{RwEventFlags, Subscription},
4 Poll,
5 },
6 Error,
7};
8use std::future::Future;
9use std::pin::Pin;
10use std::task::{Context, Poll as FPoll};
11
/// A collection of boxed futures that, when awaited, resolves to the output
/// of the first member found ready during a polling pass.
///
/// Every member is polled on every pass, in the order it was pushed, even
/// after a ready member has already been found in that pass. The first ready
/// output is returned; outputs of other members that are ready in the same
/// pass are dropped. A collection with no members is never ready.
struct FirstReady<'a, T> {
    /// Member futures, in insertion order.
    members: Vec<Pin<Box<dyn Future<Output = T> + Send + 'a>>>,
}

impl<'a, T> FirstReady<'a, T> {
    /// Creates a collection with no members.
    fn new() -> Self {
        Self {
            members: Vec::new(),
        }
    }

    /// Boxes and pins `f`, then appends it after the existing members.
    fn push(&mut self, f: impl Future<Output = T> + Send + 'a) {
        let boxed: Pin<Box<dyn Future<Output = T> + Send + 'a>> = Box::pin(f);
        self.members.push(boxed);
    }
}

impl<'a, T> Future for FirstReady<'a, T> {
    type Output = T;

    /// Polls each member once and reports the first ready output, or
    /// `Pending` when no member is ready.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> FPoll<T> {
        let members = &mut self.get_mut().members;
        // The fold deliberately visits every member: the earliest ready one
        // decides the result, but later members are still polled so that any
        // that are ready in the same pass get to run to completion too.
        members.iter_mut().fold(FPoll::Pending, |winner, member| {
            match (member.as_mut().poll(cx), winner) {
                (FPoll::Ready(output), FPoll::Pending) => FPoll::Ready(output),
                (_, winner) => winner,
            }
        })
    }
}
44
45pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
46 if poll.is_empty() {
47 return Ok(());
48 }
49
50 let duration = poll
51 .earliest_clock_deadline()
52 .map(|sub| sub.duration_until());
53
54 let mut futures = FirstReady::new();
55 for s in poll.rw_subscriptions() {
56 match s {
57 Subscription::Read(f) => {
58 futures.push(async move {
59 f.file
60 .readable()
61 .await
62 .map_err(|e| e.context("readable future"))?;
63 f.complete(
64 f.file
65 .num_ready_bytes()
66 .map_err(|e| e.context("read num_ready_bytes"))?,
67 RwEventFlags::empty(),
68 );
69 Ok::<(), Error>(())
70 });
71 }
72
73 Subscription::Write(f) => {
74 futures.push(async move {
75 f.file
76 .writable()
77 .await
78 .map_err(|e| e.context("writable future"))?;
79 f.complete(0, RwEventFlags::empty());
80 Ok(())
81 });
82 }
83 Subscription::MonotonicClock { .. } => unreachable!(),
84 }
85 }
86 if let Some(Some(remaining_duration)) = duration {
87 match tokio::time::timeout(remaining_duration, futures).await {
88 Ok(r) => r?,
89 Err(_deadline_elapsed) => {}
90 }
91 } else {
92 futures.await?;
93 }
94
95 Ok(())
96}