1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
//! Types which operate over [`Stream`](futures_core::stream::Stream)`<Item =
//! `[`io::Result`](std::io::Result)`<`[`Bytes`](bytes_05::Bytes)`>>` streams, both encoders and
//! decoders for various formats.
//!
//! The `Stream` is treated as a single byte-stream to be compressed/decompressed, each item is a
//! chunk of data from this byte-stream. There is not guaranteed to be a one-to-one relationship
//! between chunks of data from the underlying stream and the resulting compressed/decompressed
//! stream, the encoders and decoders will buffer the incoming data and choose their own boundaries
//! at which to yield a new item.
//!
//! # Deprecation Migration
//!
//! This feature and module was deprecated because it's choosing one point in a large solution
//! space of "stream of byte chunks" to represent an IO data stream, and the conversion between
//! these solutions and standard IO data streams like `futures::io::AsyncBufRead` /
//! `tokio::io::AsyncBufRead` should be zero-cost.
//!
//! ```rust
//! use bytes_05::Bytes;
//! use futures::{stream::Stream, TryStreamExt};
//! use std::io::Result;
//!
//! /// For code that looks like this, choose one of the options below to replace it
//! fn from(
//!     input: impl Stream<Item = Result<bytes_05::Bytes>>,
//! ) -> impl Stream<Item = Result<bytes_05::Bytes>> {
//!     #[allow(deprecated)]
//!     async_compression::stream::GzipEncoder::new(input)
//! }
//!
//! /// Direct replacement with `tokio` v0.2 and `bytes` v0.5 using `tokio-util` v0.3
//! fn tokio_02_bytes_05(
//!     input: impl Stream<Item = Result<bytes_05::Bytes>>,
//! ) -> impl Stream<Item = Result<bytes_05::Bytes>> {
//!     tokio_util_03::codec::FramedRead::new(
//!         async_compression::tokio_02::bufread::GzipEncoder::new(
//!             tokio_02::io::stream_reader(input),
//!         ),
//!         tokio_util_03::codec::BytesCodec::new(),
//!     ).map_ok(|bytes| bytes.freeze())
//! }
//!
//! /// Upgrade replacement with `tokio` v0.3 and `bytes` v0.5 using `tokio-util` v0.4
//! fn tokio_03_bytes_05(
//!     input: impl Stream<Item = Result<bytes_05::Bytes>>,
//! ) -> impl Stream<Item = Result<bytes_05::Bytes>> {
//!     tokio_util_04::io::ReaderStream::new(
//!         async_compression::tokio_03::bufread::GzipEncoder::new(
//!             tokio_util_04::io::StreamReader::new(input),
//!         ),
//!     )
//! }
//!
//! /// Upgrade replacement with `tokio` v0.3 and `bytes` v0.6 using `tokio-util` v0.5
//! fn tokio_03_bytes_06(
//!     input: impl Stream<Item = Result<bytes_06::Bytes>>,
//! ) -> impl Stream<Item = Result<bytes_06::Bytes>> {
//!     tokio_util_05::io::ReaderStream::new(
//!         async_compression::tokio_03::bufread::GzipEncoder::new(
//!             tokio_util_05::io::StreamReader::new(input),
//!         ),
//!     )
//! }
//!
//! /// Upgrade replacement with `tokio` v1.0 and `bytes` v1.0 using `tokio-util` v0.6
//! fn tokio_bytes(
//!     input: impl Stream<Item = Result<bytes::Bytes>>,
//! ) -> impl Stream<Item = Result<bytes::Bytes>> {
//!     tokio_util_06::io::ReaderStream::new(
//!         async_compression::tokio::bufread::GzipEncoder::new(
//!             tokio_util_06::io::StreamReader::new(input),
//!         ),
//!     )
//! }
//!
//! /// What if you didn't want anything to do with `bytes`, but just a `Vec<u8>` instead?
//! fn futures_vec(
//!     input: impl Stream<Item = Result<Vec<u8>>> + Unpin,
//! ) -> impl Stream<Item = Result<Vec<u8>>> {
//!     use futures::io::AsyncReadExt;
//!
//!     futures::stream::try_unfold(
//!         async_compression::futures::bufread::GzipEncoder::new(input.into_async_read()),
//!         |mut encoder| async move {
//!             let mut chunk = vec![0; 8 * 1024];
//!             let len = encoder.read(&mut chunk).await?;
//!             if len == 0 {
//!                 Ok(None)
//!             } else {
//!                 chunk.truncate(len);
//!                 Ok(Some((chunk, encoder)))
//!             }
//!         })
//! }
//! #
//! # futures::executor::block_on(async {
//! #     let data = || futures::stream::iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]);
//! #     let expected: Vec<Vec<u8>> = from(data().map_ok(bytes_05::Bytes::from))
//! #         .map_ok(|bytes| bytes.as_ref().into())
//! #         .try_collect()
//! #         .await?;
//! #
//! #     assert_eq!(
//! #         expected,
//! #         tokio_02_bytes_05(data().map_ok(bytes_05::Bytes::from))
//! #             .map_ok(|bytes| bytes.as_ref().into())
//! #             .try_collect::<Vec<Vec<u8>>>()
//! #             .await?,
//! #     );
//! #     assert_eq!(
//! #         expected,
//! #         tokio_03_bytes_05(data().map_ok(bytes_05::Bytes::from))
//! #             .map_ok(|bytes| bytes.as_ref().into())
//! #             .try_collect::<Vec<Vec<u8>>>()
//! #             .await?,
//! #     );
//! #     assert_eq!(
//! #         expected,
//! #         tokio_03_bytes_06(data().map_ok(bytes_06::Bytes::from))
//! #             .map_ok(|bytes| bytes.as_ref().into())
//! #             .try_collect::<Vec<Vec<u8>>>()
//! #             .await?,
//! #     );
//! #     assert_eq!(
//! #         expected,
//! #         tokio_bytes(data().map_ok(bytes::Bytes::from))
//! #             .map_ok(|bytes| bytes.as_ref().into())
//! #             .try_collect::<Vec<Vec<u8>>>()
//! #             .await?,
//! #     );
//! #     assert_eq!(
//! #         expected,
//! #         futures_vec(data())
//! #             .try_collect::<Vec<Vec<u8>>>()
//! #             .await?
//! #     );
//! #     Ok::<_, std::io::Error>(())
//! # })?; Ok::<_, std::io::Error>(())
//! ```

#![deprecated(
    since = "0.3.8",
    note = "See `async-compression::stream` docs for migration"
)]

#[macro_use]
mod macros;
mod generic;

pub(crate) use self::generic::{Decoder, Encoder};

algos!(stream<S>);