tokio_sync/
oneshot.rs

1//! A channel for sending a single message between asynchronous tasks.
2
3use loom::{
4    futures::task::{self, Task},
5    sync::atomic::AtomicUsize,
6    sync::CausalCell,
7};
8
9use futures::{Async, Future, Poll};
10
11use std::fmt;
12use std::mem::{self, ManuallyDrop};
13use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
14use std::sync::Arc;
15
/// Sends a value to the associated `Receiver`.
///
/// Instances are created by the [`channel`](fn.channel.html) function.
#[derive(Debug)]
pub struct Sender<T> {
    // Handle to the shared channel state. `send` takes it out (leaving
    // `None`), so the `Drop` impl only completes the channel when `send` was
    // never called.
    inner: Option<Arc<Inner<T>>>,
}
23
/// Receive a value from the associated `Sender`.
///
/// Instances are created by the [`channel`](fn.channel.html) function.
#[derive(Debug)]
pub struct Receiver<T> {
    // Handle to the shared channel state. Reset to `None` once `poll` or
    // `try_recv` has produced a final result; polling or calling `try_recv`
    // again after that panics with "called after complete".
    inner: Option<Arc<Inner<T>>>,
}
31
pub mod error {
    //! Error types produced by the oneshot channel.

    use std::error::Error;
    use std::fmt;

    /// Error returned by the `Future` implementation for `Receiver`.
    #[derive(Debug)]
    pub struct RecvError(pub(super) ());

    /// Error returned by the `try_recv` function on `Receiver`.
    #[derive(Debug)]
    pub struct TryRecvError(pub(super) ());

    // ===== impl RecvError =====

    impl Error for RecvError {
        fn description(&self) -> &str {
            "channel closed"
        }
    }

    impl fmt::Display for RecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
            // The displayed text is exactly the `description()` string.
            fmt.write_str(Error::description(self))
        }
    }

    // ===== impl TryRecvError =====

    impl Error for TryRecvError {
        fn description(&self) -> &str {
            "channel closed"
        }
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
            // The displayed text is exactly the `description()` string.
            fmt.write_str(Error::description(self))
        }
    }
}
75
76use self::error::*;
77
/// State shared (through an `Arc`) by the `Sender` and `Receiver` handles of
/// one channel.
struct Inner<T> {
    /// Manages the state of the inner cell
    // Holds the bit flags `RX_TASK_SET`, `VALUE_SENT`, `CLOSED` and
    // `TX_TASK_SET`.
    state: AtomicUsize,

    /// The value. This is set by `Sender` and read by `Receiver`. The state of
    /// the cell is tracked by `state`.
    value: CausalCell<Option<T>>,

    /// The task to notify when the receiver drops without consuming the value.
    // The slot is created uninitialized by `channel`; the `TX_TASK_SET` bit
    // in `state` records that a task has been stored here.
    tx_task: CausalCell<ManuallyDrop<Task>>,

