1use loom::{
4 futures::task::{self, Task},
5 sync::atomic::AtomicUsize,
6 sync::CausalCell,
7};
8
9use futures::{Async, Future, Poll};
10
11use std::fmt;
12use std::mem::{self, ManuallyDrop};
13use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
14use std::sync::Arc;
15
/// Sending half of the channel created by [`channel`].
///
/// Used to hand a single value to the paired `Receiver` via `send`.
#[derive(Debug)]
pub struct Sender<T> {
    /// Shared channel state. `send` takes this out (leaving `None`) so that
    /// the `Drop` impl does not complete the channel a second time.
    inner: Option<Arc<Inner<T>>>,
}
23
/// Receiving half of the channel created by [`channel`].
///
/// Implements `Future`; the value can also be fetched without registering a
/// task through `try_recv`.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Shared channel state. Reset to `None` once `poll` or `try_recv` has
    /// produced a final result (a value or an error other than "not ready").
    inner: Option<Arc<Inner<T>>>,
}
31
pub mod error {
    //! Error types produced by the receiving half of the channel.

    use std::fmt;

    /// Message shared by both error types, for `Display` and the legacy
    /// `Error::description`.
    const CHANNEL_CLOSED: &str = "channel closed";

    /// Error returned when polling the `Receiver` future fails.
    #[derive(Debug)]
    pub struct RecvError(pub(super) ());

    /// Error returned by `Receiver::try_recv`.
    #[derive(Debug)]
    pub struct TryRecvError(pub(super) ());

    impl fmt::Display for RecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
            // Write the message directly instead of routing through the
            // deprecated `Error::description`.
            fmt.write_str(CHANNEL_CLOSED)
        }
    }

    impl ::std::error::Error for RecvError {
        // Kept so callers that still use `description()` see the same text
        // as before.
        fn description(&self) -> &str {
            CHANNEL_CLOSED
        }
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
            // Write the message directly instead of routing through the
            // deprecated `Error::description`.
            fmt.write_str(CHANNEL_CLOSED)
        }
    }

    impl ::std::error::Error for TryRecvError {
        // Kept so callers that still use `description()` see the same text
        // as before.
        fn description(&self) -> &str {
            CHANNEL_CLOSED
        }
    }
}
75
76use self::error::*;
77
/// State shared between the `Sender` and `Receiver` of one channel.
struct Inner<T> {
    /// Bit set of `RX_TASK_SET`, `VALUE_SENT`, `CLOSED` and `TX_TASK_SET`,
    /// read and updated through the `State` helpers.
    state: AtomicUsize,

    /// Slot for the value in flight: written by `Sender::send`, emptied by
    /// `consume_value`.
    value: CausalCell<Option<T>>,

    /// Task registered by `Sender::poll_close`, notified from `close`.
    /// Starts out uninitialized (see `channel`); it is only read or dropped
    /// after the `TX_TASK_SET` flag has been observed.
    tx_task: CausalCell<ManuallyDrop<Task>>,

