1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
use alloc::boxed::Box;
use core::time::Duration;

pub use tokio::time::{Sleep, Sleep as TokioTimeSleep};

use crate::{Sleepble, SleepbleWaitBoxFuture};

//
impl Sleepble for Sleep {
    fn sleep(dur: Duration) -> Self {
        tokio::time::sleep(tokio::time::Duration::from_micros(dur.as_micros() as u64))
    }

    fn wait(self) -> SleepbleWaitBoxFuture {
        Box::pin(self)
    }
}

//
#[derive(Debug)]
pub struct UnpinSleep(pub Sleep);
impl Unpin for UnpinSleep {}

impl Sleepble for UnpinSleep {
    fn sleep(dur: Duration) -> Self {
        UnpinSleep(tokio::time::sleep(tokio::time::Duration::from_micros(
            dur.as_micros() as u64,
        )))
    }

    fn wait(self) -> SleepbleWaitBoxFuture {
        Box::pin(self.0)
    }
}

#[cfg(test)]
mod tests {
    #[allow(unused_imports)]
    use super::*;

    #[cfg(feature = "std")]
    #[tokio::test]
    async fn test_sleep() {
        {
            #[cfg(feature = "std")]
            let now = std::time::Instant::now();

            crate::sleep::sleep::<Sleep>(Duration::from_millis(100)).await;

            #[cfg(feature = "std")]
            {
                let elapsed_dur = now.elapsed();
                assert!(elapsed_dur.as_millis() >= 100 && elapsed_dur.as_millis() <= 105);
            }
        }

        {
            #[cfg(feature = "std")]
            let now = std::time::Instant::now();

            crate::sleep::sleep::<UnpinSleep>(Duration::from_millis(100)).await;

            #[cfg(feature = "std")]
            {
                let elapsed_dur = now.elapsed();
                assert!(elapsed_dur.as_millis() >= 100 && elapsed_dur.as_millis() <= 105);
            }
        }
    }

    #[cfg(feature = "std")]
    #[tokio::test]
    async fn test_sleep_until() {
        {
            let now = std::time::Instant::now();

            crate::sleep::sleep_until::<Sleep>(
                std::time::Instant::now() + Duration::from_millis(100),
            )
            .await;

            let elapsed_dur = now.elapsed();
            assert!(elapsed_dur.as_millis() >= 100 && elapsed_dur.as_millis() <= 105);
        }

        {
            let now = std::time::Instant::now();

            crate::sleep::sleep_until::<UnpinSleep>(
                std::time::Instant::now() + Duration::from_millis(100),
            )
            .await;

            let elapsed_dur = now.elapsed();
            assert!(elapsed_dur.as_millis() >= 100 && elapsed_dur.as_millis() <= 105);
        }
    }

    #[cfg(feature = "rw")]
    #[cfg(test)]
    mod rw_tests {
        use core::time::Duration;
        use std::{io::ErrorKind as IoErrorKind, time::Instant};

        use async_compat::Compat;
        use tokio::{
            net::{TcpListener, TcpStream},
            runtime::Runtime,
        };

        use crate::{
            impl_tokio::Sleep,
            rw::{AsyncReadWithTimeoutExt as _, AsyncWriteWithTimeoutExt as _},
        };

        #[test]
        fn simple() -> Result<(), Box<dyn std::error::Error>> {
            let rt = Runtime::new().unwrap();

            let ret = rt.block_on(async {
                let listener = TcpListener::bind("127.0.0.1:0").await?;

                let addr = listener.local_addr()?;

                let tcp_stream_c = TcpStream::connect(addr).await?;
                let mut tcp_stream_c = Compat::new(tcp_stream_c);
                let (tcp_stream_s, _) = listener.accept().await.expect("Accept failed");
                let mut tcp_stream_s = Compat::new(tcp_stream_s);

                tcp_stream_s
                    .write_with_timeout::<Sleep>(b"foo", Duration::from_secs(1))
                    .await?;

                let mut buf = vec![0u8; 5];
                let n = tcp_stream_c
                    .read_with_timeout::<Sleep>(&mut buf, Duration::from_secs(1))
                    .await?;
                assert_eq!(n, 3);
                assert_eq!(buf, b"foo\0\0");

                let instant = Instant::now();
                let two_secs = Duration::from_secs(2);
                let three_secs = Duration::from_secs(3);
                let err = tcp_stream_c
                    .read_with_timeout::<Sleep>(&mut buf, Duration::from_secs(2))
                    .await
                    .err()
                    .unwrap();
                assert!(instant.elapsed() >= two_secs);
                assert!(instant.elapsed() < three_secs);
                assert_eq!(err.kind(), IoErrorKind::TimedOut);
                assert_eq!(err.to_string(), "read timeout");

                Result::<(), Box<dyn std::error::Error>>::Ok(())
            });

            match ret {
                Ok(_) => {}
                Err(err) => panic!("{err}"),
            }

            Ok(())
        }
    }
}