    /// The task to notify when the value is sent.
    // The slot is created uninitialized by `channel`; the `RX_TASK_SET` bit
    // in `state` records that a task has been stored here.
    rx_task: CausalCell<ManuallyDrop<Task>>,
}
92
/// A value of the channel's atomic `state` word: a set of the `RX_TASK_SET`,
/// `VALUE_SENT`, `CLOSED` and `TX_TASK_SET` bit flags.
#[derive(Clone, Copy)]
struct State(usize);
95
96/// Create a new one-shot channel for sending single values across asynchronous
97/// tasks.
98///
99/// The function returns separate "send" and "receive" handles. The `Sender`
100/// handle is used by the producer to send the value. The `Receiver` handle is
101/// used by the consumer to receive the value.
102///
103/// Each handle can be used on separate tasks.
104///
105/// # Examples
106///
107/// ```
108/// extern crate futures;
109/// extern crate tokio;
110///
111/// use tokio::sync::oneshot;
112/// use futures::Future;
113/// use std::thread;
114///
115/// let (sender, receiver) = oneshot::channel::<i32>();
116///
117/// # let t =
118/// thread::spawn(|| {
119///     let future = receiver.map(|i| {
120///         println!("got: {:?}", i);
121///     });
122///     // ...
123/// # return future;
124/// });
125///
126/// sender.send(3).unwrap();
127/// # t.join().unwrap().wait().unwrap();
128/// ```
129pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
130    let inner = Arc::new(Inner {
131        state: AtomicUsize::new(State::new().as_usize()),
132        value: CausalCell::new(None),
133        tx_task: CausalCell::new(ManuallyDrop::new(unsafe { mem::uninitialized() })),
134        rx_task: CausalCell::new(ManuallyDrop::new(unsafe { mem::uninitialized() })),
135    });
136
137    let tx = Sender {
138        inner: Some(inner.clone()),
139    };
140    let rx = Receiver { inner: Some(inner) };
141
142    (tx, rx)
143}
144
145impl<T> Sender<T> {
146    /// Completes this oneshot with a successful result.
147    ///
148    /// The function consumes `self` and notifies the `Receiver` handle that a
149    /// value is ready to be received.
150    ///
151    /// If the value is successfully enqueued for the remote end to receive,
152    /// then `Ok(())` is returned. If the receiving end was dropped before this
153    /// function was called, however, then `Err` is returned with the value
154    /// provided.
155    pub fn send(mut self, t: T) -> Result<(), T> {
156        let inner = self.inner.take().unwrap();
157
158        inner.value.with_mut(|ptr| unsafe {
159            *ptr = Some(t);
160        });
161
162        if !inner.complete() {
163            return Err(inner
164                .value
165                .with_mut(|ptr| unsafe { (*ptr).take() }.unwrap()));
166        }
167
168        Ok(())
169    }
170
171    /// Check if the associated [`Receiver`] handle has been dropped.
172    ///
173    /// # Return values
174    ///
175    /// If `Ok(Ready)` is returned then the associated `Receiver` has been
176    /// dropped, which means any work required for sending should be canceled.
177    ///
178    /// If `Ok(NotReady)` is returned then the associated `Receiver` is still
179    /// alive and may be able to receive a message if sent. The current task is
180    /// registered to receive a notification if the `Receiver` handle goes away.
181    ///
182    /// [`Receiver`]: struct.Receiver.html
183    pub fn poll_close(&mut self) -> Poll<(), ()> {
184        let inner = self.inner.as_ref().unwrap();
185
186        let mut state = State::load(&inner.state, Acquire);
187
188        if state.is_closed() {
189            return Ok(Async::Ready(()));
190        }
191
192        if state.is_tx_task_set() {
193            let will_notify = inner
194                .tx_task
195                .with(|ptr| unsafe { (&*ptr).will_notify_current() });
196
197            if !will_notify {
198                state = State::unset_tx_task(&inner.state);
199
200                if state.is_closed() {
201                    // Set the flag again so that the waker is released in drop
202                    State::set_tx_task(&inner.state);
203                    return Ok(Async::Ready(()));
204                } else {
205                    unsafe { inner.drop_tx_task() };
206                }
207            }
208        }
209
210        if !state.is_tx_task_set() {
211            // Attempt to set the task
212            unsafe {
213                inner.set_tx_task();
214            }
215
216            // Update the state
217            state = State::set_tx_task(&inner.state);
218
219            if state.is_closed() {
220                return Ok(Async::Ready(()));
221            }
222        }
223
224        Ok(Async::NotReady)
225    }
226
227    /// Check if the associated [`Receiver`] handle has been dropped.
228    ///
229    /// Unlike [`poll_close`], this function does not register a task for
230    /// wakeup upon close.
231    ///
232    /// [`Receiver`]: struct.Receiver.html
233    /// [`poll_close`]: struct.Sender.html#method.poll_close
234    pub fn is_closed(&self) -> bool {
235        let inner = self.inner.as_ref().unwrap();
236
237        let state = State::load(&inner.state, Acquire);
238        state.is_closed()
239    }
240}
241
242impl<T> Drop for Sender<T> {
243    fn drop(&mut self) {
244        if let Some(inner) = self.inner.as_ref() {
245            inner.complete();
246        }
247    }
248}
249
250impl<T> Receiver<T> {
251    /// Prevent the associated [`Sender`] handle from sending a value.
252    ///
253    /// Any `send` operation which happens after calling `close` is guaranteed
254    /// to fail. After calling `close`, `Receiver::poll`] should be called to
255    /// receive a value if one was sent **before** the call to `close`
256    /// completed.
257    ///
258    /// [`Sender`]: struct.Sender.html
259    pub fn close(&mut self) {
260        let inner = self.inner.as_ref().unwrap();
261        inner.close();
262    }
263
264    /// Attempts to receive a value outside of the context of a task.
265    ///
266    /// Does not register a task if no value has been sent.
267    ///
268    /// A return value of `None` must be considered immediately stale (out of
269    /// date) unless [`close`] has been called first.
270    ///
271    /// Returns an error if the sender was dropped.
272    ///
273    /// [`close`]: #method.close
274    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
275        let result = if let Some(inner) = self.inner.as_ref() {
276            let state = State::load(&inner.state, Acquire);
277
278            if state.is_complete() {
279                match unsafe { inner.consume_value() } {
280                    Some(value) => Ok(value),
281                    None => Err(TryRecvError(())),
282                }
283            } else if state.is_closed() {
284                Err(TryRecvError(()))
285            } else {
286                // Not ready, this does not clear `inner`
287                return Err(TryRecvError(()));
288            }
289        } else {
290            panic!("called after complete");
291        };
292
293        self.inner = None;
294        result
295    }
296}
297
298impl<T> Drop for Receiver<T> {
299    fn drop(&mut self) {
300        if let Some(inner) = self.inner.as_ref() {
301            inner.close();
302        }
303    }
304}
305
306impl<T> Future for Receiver<T> {
307    type Item = T;
308    type Error = RecvError;
309
310    fn poll(&mut self) -> Poll<T, RecvError> {
311        use futures::Async::{NotReady, Ready};
312
313        // If `inner` is `None`, then `poll()` has already completed.
314        let ret = if let Some(inner) = self.inner.as_ref() {
315            match inner.poll_recv() {
316                Ok(Ready(v)) => Ok(Ready(v)),
317                Ok(NotReady) => return Ok(NotReady),
318                Err(e) => Err(e),
319            }
320        } else {
321            panic!("called after complete");
322        };
323
324        self.inner = None;
325        ret
326    }
327}
328
329impl<T> Inner<T> {
330    fn complete(&self) -> bool {
331        let prev = State::set_complete(&self.state);
332
333        if prev.is_closed() {
334            return false;
335        }
336
337        if prev.is_rx_task_set() {
338            self.rx_task.with(|ptr| unsafe { (&*ptr).notify() });
339        }
340
341        true
342    }
343
344    fn poll_recv(&self) -> Poll<T, RecvError> {
345        use futures::Async::{NotReady, Ready};
346
347        // Load the state
348        let mut state = State::load(&self.state, Acquire);
349
350        if state.is_complete() {
351            match unsafe { self.consume_value() } {
352                Some(value) => Ok(Ready(value)),
353                None => Err(RecvError(())),
354            }
355        } else if state.is_closed() {
356            Err(RecvError(()))
357        } else {
358            if state.is_rx_task_set() {
359                let will_notify = self
360                    .rx_task
361                    .with(|ptr| unsafe { (&*ptr).will_notify_current() });
362
363                // Check if the task is still the same
364                if !will_notify {
365                    // Unset the task
366                    state = State::unset_rx_task(&self.state);
367                    if state.is_complete() {
368                        // Set the flag again so that the waker is released in drop
369                        State::set_rx_task(&self.state);
370
371                        return match unsafe { self.consume_value() } {
372                            Some(value) => Ok(Ready(value)),
373                            None => Err(RecvError(())),
374                        };
375                    } else {
376                        unsafe { self.drop_rx_task() };
377                    }
378                }
379            }
380
381            if !state.is_rx_task_set() {
382                // Attempt to set the task
383                unsafe {
384                    self.set_rx_task();
385                }
386
387                // Update the state
388                state = State::set_rx_task(&self.state);
389
390                if state.is_complete() {
391                    match unsafe { self.consume_value() } {
392                        Some(value) => Ok(Ready(value)),
393                        None => Err(RecvError(())),
394                    }
395                } else {
396                    return Ok(NotReady);
397                }
398            } else {
399                return Ok(NotReady);
400            }
401        }
402    }
403
404    /// Called by `Receiver` to indicate that the value will never be received.
405    fn close(&self) {
406        let prev = State::set_closed(&self.state);
407
408        if prev.is_tx_task_set() && !prev.is_complete() {
409            self.tx_task.with(|ptr| unsafe { (&*ptr).notify() });
410        }
411    }
412
413    /// Consume the value. This function does not check `state`.
414    unsafe fn consume_value(&self) -> Option<T> {
415        self.value.with_mut(|ptr| (*ptr).take())
416    }
417
418    unsafe fn drop_rx_task(&self) {
419        self.rx_task.with_mut(|ptr| ManuallyDrop::drop(&mut *ptr))
420    }
421
422    unsafe fn drop_tx_task(&self) {
423        self.tx_task.with_mut(|ptr| ManuallyDrop::drop(&mut *ptr))
424    }
425
426    unsafe fn set_rx_task(&self) {
427        self.rx_task
428            .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current()));
429    }
430
431    unsafe fn set_tx_task(&self) {
432        self.tx_task
433            .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current()));
434    }
435}
436
// `Inner` is shared between the `Sender` and `Receiver` handles through an
// `Arc`; access to its `CausalCell` fields is coordinated through the flags
// in `state`.
// NOTE(review): the soundness of these impls rests on that flag protocol and
// on `Task` itself being safe to share across threads — confirm.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
439
440impl<T> Drop for Inner<T> {
441    fn drop(&mut self) {
442        let state = State(*self.state.get_mut());
443
444        if state.is_rx_task_set() {
445            self.rx_task.with_mut(|ptr| unsafe {
446                ManuallyDrop::drop(&mut *ptr);
447            });
448        }
449
450        if state.is_tx_task_set() {
451            self.tx_task.with_mut(|ptr| unsafe {
452                ManuallyDrop::drop(&mut *ptr);
453            });
454        }
455    }
456}
457
458impl<T: fmt::Debug> fmt::Debug for Inner<T> {
459    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
460        use std::sync::atomic::Ordering::Relaxed;
461
462        fmt.debug_struct("Inner")
463            .field("state", &State::load(&self.state, Relaxed))
464            .finish()
465    }
466}
467
// Bit flags stored in `Inner::state`.