    /// Task registered by `poll_recv`, notified from `complete`.
    /// Starts out uninitialized (see `channel`); it is only read or dropped
    /// after the `RX_TASK_SET` flag has been observed.
    rx_task: CausalCell<ManuallyDrop<Task>>,
}
92
/// Snapshot of the flag bits stored in `Inner::state`.
#[derive(Clone, Copy)]
struct State(usize);
95
96pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
130 let inner = Arc::new(Inner {
131 state: AtomicUsize::new(State::new().as_usize()),
132 value: CausalCell::new(None),
133 tx_task: CausalCell::new(ManuallyDrop::new(unsafe { mem::uninitialized() })),
134 rx_task: CausalCell::new(ManuallyDrop::new(unsafe { mem::uninitialized() })),
135 });
136
137 let tx = Sender {
138 inner: Some(inner.clone()),
139 };
140 let rx = Receiver { inner: Some(inner) };
141
142 (tx, rx)
143}
144
145impl<T> Sender<T> {
146 pub fn send(mut self, t: T) -> Result<(), T> {
156 let inner = self.inner.take().unwrap();
157
158 inner.value.with_mut(|ptr| unsafe {
159 *ptr = Some(t);
160 });
161
162 if !inner.complete() {
163 return Err(inner
164 .value
165 .with_mut(|ptr| unsafe { (*ptr).take() }.unwrap()));
166 }
167
168 Ok(())
169 }
170
171 pub fn poll_close(&mut self) -> Poll<(), ()> {
184 let inner = self.inner.as_ref().unwrap();
185
186 let mut state = State::load(&inner.state, Acquire);
187
188 if state.is_closed() {
189 return Ok(Async::Ready(()));
190 }
191
192 if state.is_tx_task_set() {
193 let will_notify = inner
194 .tx_task
195 .with(|ptr| unsafe { (&*ptr).will_notify_current() });
196
197 if !will_notify {
198 state = State::unset_tx_task(&inner.state);
199
200 if state.is_closed() {
201 State::set_tx_task(&inner.state);
203 return Ok(Async::Ready(()));
204 } else {
205 unsafe { inner.drop_tx_task() };
206 }
207 }
208 }
209
210 if !state.is_tx_task_set() {
211 unsafe {
213 inner.set_tx_task();
214 }
215
216 state = State::set_tx_task(&inner.state);
218
219 if state.is_closed() {
220 return Ok(Async::Ready(()));
221 }
222 }
223
224 Ok(Async::NotReady)
225 }
226
227 pub fn is_closed(&self) -> bool {
235 let inner = self.inner.as_ref().unwrap();
236
237 let state = State::load(&inner.state, Acquire);
238 state.is_closed()
239 }
240}
241
242impl<T> Drop for Sender<T> {
243 fn drop(&mut self) {
244 if let Some(inner) = self.inner.as_ref() {
245 inner.complete();
246 }
247 }
248}
249
250impl<T> Receiver<T> {
251 pub fn close(&mut self) {
260 let inner = self.inner.as_ref().unwrap();
261 inner.close();
262 }
263
264 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
275 let result = if let Some(inner) = self.inner.as_ref() {
276 let state = State::load(&inner.state, Acquire);
277
278 if state.is_complete() {
279 match unsafe { inner.consume_value() } {
280 Some(value) => Ok(value),
281 None => Err(TryRecvError(())),
282 }
283 } else if state.is_closed() {
284 Err(TryRecvError(()))
285 } else {
286 return Err(TryRecvError(()));
288 }
289 } else {
290 panic!("called after complete");
291 };
292
293 self.inner = None;
294 result
295 }
296}
297
298impl<T> Drop for Receiver<T> {
299 fn drop(&mut self) {
300 if let Some(inner) = self.inner.as_ref() {
301 inner.close();
302 }
303 }
304}
305
306impl<T> Future for Receiver<T> {
307 type Item = T;
308 type Error = RecvError;
309
310 fn poll(&mut self) -> Poll<T, RecvError> {
311 use futures::Async::{NotReady, Ready};
312
313 let ret = if let Some(inner) = self.inner.as_ref() {
315 match inner.poll_recv() {
316 Ok(Ready(v)) => Ok(Ready(v)),
317 Ok(NotReady) => return Ok(NotReady),
318 Err(e) => Err(e),
319 }
320 } else {
321 panic!("called after complete");
322 };
323
324 self.inner = None;
325 ret
326 }
327}
328
329impl<T> Inner<T> {
330 fn complete(&self) -> bool {
331 let prev = State::set_complete(&self.state);
332
333 if prev.is_closed() {
334 return false;
335 }
336
337 if prev.is_rx_task_set() {
338 self.rx_task.with(|ptr| unsafe { (&*ptr).notify() });
339 }
340
341 true
342 }
343
344 fn poll_recv(&self) -> Poll<T, RecvError> {
345 use futures::Async::{NotReady, Ready};
346
347 let mut state = State::load(&self.state, Acquire);
349
350 if state.is_complete() {
351 match unsafe { self.consume_value() } {
352 Some(value) => Ok(Ready(value)),
353 None => Err(RecvError(())),
354 }
355 } else if state.is_closed() {
356 Err(RecvError(()))
357 } else {
358 if state.is_rx_task_set() {
359 let will_notify = self
360 .rx_task
361 .with(|ptr| unsafe { (&*ptr).will_notify_current() });
362
363 if !will_notify {
365 state = State::unset_rx_task(&self.state);
367 if state.is_complete() {
368 State::set_rx_task(&self.state);
370
371 return match unsafe { self.consume_value() } {
372 Some(value) => Ok(Ready(value)),
373 None => Err(RecvError(())),
374 };
375 } else {
376 unsafe { self.drop_rx_task() };
377 }
378 }
379 }
380
381 if !state.is_rx_task_set() {
382 unsafe {
384 self.set_rx_task();
385 }
386
387 state = State::set_rx_task(&self.state);
389
390 if state.is_complete() {
391 match unsafe { self.consume_value() } {
392 Some(value) => Ok(Ready(value)),
393 None => Err(RecvError(())),
394 }
395 } else {
396 return Ok(NotReady);
397 }
398 } else {
399 return Ok(NotReady);
400 }
401 }
402 }
403
404 fn close(&self) {
406 let prev = State::set_closed(&self.state);
407
408 if prev.is_tx_task_set() && !prev.is_complete() {
409 self.tx_task.with(|ptr| unsafe { (&*ptr).notify() });
410 }
411 }
412
413 unsafe fn consume_value(&self) -> Option<T> {
415 self.value.with_mut(|ptr| (*ptr).take())
416 }
417
418 unsafe fn drop_rx_task(&self) {
419 self.rx_task.with_mut(|ptr| ManuallyDrop::drop(&mut *ptr))
420 }
421
422 unsafe fn drop_tx_task(&self) {
423 self.tx_task.with_mut(|ptr| ManuallyDrop::drop(&mut *ptr))
424 }
425
426 unsafe fn set_rx_task(&self) {
427 self.rx_task
428 .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current()));
429 }
430
431 unsafe fn set_tx_task(&self) {
432 self.tx_task
433 .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current()));
434 }
435}
436
// SAFETY: NOTE(review) — rationale inferred from this file, please confirm.
// `Inner` is shared by exactly one `Sender` and one `Receiver` through an
// `Arc`. Its `CausalCell` fields are only touched inside `unsafe` blocks
// whose ordering is coordinated through the atomic `state` flags. `T: Send`
// is required because the value moves from the sending side to the
// receiving side.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
439
440impl<T> Drop for Inner<T> {
441 fn drop(&mut self) {
442 let state = State(*self.state.get_mut());
443
444 if state.is_rx_task_set() {
445 self.rx_task.with_mut(|ptr| unsafe {
446 ManuallyDrop::drop(&mut *ptr);
447 });
448 }
449
450 if state.is_tx_task_set() {
451 self.tx_task.with_mut(|ptr| unsafe {
452 ManuallyDrop::drop(&mut *ptr);
453 });
454 }
455 }
456}
457
458impl<T: fmt::Debug> fmt::Debug for Inner<T> {
459 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
460 use std::sync::atomic::Ordering::Relaxed;
461
462 fmt.debug_struct("Inner")
463 .field("state", &State::load(&self.state, Relaxed))
464 .finish()
465 }
466}
467
// Bit flags stored in `Inner::state`.

