async_bincode/
tokio.rs

1//! Asynchronous access to a bincode-encoded item stream using `tokio`. See the top-level
2//! documentation and the documentation for [`AsyncBincodeReader`], [`AsyncBincodeWriter`], and
3//! [`AsyncBincodeStream`].
4
5make_reader!(tokio::io::AsyncRead, internal_poll_reader);
6make_writer!(tokio::io::AsyncWrite, poll_shutdown);
7make_stream!(
8    tokio::io::AsyncRead,
9    tokio::io::AsyncWrite,
10    tokio::io::ReadBuf,
11    ()
12);
13
14fn internal_poll_reader<R>(
15    r: std::pin::Pin<&mut R>,
16    cx: &mut std::task::Context,
17    rest: &mut [u8],
18) -> std::task::Poll<std::io::Result<usize>>
19where
20    R: tokio::io::AsyncRead + Unpin,
21{
22    let mut buf = tokio::io::ReadBuf::new(rest);
23    futures_core::ready!(r.poll_read(cx, &mut buf))?;
24    let n = buf.filled().len();
25    std::task::Poll::Ready(Ok(n))
26}
27
28impl<R, W, D> AsyncBincodeStream<tokio::net::TcpStream, R, W, D> {
29    /// Split a TCP-based stream into a read half and a write half.
30    ///
31    /// This is more performant than using a lock-based split like the one provided by `tokio-io`
32    /// or `futures-util` since we know that reads and writes to a `TcpStream` can continue
33    /// concurrently.
34    ///
35    /// Any partially sent or received state is preserved.
36    pub fn tcp_split(
37        &mut self,
38    ) -> (
39        AsyncBincodeReader<tokio::net::tcp::ReadHalf, R>,
40        AsyncBincodeWriter<tokio::net::tcp::WriteHalf, W, D>,
41    ) {
42        // First, steal the reader state so it isn't lost
43        let rbuff = self.stream.0.buffer.split();
44        // Then, fish out the writer
45        let writer = &mut self.stream.get_mut().0;
46        // And steal the writer state so it isn't lost
47        let wbuff = writer.buffer.split_off(0);
48        let wsize = writer.written;
49        // Now split the stream
50        let (r, w) = writer.get_mut().split();
51        // Then put the reader back together
52        let mut reader = AsyncBincodeReader::from(r);
53        reader.0.buffer = rbuff;
54        // And then the writer
55        let mut writer: AsyncBincodeWriter<_, _, D> = AsyncBincodeWriter::from(w).make_for();
56        writer.buffer = wbuff;
57        writer.written = wsize;
58        // All good!
59        (reader, writer)
60    }
61}