// A receiver task has been stored in `Inner::rx_task`.
const RX_TASK_SET: usize = 0b00001;
// The sender has completed the channel (`State::set_complete`).
const VALUE_SENT: usize = 0b00010;
// The receiver has closed the channel (`State::set_closed`).
const CLOSED: usize = 0b00100;
// A sender task has been stored in `Inner::tx_task`.
const TX_TASK_SET: usize = 0b01000;
472
473impl State {
474    fn new() -> State {
475        State(0)
476    }
477
478    fn is_complete(&self) -> bool {
479        self.0 & VALUE_SENT == VALUE_SENT
480    }
481
482    fn set_complete(cell: &AtomicUsize) -> State {
483        // TODO: This could be `Release`, followed by an `Acquire` fence *if*
484        // the `RX_TASK_SET` flag is set. However, `loom` does not support
485        // fences yet.
486        let val = cell.fetch_or(VALUE_SENT, AcqRel);
487        State(val)
488    }
489
490    fn is_rx_task_set(&self) -> bool {
491        self.0 & RX_TASK_SET == RX_TASK_SET
492    }
493
494    fn set_rx_task(cell: &AtomicUsize) -> State {
495        let val = cell.fetch_or(RX_TASK_SET, AcqRel);
496        State(val | RX_TASK_SET)
497    }
498
499    fn unset_rx_task(cell: &AtomicUsize) -> State {
500        let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
501        State(val & !RX_TASK_SET)
502    }
503
504    fn is_closed(&self) -> bool {
505        self.0 & CLOSED == CLOSED
506    }
507
508    fn set_closed(cell: &AtomicUsize) -> State {
509        // Acquire because we want all later writes (attempting to poll) to be
510        // ordered after this.
511        let val = cell.fetch_or(CLOSED, Acquire);
512        State(val)
513    }
514
515    fn set_tx_task(cell: &AtomicUsize) -> State {
516        let val = cell.fetch_or(TX_TASK_SET, AcqRel);
517        State(val | TX_TASK_SET)
518    }
519
520    fn unset_tx_task(cell: &AtomicUsize) -> State {
521        let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
522        State(val & !TX_TASK_SET)
523    }
524
525    fn is_tx_task_set(&self) -> bool {
526        self.0 & TX_TASK_SET == TX_TASK_SET
527    }
528
529    fn as_usize(self) -> usize {
530        self.0
531    }
532
533    fn load(cell: &AtomicUsize, order: Ordering) -> State {
534        let val = cell.load(order);
535        State(val)
536    }
537}
538
539impl fmt::Debug for State {
540    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
541        fmt.debug_struct("State")
542            .field("is_complete", &self.is_complete())
543            .field("is_closed", &self.is_closed())
544            .field("is_rx_task_set", &self.is_rx_task_set())
545            .field("is_tx_task_set", &self.is_tx_task_set())
546            .finish()
547    }
548}