async_bincode/
lib.rs

1//! Asynchronous access to a bincode-encoded item stream.
2//!
3//! This crate enables you to asynchronously read from a bincode-encoded stream, or write
4//! bincode-encoded values. `bincode` does not support this natively, as it cannot easily [resume
5//! from stream errors while encoding or decoding](https://github.com/TyOverby/bincode/issues/229).
6//!
7//! `async-bincode` works around that on the receive side by buffering received bytes until a full
8//! element's worth of data has been received, and only then calling into bincode. To make this
9//! work, it relies on the sender to prefix each encoded element with its encoded size.
10//!
11//! On the write side, `async-bincode` buffers the serialized values, and asynchronously sends the
12//! resulting bytestream.
13//!
14//! This crate provides two sets of types in two separate modules. The types in the `futures`
15//! module work with the `futures_io`/`async-std` ecosystem. The types in the `tokio` module work
16//! with the `tokio` ecosystem.
17#![deny(missing_docs)]
18#[cfg(not(any(feature = "futures", feature = "tokio")))]
19compile_error!("async-bincode: Enable at least one of \"futures\" or \"tokio\".");
20
21#[macro_use]
22mod reader;
23#[macro_use]
24mod stream;
25#[macro_use]
26mod writer;
27
28#[cfg(feature = "futures")]
29pub mod futures;
30#[cfg(feature = "tokio")]
31pub mod tokio;
32
33pub use crate::writer::{AsyncDestination, BincodeWriterFor, SyncDestination};
34
#[cfg(all(test, feature = "futures"))]
mod futures_tests {
    use crate::futures::*;
    use ::futures::prelude::*;
    use ::macro_rules_attribute::apply;
    use ::smol_macros::test;

    /// Binds an echo server to an ephemeral loopback port and spawns it on the smol executor.
    ///
    /// The spawned task accepts a single connection, sends every decoded `usize` straight back
    /// to the peer, and finishes once the incoming stream yields `None`. Returns the address
    /// the listener is bound to together with the task handle, so the caller can connect and
    /// later await the server.
    async fn spawn_echo_server() -> (std::net::SocketAddr, smol::Task<()>) {
        let listener = smol::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        let task = smol::spawn(async move {
            let (socket, _) = listener.accept().await.unwrap();
            let mut conn = AsyncBincodeStream::<_, usize, usize, _>::from(socket).for_async();
            while let Some(received) = conn.next().await {
                conn.send(received.unwrap()).await.unwrap();
            }
        });

        (addr, task)
    }

    /// Round-trips two values through the echo server, one at a time.
    #[apply(test!)]
    async fn it_works() {
        let (addr, server) = spawn_echo_server().await;

        let socket = smol::net::TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncBincodeStream::<_, usize, usize, _>::from(socket).for_async();

        // Each value is sent and its echo is checked before the next one goes out.
        for value in [42, 44] {
            client.send(value).await.unwrap();
            assert_eq!(client.next().await.unwrap().unwrap(), value);
        }

        // Dropping the client ends the server's read loop, so the task can be awaited.
        drop(client);
        server.await;
    }

    // Apparently this breaks smol on macOS: the flush/shutdown fails with
    //
    //   `Result::unwrap()` on an `Err` value: Os { code: 57, kind: NotConnected, message: "Socket is not connected" }
    //
    // and the send in the "server" fails with
    //
    //   `Result::unwrap()` on an `Err` value: Io { inner: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }, index: 0 }
    //
    // which is why the test is ignored on that platform.
    /// Pushes a large batch of values through the echo server and checks they all come back
    /// in order.
    #[apply(test!)]
    #[cfg_attr(target_os = "macos", ignore)]
    async fn lots() {
        let (addr, server) = spawn_echo_server().await;

        let n = 81920;
        let socket = smol::net::TcpStream::connect(&addr).await.unwrap();
        let mut c = AsyncBincodeStream::<_, usize, usize, _>::from(socket).for_async();

        // Send everything up front; nothing is read until the whole batch has been forwarded.
        ::futures::stream::iter(0usize..n)
            .map(Ok)
            .forward(&mut c)
            .await
            .unwrap();

        // Flush and shut down the write direction of the underlying socket before draining
        // the echoes.
        let raw = c.get_mut();
        raw.flush().await.unwrap();
        raw.shutdown(smol::net::Shutdown::Write).unwrap();

        // The echoes must arrive as 0, 1, 2, ... with nothing missing.
        let mut at = 0;
        while let Some(item) = c.next().await {
            assert_eq!(at, item.unwrap());
            at += 1;
        }
        assert_eq!(at, n);

        server.await;
    }
}
112
#[cfg(all(test, feature = "tokio"))]
mod tokio_tests {
    use crate::tokio::*;
    use futures::prelude::*;
    use tokio::io::{AsyncWriteExt, BufStream};

