async_timer/timer/
apple.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
//! Dispatch Source based Timer

use core::{ptr, task, time};
use core::pin::Pin;
use core::future::Future;

use crate::state::TimerState;
use crate::alloc::boxed::Box;

use libc::{c_long, c_ulong, c_void, uintptr_t};

//Hand-written bindings to the C dispatch API used by the timer below.
//Names follow the C identifiers, which is why the naming lint is silenced.
#[allow(non_camel_case_types)]
mod ffi {
    use super::*;

    //All handle types are declared as untyped pointers on the Rust side
    pub type dispatch_object_t = *const c_void;
    pub type dispatch_queue_t = *const c_void;
    pub type dispatch_source_t = *const c_void;
    pub type dispatch_source_type_t = *const c_void;
    pub type dispatch_time_t = u64;

    //All bits set. Passed as `interval` in `TimerHandle::set_delay`
    pub const DISPATCH_TIME_FOREVER: dispatch_time_t = !0;
    //pub const DISPATCH_WALLTIME_NOW: dispatch_time_t = !1;
    //Queue identifier passed to `dispatch_get_global_queue` in `TimerHandle::new`
    pub const QOS_CLASS_DEFAULT: c_long = 0x15;

    extern "C" {
        //Only the address of this symbol is used (cast to `dispatch_source_type_t`),
        //so the declared `c_long` type is presumably just a placeholder
        pub static _dispatch_source_type_timer: c_long;

        //NOTE(review): signatures below are maintained by hand - verify against the dispatch headers
        pub fn dispatch_get_global_queue(identifier: c_long, flags: c_ulong) -> dispatch_queue_t;
        pub fn dispatch_source_create(type_: dispatch_source_type_t, handle: uintptr_t, mask: c_ulong, queue: dispatch_queue_t) -> dispatch_source_t;
        pub fn dispatch_source_set_timer(source: dispatch_source_t, start: dispatch_time_t, interval: u64, leeway: u64);
        pub fn dispatch_source_set_event_handler_f(source: dispatch_source_t, handler: unsafe extern "C" fn(*mut c_void));
        pub fn dispatch_set_context(object: dispatch_object_t, context: *mut c_void);
        pub fn dispatch_resume(object: dispatch_object_t);
        pub fn dispatch_suspend(object: dispatch_object_t);
        pub fn dispatch_release(object: dispatch_object_t);
        pub fn dispatch_source_cancel(object: dispatch_object_t);
        pub fn dispatch_walltime(when: *const c_void, delta: i64) -> dispatch_time_t;
    }
}

//Event handler installed on the dispatch source.
//
//`context` is the pointer registered through `dispatch_set_context`,
//which `TimerHandle::new` sets to the timer's `TimerState`.
//
//TODO: Investigate why sometimes it is called multiple times
unsafe extern "C" fn timer_handler(context: *mut c_void) {
    let timer_state = context.cast::<TimerState>();

    (*timer_state).wake();
}

//Owns a dispatch source together with a locally tracked suspension count.
struct TimerHandle {
    //Raw dispatch source pointer as returned by `dispatch_source_create`
    inner: ffi::dispatch_source_t,
    //Suspension count. Incremented on suspend, and decremented on each resume
    s_count: u8,
}

//Tears down the underlying dispatch source when the handle is dropped.
impl Drop for TimerHandle {
    fn drop(&mut self) {
        unsafe {
            //Cancel first, before the source is resumed and released below
            ffi::dispatch_source_cancel(self.inner);

            //It is an error to release while the source is suspended
            //So we resume until the suspension count reaches zero
            self.resume();

            ffi::dispatch_release(self.inner);
        }
    }
}

impl TimerHandle {
    //Creates a dispatch timer source on the default global queue.
    //
    //`state` is stored as the source context and is what `timer_handler` receives.
    //The returned handle starts suspended; `set_delay` arms and resumes it.
    fn new(state: *mut TimerState) -> Self {
        let inner = unsafe {
            let queue = ffi::dispatch_get_global_queue(ffi::QOS_CLASS_DEFAULT, 0);
            ffi::dispatch_source_create(&ffi::_dispatch_source_type_timer as *const _ as ffi::dispatch_source_type_t, 0, 0, queue)
        };

        os_assert!(!inner.is_null());

        unsafe {
            ffi::dispatch_source_set_event_handler_f(inner, timer_handler);
            ffi::dispatch_set_context(inner, state as *mut _);
        }

        Self {
            inner,
            //Starts as suspended
            s_count: 1,
        }
    }

    //Suspends the source unless it is already suspended.
    //
    //Keeps `s_count` at most 1 so suspend/resume calls stay balanced.
    fn suspend(&mut self) {
        if self.s_count == 0 {
            unsafe {
                ffi::dispatch_suspend(self.inner);
            }

            self.s_count += 1;
        }
    }

    //Resumes the source once per recorded suspension, until `s_count` is zero.
    fn resume(&mut self) {
        while self.s_count > 0 {
            unsafe {
                ffi::dispatch_resume(self.inner)
            }

            self.s_count -= 1;
        }
    }

