async_bincode/
lib.rs

1//! Asynchronous access to a bincode-encoded item stream.
2//!
3//! This crate enables you to asynchronously read from a bincode-encoded stream, or write
4//! bincode-encoded values. `bincode` does not support this natively, as it cannot easily [resume
5//! from stream errors while encoding or decoding](https://github.com/TyOverby/bincode/issues/229).
6//!
7//! `async-bincode` works around that on the receive side by buffering received bytes until a full
8//! element's worth of data has been received, and only then calling into bincode. To make this
9//! work, it relies on the sender to prefix each encoded element with its encoded size.
10//!
11//! On the write side, `async-bincode` buffers the serialized values, and asynchronously sends the
12//! resulting bytestream.
13//!
14//! This crate provides two sets of types in two separate modules. The types in the `futures`
15//! module work with the `futures_io`/`async-std` ecosystem. The types in the `tokio` module work
16//! with the `tokio` ecosystem.
17#![deny(missing_docs)]
18#[cfg(not(any(feature = "futures", feature = "tokio")))]
19compile_error!("async-bincode: Enable at least one of \"futures\" or \"tokio\".");
20
21#[macro_use]
22mod reader;
23#[macro_use]
24mod stream;
25#[macro_use]
26mod writer;
27
28#[cfg(feature = "futures")]
29pub mod futures;
30#[cfg(feature = "tokio")]
31pub mod tokio;
32
33pub use crate::writer::{AsyncDestination, BincodeWriterFor, SyncDestination};
34
/// Round-trip tests for the `futures_io`/`async-std` flavour of the stream types.
#[cfg(all(test, feature = "futures"))]
mod futures_tests {
    use crate::futures::*;
    use ::futures::prelude::*;

    /// Binds a listener on an OS-assigned loopback port and spawns a detached task that
    /// accepts a single connection and sends every `usize` it decodes straight back.
    ///
    /// Returns the address the listener is bound to.
    async fn spawn_echo_server() -> std::net::SocketAddr {
        let listener = async_std::net::TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap();
        let bound = listener.local_addr().unwrap();

        async_std::task::spawn(async move {
            let (conn, _) = listener.accept().await.unwrap();
            let mut echo = AsyncBincodeStream::<_, usize, usize, _>::from(conn).for_async();
            while let Some(received) = echo.next().await {
                echo.send(received.unwrap()).await.unwrap();
            }
        });

        bound
    }

    /// Sends two values one at a time and checks that each is echoed back unchanged.
    #[async_std::test]
    async fn it_works() {
        let addr = spawn_echo_server().await;

        let socket = async_std::net::TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncBincodeStream::<_, usize, usize, _>::from(socket).for_async();

        client.send(42).await.unwrap();
        let first = client.next().await.unwrap().unwrap();
        assert_eq!(first, 42);

        client.send(44).await.unwrap();
        let second = client.next().await.unwrap().unwrap();
        assert_eq!(second, 44);

        drop(client);
    }

    /// Pushes `0..n` through the echo server in one go, closes the write half, and then
    /// checks that the values come back in order and that exactly `n` of them arrive.
    #[async_std::test]
    async fn lots() {
        let addr = spawn_echo_server().await;

        let n = 81920;
        let socket = async_std::net::TcpStream::connect(&addr).await.unwrap();
        let mut c = AsyncBincodeStream::from(socket).for_async();

        let outgoing = ::futures::stream::iter(0usize..n).map(Ok);
        outgoing.forward(&mut c).await.unwrap();

        // Close only the write direction; the echoed items are still read below.
        let raw = c.get_mut();
        raw.shutdown(async_std::net::Shutdown::Write).unwrap();

        let mut expected = 0;
        while let Some(got) = c.next().await.transpose().unwrap() {
            assert_eq!(expected, got);
            expected += 1;
        }
        assert_eq!(expected, n);
    }
}
103
/// Round-trip tests for the `tokio` flavour of the stream types.
#[cfg(all(test, feature = "tokio"))]
mod tokio_tests {
    use crate::tokio::*;
    use futures::prelude::*;
    use tokio::io::{AsyncWriteExt, BufStream};

    /// Binds a listener on an OS-assigned loopback port and returns it together with the
    /// address it is bound to.
    async fn bind_loopback() -> (tokio::net::TcpListener, std::net::SocketAddr) {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let bound = listener.local_addr().unwrap();
        (listener, bound)
    }

    /// Sends two values one at a time to a server that forwards its read half into its
    /// write half, and checks that each value is echoed back unchanged.
    #[tokio::test]
    async fn it_works() {
        let (listener, addr) = bind_loopback().await;

        tokio::spawn(async move {
            let (conn, _) = listener.accept().await.unwrap();
            let mut echo = AsyncBincodeStream::<_, usize, usize, _>::from(conn).for_async();
            let (incoming, outgoing) = echo.tcp_split();
            incoming.forward(outgoing).await.unwrap();
        });

        let socket = tokio::net::TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncBincodeStream::<_, usize, usize, _>::from(socket).for_async();

        client.send(42).await.unwrap();
        let first = client.next().await.unwrap().unwrap();
        assert_eq!(first, 42);

        client.send(44).await.unwrap();
        let second = client.next().await.unwrap().unwrap();
        assert_eq!(second, 44);

        drop(client);
    }

    /// Pushes `0..n` through a `BufStream`-wrapped echo server in one go, shuts the client
    /// down for writing, and then checks that the values come back in order and that
    /// exactly `n` of them arrive.
    #[tokio::test]
    async fn lots() {
        let (listener, addr) = bind_loopback().await;

        tokio::spawn(async move {
            let (conn, _) = listener.accept().await.unwrap();
            let echo =
                AsyncBincodeStream::<_, usize, usize, _>::from(BufStream::new(conn)).for_async();
            // `split` hands back the sink half first and the stream half second.
            let (outgoing, incoming) = echo.split();
            incoming.forward(outgoing).await.unwrap();
        });

        let n = 81920;
        let socket = tokio::net::TcpStream::connect(&addr).await.unwrap();
        let mut c = AsyncBincodeStream::from(BufStream::new(socket)).for_async();

        let outgoing = ::futures::stream::iter(0usize..n).map(Ok);
        outgoing.forward(&mut c).await.unwrap();

        let shutdown_result = c.get_mut().shutdown().await;
        if !cfg!(target_os = "macos") {
            // The result is deliberately not checked on macOS:
            // https://github.com/tokio-rs/tokio/issues/4665
            shutdown_result.unwrap();
        }

        let mut expected = 0;
        while let Some(got) = c.next().await.transpose().unwrap() {
            assert_eq!(expected, got);
            expected += 1;
        }
        assert_eq!(expected, n);
    }
}