tokio_io/
split.rs

1use std::io::{self, Read, Write};
2
3use bytes::{Buf, BufMut};
4use futures::sync::BiLock;
5use futures::{Async, Poll};
6
7use {AsyncRead, AsyncWrite};
8
9/// The readable half of an object returned from `AsyncRead::split`.
10#[derive(Debug)]
11pub struct ReadHalf<T> {
12    handle: BiLock<T>,
13}
14
15impl<T: AsyncRead + AsyncWrite> ReadHalf<T> {
16    /// Reunite with a previously split `WriteHalf`.
17    ///
18    /// # Panics
19    ///
20    /// If this `ReadHalf` and the given `WriteHalf` do not originate from
21    /// the same `AsyncRead::split` operation this method will panic.
22    pub fn unsplit(self, w: WriteHalf<T>) -> T {
23        if let Ok(x) = self.handle.reunite(w.handle) {
24            x
25        } else {
26            panic!("Unrelated `WriteHalf` passed to `ReadHalf::unsplit`.")
27        }
28    }
29}
30
31/// The writable half of an object returned from `AsyncRead::split`.
32#[derive(Debug)]
33pub struct WriteHalf<T> {
34    handle: BiLock<T>,
35}
36
37impl<T: AsyncRead + AsyncWrite> WriteHalf<T> {
38    /// Reunite with a previously split `ReadHalf`.
39    ///
40    /// # panics
41    ///
42    /// If this `WriteHalf` and the given `ReadHalf` do not originate from
43    /// the same `AsyncRead::split` operation this method will panic.
44    pub fn unsplit(self, r: ReadHalf<T>) -> T {
45        if let Ok(x) = self.handle.reunite(r.handle) {
46            x
47        } else {
48            panic!("Unrelated `ReadHalf` passed to `WriteHalf::unsplit`.")
49        }
50    }
51}
52
53pub fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) {
54    let (a, b) = BiLock::new(t);
55    (ReadHalf { handle: a }, WriteHalf { handle: b })
56}
57
/// Build the `WouldBlock` error that the blocking-style `Read`/`Write`
/// impls return when the shared lock cannot be acquired right now.
fn would_block() -> io::Error {
    let kind = io::ErrorKind::WouldBlock;
    io::Error::new(kind, "would block")
}
61
62impl<T: AsyncRead> Read for ReadHalf<T> {
63    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
64        match self.handle.poll_lock() {
65            Async::Ready(mut l) => l.read(buf),
66            Async::NotReady => Err(would_block()),
67        }
68    }
69}
70
71impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
72    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
73        let mut l = try_ready!(wrap_as_io(self.handle.poll_lock()));
74        l.read_buf(buf)
75    }
76}
77
78impl<T: AsyncWrite> Write for WriteHalf<T> {
79    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
80        match self.handle.poll_lock() {
81            Async::Ready(mut l) => l.write(buf),
82            Async::NotReady => Err(would_block()),
83        }
84    }
85
86    fn flush(&mut self) -> io::Result<()> {
87        match self.handle.poll_lock() {
88            Async::Ready(mut l) => l.flush(),
89            Async::NotReady => Err(would_block()),
90        }
91    }
92}
93
94impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
95    fn shutdown(&mut self) -> Poll<(), io::Error> {
96        let mut l = try_ready!(wrap_as_io(self.handle.poll_lock()));
97        l.shutdown()
98    }
99
100    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
101    where
102        Self: Sized,
103    {
104        let mut l = try_ready!(wrap_as_io(self.handle.poll_lock()));
105        l.write_buf(buf)
106    }
107}
108
109fn wrap_as_io<T>(t: Async<T>) -> Result<Async<T>, io::Error> {
110    Ok(t)
111}
112
#[cfg(test)]
mod tests {
    extern crate tokio_current_thread;

