//! Crate root of `async-bincode`.
//!
//! At least one of the `futures` or `tokio` Cargo features must be enabled. Each
//! feature exposes a public module of the same name. `AsyncDestination`,
//! `BincodeWriterFor` and `SyncDestination` are re-exported at the crate root from
//! the private `writer` module.
#![deny(missing_docs)]
// Fail the build early, with a readable message, when neither backend feature is on;
// without one of them the crate would expose no public module at all.
#[cfg(not(any(feature = "futures", feature = "tokio")))]
compile_error!("async-bincode: Enable at least one of \"futures\" or \"tokio\".");

// Private implementation modules. `#[macro_use]` keeps any `macro_rules!` macros they
// define in scope for the code declared after them in this crate.
#[macro_use]
mod reader;
#[macro_use]
mod stream;
#[macro_use]
mod writer;

/// Items available when the `futures` feature is enabled.
#[cfg(feature = "futures")]
pub mod futures;
/// Items available when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
pub mod tokio;

// Re-exported so callers can name these without going through the private `writer` module.
pub use crate::writer::{AsyncDestination, BincodeWriterFor, SyncDestination};
34
// Tests for the items in `crate::futures`, run over loopback TCP using `smol`.
// Only compiled for test builds with the `futures` feature enabled.
#[cfg(all(test, feature = "futures"))]
mod futures_tests {
    use crate::futures::*;
    use ::futures::prelude::*;
    use ::macro_rules_attribute::apply;
    use ::smol_macros::test;

    // Sends 42 and then 44 to an echo server, checking each value comes back
    // before the next one is sent.
    #[apply(test!)]
    async fn it_works() {
        // Port 0 lets the OS pick a free port; the actual address is read back below.
        let echo = smol::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = echo.local_addr().unwrap();

        // Echo server: accepts a single connection and sends every received item
        // straight back until the incoming stream ends.
        let server = smol::spawn(async move {
            let (stream, _) = echo.accept().await.unwrap();
            let mut stream = AsyncBincodeStream::<_, usize, usize, _>::from(stream).for_async();
            while let Some(item) = stream.next().await {
                stream.send(item.unwrap()).await.unwrap();
            }
        });

        let client = smol::net::TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncBincodeStream::<_, usize, usize, _>::from(client).for_async();
        client.send(42).await.unwrap();
        assert_eq!(client.next().await.unwrap().unwrap(), 42);

        client.send(44).await.unwrap();
        assert_eq!(client.next().await.unwrap().unwrap(), 44);

        // Dropping the client ends the server's receive loop, so the join below
        // can complete.
        drop(client);
        server.await;
    }

    // Pushes 81920 consecutive values through the echo server in one go and checks
    // that they all come back, in order.
    //
    // NOTE(review): ignored on macOS; the reason is not recorded here. The tokio
    // variant of this test skips its shutdown check on macOS as well, so this is
    // presumably the same platform issue -- confirm and link it.
    #[apply(test!)]
    #[cfg_attr(target_os = "macos", ignore)]
    async fn lots() {
        let echo = smol::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = echo.local_addr().unwrap();

        // Same single-connection echo server as in `it_works`.
        let server = smol::spawn(async move {
            let (stream, _) = echo.accept().await.unwrap();
            let mut stream = AsyncBincodeStream::<_, usize, usize, _>::from(stream).for_async();
            while let Some(item) = stream.next().await {
                stream.send(item.unwrap()).await.unwrap();
            }
        });

        let n = 81920;
        let stream = smol::net::TcpStream::connect(&addr).await.unwrap();
        // The item types are not spelled out here; they are inferred from the
        // `usize` values forwarded into `c` and read back out of it below.
        let mut c = AsyncBincodeStream::from(stream).for_async();

        // Write every value before reading any echo back.
        ::futures::stream::iter(0usize..n)
            .map(Ok)
            .forward(&mut c)
            .await
            .unwrap();

        // Flush the socket obtained via `get_mut()`, then close only its write
        // direction: the server sees end-of-stream and stops echoing, while the
        // read direction stays open so the echoes can still be collected.
        c.get_mut().flush().await.unwrap();
        c.get_mut().shutdown(smol::net::Shutdown::Write).unwrap();

        // Drain the echoes; `at` is both the next expected value and the running count.
        let mut at = 0;
        while let Some(got) = c.next().await.transpose().unwrap() {
            assert_eq!(at, got);
            at += 1;
        }
        assert_eq!(at, n);

        server.await;
    }
}
112
// Tests for the items in `crate::tokio`, run over loopback TCP.
// Only compiled for test builds with the `tokio` feature enabled.
#[cfg(all(test, feature = "tokio"))]
mod tokio_tests {
    use crate::tokio::*;
    use futures::prelude::*;
    use tokio::io::{AsyncWriteExt, BufStream};