/// The receiver's task slot has been populated (`State::set_rx_task`).
const RX_TASK_SET: usize = 0b00001;
/// The channel has been completed (`State::set_complete`), either by `send`
/// or by dropping the sender.
const VALUE_SENT: usize = 0b00010;
/// The receiver has closed the channel (`State::set_closed`).
const CLOSED: usize = 0b00100;
/// The sender's task slot has been populated (`State::set_tx_task`).
const TX_TASK_SET: usize = 0b01000;
472
473impl State {
474 fn new() -> State {
475 State(0)
476 }
477
478 fn is_complete(&self) -> bool {
479 self.0 & VALUE_SENT == VALUE_SENT
480 }
481
482 fn set_complete(cell: &AtomicUsize) -> State {
483 let val = cell.fetch_or(VALUE_SENT, AcqRel);
487 State(val)
488 }
489
490 fn is_rx_task_set(&self) -> bool {
491 self.0 & RX_TASK_SET == RX_TASK_SET
492 }
493
494 fn set_rx_task(cell: &AtomicUsize) -> State {
495 let val = cell.fetch_or(RX_TASK_SET, AcqRel);
496 State(val | RX_TASK_SET)
497 }
498
499 fn unset_rx_task(cell: &AtomicUsize) -> State {
500 let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
501 State(val & !RX_TASK_SET)
502 }
503
504 fn is_closed(&self) -> bool {
505 self.0 & CLOSED == CLOSED
506 }
507
508 fn set_closed(cell: &AtomicUsize) -> State {
509 let val = cell.fetch_or(CLOSED, Acquire);
512 State(val)
513 }
514
515 fn set_tx_task(cell: &AtomicUsize) -> State {
516 let val = cell.fetch_or(TX_TASK_SET, AcqRel);
517 State(val | TX_TASK_SET)
518 }
519
520 fn unset_tx_task(cell: &AtomicUsize) -> State {
521 let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
522 State(val & !TX_TASK_SET)
523 }
524
525 fn is_tx_task_set(&self) -> bool {
526 self.0 & TX_TASK_SET == TX_TASK_SET
527 }
528
529 fn as_usize(self) -> usize {
530 self.0
531 }
532
533 fn load(cell: &AtomicUsize, order: Ordering) -> State {
534 let val = cell.load(order);
535 State(val)
536 }
537}
538
539impl fmt::Debug for State {
540 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
541 fmt.debug_struct("State")
542 .field("is_complete", &self.is_complete())
543 .field("is_closed", &self.is_closed())
544 .field("is_rx_task_set", &self.is_rx_task_set())
545 .field("is_tx_task_set", &self.is_tx_task_set())
546 .finish()
547 }
548}