    /// Round-trips two values through an echo server built on `tcp_split` + `forward`.
    #[tokio::test]
    async fn it_works() {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // NOTE(review): the `JoinHandle` is discarded, so a panic inside this task is not
        // observed by the test.
        tokio::spawn(async move {
            let (socket, _) = listener.accept().await.unwrap();
            let mut conn = AsyncBincodeStream::<_, usize, usize, _>::from(socket).for_async();
            let (reader, writer) = conn.tcp_split();

            // Both halves are mapped to a boxed error so the stream and sink error types
            // line up for `forward`.
            let incoming = reader
                .map_err(Box::new)
                .map_err(Box::<dyn std::error::Error>::from);
            let outgoing = writer.sink_map_err(Box::new).sink_err_into();
            incoming.forward(outgoing).await.unwrap();
        });

        let socket = tokio::net::TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncBincodeStream::<_, usize, usize, _>::from(socket).for_async();

        // Each value is sent and its echo is checked before the next one goes out.
        for value in [42, 44] {
            client.send(value).await.unwrap();
            assert_eq!(client.next().await.unwrap().unwrap(), value);
        }

        drop(client);
    }

    /// Checks that a writer left in its default (non-`for_async`) mode produces output that a
    /// blocking client can decode directly with `bincode::decode_from_reader`.
    #[test]
    fn sync_destination() {
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();

        let rt = tokio::runtime::Runtime::new().unwrap();
        let server = rt.spawn(async move {
            // The std listener is switched to non-blocking before being handed to tokio.
            listener.set_nonblocking(true).unwrap();
            let listener = tokio::net::TcpListener::from_std(listener).unwrap();
            let (socket, _) = listener.accept().await.unwrap();

            let mut writer = AsyncBincodeWriter::<_, i32, _>::from(socket);
            for value in 0..20 {
                writer.send(value).await.unwrap();
            }
            writer.close().await.unwrap();
        });

        let mut client = std::io::BufReader::new(std::net::TcpStream::connect(&addr).unwrap());
        for expected in 0..20 {
            let got: i32 =
                bincode::decode_from_reader(&mut client, bincode::config::standard()).unwrap();
            assert_eq!(got, expected);
        }

        // another read should now fail
        bincode::decode_from_reader::<i32, _, _>(&mut client, bincode::config::standard())
            .unwrap_err();
        drop(client);

        rt.block_on(server).unwrap();
    }

    /// Pushes a large batch of values through a buffered echo server and checks they all come
    /// back in order.
    #[tokio::test]
    async fn lots() {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // NOTE(review): the `JoinHandle` is discarded, so a panic inside this task is not
        // observed by the test.
        tokio::spawn(async move {
            let (socket, _) = listener.accept().await.unwrap();
            let conn =
                AsyncBincodeStream::<_, usize, usize, _>::from(BufStream::new(socket)).for_async();
            let (writer, reader) = conn.split();

            let incoming = reader
                .map_err(Box::new)
                .map_err(Box::<dyn std::error::Error>::from);
            let outgoing = writer.sink_map_err(Box::new).sink_err_into();
            incoming.forward(outgoing).await.unwrap();
        });

        let n = 81920;
        let socket = tokio::net::TcpStream::connect(&addr).await.unwrap();
        let mut c =
            AsyncBincodeStream::<_, usize, usize, _>::from(BufStream::new(socket)).for_async();

        // Send everything up front; nothing is read until the whole batch has been forwarded.
        ::futures::stream::iter(0usize..n)
            .map(Ok)
            .forward(&mut c)
            .await
            .unwrap();

        let shutdown_result = c.get_mut().shutdown().await;
        if cfg!(not(target_os = "macos")) {
            // https://github.com/tokio-rs/tokio/issues/4665
            shutdown_result.unwrap();
        }

        // The echoes must arrive as 0, 1, 2, ... with nothing missing.
        let mut at = 0;
        while let Some(item) = c.next().await {
            assert_eq!(at, item.unwrap());
            at += 1;
        }
        assert_eq!(at, n);
    }
}