    // Sends 42 and then 44 to an echo server, checking each value comes back
    // before the next one is sent.
    #[tokio::test]
    async fn it_works() {
        // Port 0 lets the OS pick a free port; the actual address is read back below.
        let echo = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = echo.local_addr().unwrap();

        // Echo server: forwards the read half of one accepted connection into its
        // write half.
        // NOTE(review): the task handle is discarded, so this task is never joined
        // and a panic inside it would not fail the test on its own -- confirm that
        // is intended.
        tokio::spawn(async move {
            let (stream, _) = echo.accept().await.unwrap();
            let mut stream = AsyncBincodeStream::<_, usize, usize, _>::from(stream).for_async();
            let (r, w) = stream.tcp_split();
            // `forward` needs the stream and the sink to share one error type, so
            // both sides are boxed and converted to `Box<dyn Error>`.
            r.map_err(Box::new)
                .map_err(Box::<dyn std::error::Error>::from)
                .forward(w.sink_map_err(Box::new).sink_err_into())
                .await
                .unwrap();
        });

        let client = tokio::net::TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncBincodeStream::<_, usize, usize, _>::from(client).for_async();
        client.send(42).await.unwrap();
        assert_eq!(client.next().await.unwrap().unwrap(), 42);

        client.send(44).await.unwrap();
        assert_eq!(client.next().await.unwrap().unwrap(), 44);

        drop(client);
    }

    // Checks that items written by an `AsyncBincodeWriter` built with plain `from`
    // (no `for_async()` call) can be read back by a blocking client using
    // `bincode::decode_from_reader` with the standard configuration.
    #[test]
    fn sync_destination() {
        // The listener is bound with std so the blocking client can connect right
        // away; it is handed over to tokio inside the server task.
        let echo = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = echo.local_addr().unwrap();

        // A runtime is created by hand because this is a plain (non-async) test.
        let tokio = tokio::runtime::Runtime::new().unwrap();
        let server = tokio.spawn(async move {
            // Switch the std listener to non-blocking mode before converting it
            // into a tokio listener.
            echo.set_nonblocking(true).unwrap();
            let echo = tokio::net::TcpListener::from_std(echo).unwrap();
            let (stream, _) = echo.accept().await.unwrap();
            let mut w = AsyncBincodeWriter::<_, i32, _>::from(stream);
            for i in 0..20 {
                w.send(i).await.unwrap();
            }
            w.close().await.unwrap();
        });

        // Blocking client: decode the 20 values in the order they were sent.
        let mut client = std::io::BufReader::new(std::net::TcpStream::connect(&addr).unwrap());
        for i in 0..20 {
            let goti: i32 =
                bincode::decode_from_reader(&mut client, bincode::config::standard()).unwrap();
            assert_eq!(goti, i);
        }

        // Only 20 values were written before the writer was closed, so decoding
        // one more is required to fail.
        bincode::decode_from_reader::<i32, _, _>(&mut client, bincode::config::standard())
            .unwrap_err();
        drop(client);

        // Join the server task; the `unwrap` turns a panic inside it into a test failure.
        tokio.block_on(server).unwrap();
    }

    // Pushes 81920 consecutive values through an echo server in one go and checks
    // that they all come back, in order. Both ends wrap their socket in `BufStream`.
    #[tokio::test]
    async fn lots() {
        let echo = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = echo.local_addr().unwrap();

        // Echo server; as in `it_works`, the task handle is discarded and the
        // task is never joined.
        tokio::spawn(async move {
            let (stream, _) = echo.accept().await.unwrap();
            let stream =
                AsyncBincodeStream::<_, usize, usize, _>::from(BufStream::new(stream)).for_async();
            // Note the order of the halves: `w` is used as the sink and `r` as
            // the stream below.
            let (w, r) = stream.split();
            // `forward` needs the stream and the sink to share one error type, so
            // both sides are boxed and converted to `Box<dyn Error>`.
            r.map_err(Box::new)
                .map_err(Box::<dyn std::error::Error>::from)
                .forward(w.sink_map_err(Box::new).sink_err_into())
                .await
                .unwrap();
        });

        let n = 81920;
        let stream = tokio::net::TcpStream::connect(&addr).await.unwrap();
        // The item types are not spelled out here; they are inferred from the
        // `usize` values forwarded into `c` and read back out of it below.
        let mut c = AsyncBincodeStream::from(BufStream::new(stream)).for_async();

        // Write every value before reading any echo back.
        ::futures::stream::iter(0usize..n)
            .map(Ok)
            .forward(&mut c)
            .await
            .unwrap();

        // Shut down the write side so the server sees end-of-stream and stops
        // echoing; the echoes are still read back below.
        // NOTE(review): the shutdown result is deliberately not checked on macOS;
        // the reason is not recorded here -- confirm and link the underlying issue.
        let r = c.get_mut().shutdown().await;
        if !cfg!(target_os = "macos") {
            r.unwrap();
        }

        // Drain the echoes; `at` is both the next expected value and the running count.
        let mut at = 0;
        while let Some(got) = c.next().await.transpose().unwrap() {
            assert_eq!(at, got);
            at += 1;
        }
        assert_eq!(at, n);
    }
}