    use super::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
    use bytes::{BytesMut, IntoBuf};
    use futures::sync::BiLock;
    use futures::{future::lazy, future::ok, Async, Poll};

    use std::io::{self, Read, Write};

    /// Minimal I/O fixture: every read and write reports one byte
    /// transferred, and flush/shutdown always succeed.
    struct RW;

    impl Read for RW {
        fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
            Ok(1)
        }
    }

    impl AsyncRead for RW {}

    impl Write for RW {
        fn write(&mut self, _: &[u8]) -> io::Result<usize> {
            Ok(1)
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    impl AsyncWrite for RW {
        fn shutdown(&mut self) -> Poll<(), io::Error> {
            Ok(Async::Ready(()))
        }
    }

    /// A contended lock must surface from `ReadHalf::read_buf` as `NotReady`
    /// rather than as an error.
    #[test]
    fn split_readhalf_translate_wouldblock_to_not_ready() {
        tokio_current_thread::block_on_all(lazy(move || {
            let (read_side, write_side) = BiLock::new(RW);
            let mut rx = ReadHalf { handle: read_side };

            let mut buf = BytesMut::with_capacity(64);

            // Nothing else holds the lock yet, so the read completes.
            assert!(rx.read_buf(&mut buf).unwrap().is_ready());

            // Hold the lock through the other side.
            let guard = write_side.poll_lock();

            // While the lock is held, the read must report NotReady.
            assert!(!rx.read_buf(&mut buf).unwrap().is_ready());

            drop(guard);

            // Lock released: reads complete again.
            assert!(rx.read_buf(&mut buf).unwrap().is_ready());

            ok::<(), ()>(())
        }))
        .unwrap();
    }

    /// A contended lock must surface from `WriteHalf::write_buf` as
    /// `NotReady` rather than as an error.
    #[test]
    fn split_writehalf_translate_wouldblock_to_not_ready() {
        tokio_current_thread::block_on_all(lazy(move || {
            let (write_side, read_side) = BiLock::new(RW);
            let mut tx = WriteHalf { handle: write_side };

            let mut buf = BytesMut::with_capacity(64).into_buf();

            // Nothing else holds the lock yet, so the write completes.
            assert!(tx.write_buf(&mut buf).unwrap().is_ready());

            // Hold the lock through the other side.
            let guard = read_side.poll_lock();

            // While the lock is held, the write must report NotReady.
            assert!(!tx.write_buf(&mut buf).unwrap().is_ready());

            drop(guard);

            // Lock released: writes complete again.
            assert!(tx.write_buf(&mut buf).unwrap().is_ready());

            ok::<(), ()>(())
        }))
        .unwrap();
    }

    /// Halves from the same split can be reunited from either side.
    #[test]
    fn unsplit_ok() {
        let (reader, writer) = RW.split();
        reader.unsplit(writer);

        let (reader, writer) = RW.split();
        writer.unsplit(reader);
    }

    /// `ReadHalf::unsplit` panics on a writer from a later split.
    #[test]
    #[should_panic]
    fn unsplit_err1() {
        let (reader, _) = RW.split();
        let (_, writer) = RW.split();
        reader.unsplit(writer);
    }

    /// `ReadHalf::unsplit` panics on a writer from an earlier split.
    #[test]
    #[should_panic]
    fn unsplit_err2() {
        let (_, writer) = RW.split();
        let (reader, _) = RW.split();
        reader.unsplit(writer);
    }

    /// `WriteHalf::unsplit` panics on a reader from a later split.
    #[test]
    #[should_panic]
    fn unsplit_err3() {
        let (_, writer) = RW.split();
        let (reader, _) = RW.split();
        writer.unsplit(reader);
    }

    /// `WriteHalf::unsplit` panics on a reader from an earlier split.
    #[test]
    #[should_panic]
    fn unsplit_err4() {
        let (reader, _) = RW.split();
        let (_, writer) = RW.split();
        writer.unsplit(reader);
    }
}