    //Re-arms the source to fire once `timeout` from now.
    //
    //The source is suspended while the timer is reconfigured and resumed afterwards.
    fn set_delay(&mut self, timeout: time::Duration) {
        self.suspend();

        //`as_nanos()` is `u128`: a bare `as i64` would truncate very large durations
        //into an arbitrary, possibly negative delta. Saturate at `i64::MAX` instead.
        let delta = timeout.as_nanos().min(i64::MAX as u128) as i64;

        unsafe {
            let start = ffi::dispatch_walltime(ptr::null(), delta);
            ffi::dispatch_source_set_timer(self.inner, start, ffi::DISPATCH_TIME_FOREVER, 0);
        }

        self.resume();
    }
}

//Manual impls are required because `TimerHandle` stores a raw pointer.
//NOTE(review): soundness presumably relies on dispatch sources being usable from
//any thread - confirm against the dispatch documentation.
unsafe impl Send for TimerHandle {}
unsafe impl Sync for TimerHandle {}

//Lifecycle of `AppleTimer`
enum State {
    //Not armed yet; only the requested timeout is stored
    Init(time::Duration),
    //Dispatch source exists; holds its handle and the boxed `TimerState`
    Running(TimerHandle, Box<TimerState>),
}

///Dispatch source based timer
///
///The dispatch source is created lazily: until the first `SyncTimer::init` call
///only the requested timeout is stored.
pub struct AppleTimer {
    //Either the pending timeout or the running dispatch source with its state
    state: State,
}

impl AppleTimer {
    ///Creates new instance that remembers `time` as its timeout.
    ///
    ///Nothing is scheduled here; the timer stays in its initial state.
    #[inline]
    pub const fn new(time: time::Duration) -> Self {
        let state = State::Init(time);
        Self { state }
    }
}

impl super::Timer for AppleTimer {
    //Runs `assert_time!` on `timeout` and builds the timer through the inherent constructor.
    #[inline(always)]
    fn new(timeout: time::Duration) -> Self {
        assert_time!(timeout);
        Self::new(timeout)
    }

    //True only when the timer is running and its state is not done yet.
    #[inline]
    fn is_ticking(&self) -> bool {
        if let State::Running(_, timer_state) = &self.state {
            !timer_state.is_done()
        } else {
            false
        }
    }

    //True only when the timer is running and its state reports done.
    #[inline]
    fn is_expired(&self) -> bool {
        if let State::Running(_, timer_state) = &self.state {
            timer_state.is_done()
        } else {
            false
        }
    }

    //Replaces the timeout. A running timer gets its state reset and is re-armed;
    //a timer that was never started only stores the new value.
    fn restart(&mut self, new_value: time::Duration) {
        assert_time!(new_value);

        match self.state {
            State::Running(ref mut handle, ref mut timer_state) => {
                timer_state.reset();
                handle.set_delay(new_value);
            }
            State::Init(ref mut pending) => *pending = new_value,
        }
    }

    //Same as `restart`, but a running timer first registers `waker` with its state.
    fn restart_ctx(&mut self, new_value: time::Duration, waker: &task::Waker) {
        assert_time!(new_value);

        match self.state {
            State::Running(ref mut handle, ref mut timer_state) => {
                //Waker goes in before the reset and before the source is re-armed
                timer_state.register(waker);
                timer_state.reset();
                handle.set_delay(new_value);
            }
            State::Init(ref mut pending) => *pending = new_value,
        }
    }

    //Cancels the state and suspends the source of a running timer; no-op otherwise.
    fn cancel(&mut self) {
        if let State::Running(ref mut handle, ref timer_state) = self.state {
            timer_state.cancel();
            handle.suspend();
        }
    }
}

impl super::SyncTimer for AppleTimer {
    //Arms the timer on first use and returns the result of `init` run against the timer state.
    //
    //On the very first call `init` is invoked twice: once before the delay is set, with its
    //result dropped, and once more at the end to produce the return value.
    fn init<R, F: Fn(&TimerState) -> R>(&mut self, init: F) -> R {
        if let State::Init(timeout) = self.state {
            //Heap allocate the state and hand its raw pointer to the dispatch source as context
            let state = Box::into_raw(Box::new(TimerState::new()));
            let mut fd = TimerHandle::new(state);

            //Take ownership of the allocation back; the source keeps the same address as context
            let state = unsafe { Box::from_raw(state) };
            //Run the callback before the timer is started; its result is dropped here
            init(&state);

            fd.set_delay(timeout);

            self.state = State::Running(fd, state)
        }

        match &self.state {
            State::Running(_, ref state) => init(state),
            //The branch above always leaves the timer in `Running`
            State::Init(_) => unreach!(),
        }
    }
}

impl Future for AppleTimer {
    type Output = ();

    //Unwraps the pin and delegates polling to the crate's `poll_sync` helper.
    #[inline]
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Self::Output> {
        let timer = Pin::get_mut(self);
        crate::timer::poll_sync(timer, ctx)
    }
}