1#![allow(deprecated)]
2#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
22
23use futures::{prelude::*, ready};
24use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
25use std::{io, iter, pin::Pin, task::Context, task::Poll};
26
27#[deprecated(
28 note = "Will be removed in the next release, see https://github.com/libp2p/rust-libp2p/issues/4522 for details."
29)]
30#[derive(Debug, Copy, Clone)]
31pub struct DeflateConfig {
32 compression: flate2::Compression,
33}
34
35impl Default for DeflateConfig {
36 fn default() -> Self {
37 DeflateConfig {
38 compression: flate2::Compression::fast(),
39 }
40 }
41}
42
43impl UpgradeInfo for DeflateConfig {
44 type Info = &'static str;
45 type InfoIter = iter::Once<Self::Info>;
46
47 fn protocol_info(&self) -> Self::InfoIter {
48 iter::once("/deflate/1.0.0")
49 }
50}
51
52impl<C> InboundUpgrade<C> for DeflateConfig
53where
54 C: AsyncRead + AsyncWrite,
55{
56 type Output = DeflateOutput<C>;
57 type Error = io::Error;
58 type Future = future::Ready<Result<Self::Output, Self::Error>>;
59
60 fn upgrade_inbound(self, r: C, _: Self::Info) -> Self::Future {
61 future::ok(DeflateOutput::new(r, self.compression))
62 }
63}
64
65impl<C> OutboundUpgrade<C> for DeflateConfig
66where
67 C: AsyncRead + AsyncWrite,
68{
69 type Output = DeflateOutput<C>;
70 type Error = io::Error;
71 type Future = future::Ready<Result<Self::Output, Self::Error>>;
72
73 fn upgrade_outbound(self, w: C, _: Self::Info) -> Self::Future {
74 future::ok(DeflateOutput::new(w, self.compression))
75 }
76}
77
78#[derive(Debug)]
80pub struct DeflateOutput<S> {
81 inner: S,
83 compress: flate2::Compress,
85 decompress: flate2::Decompress,
87 write_out: Vec<u8>,
90 read_interm: Vec<u8>,
93 inner_read_eof: bool,
96}
97
98impl<S> DeflateOutput<S> {
99 fn new(inner: S, compression: flate2::Compression) -> Self {
100 DeflateOutput {
101 inner,
102 compress: flate2::Compress::new(compression, false),
103 decompress: flate2::Decompress::new(false),
104 write_out: Vec::with_capacity(256),
105 read_interm: Vec::with_capacity(256),
106 inner_read_eof: false,
107 }
108 }
109
110 fn flush_write_out(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>
113 where
114 S: AsyncWrite + Unpin,
115 {
116 loop {
117 if self.write_out.is_empty() {
118 return Poll::Ready(Ok(()));
119 }
120
121 match AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, &self.write_out) {
122 Poll::Ready(Ok(0)) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
123 Poll::Ready(Ok(n)) => self.write_out = self.write_out.split_off(n),
124 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
125 Poll::Pending => return Poll::Pending,
126 };
127 }
128 }
129}
130
131impl<S> AsyncRead for DeflateOutput<S>
132where
133 S: AsyncRead + Unpin,
134{
135 fn poll_read(
136 mut self: Pin<&mut Self>,
137 cx: &mut Context<'_>,
138 buf: &mut [u8],
139 ) -> Poll<Result<usize, io::Error>> {
140 let this = &mut *self;
143
144 loop {
145 if this.read_interm.is_empty() && !this.inner_read_eof {
147 this.read_interm
148 .resize(this.read_interm.capacity() + 256, 0);
149
150 match AsyncRead::poll_read(Pin::new(&mut this.inner), cx, &mut this.read_interm) {
151 Poll::Ready(Ok(0)) => {
152 this.inner_read_eof = true;
153 this.read_interm.clear();
154 }
155 Poll::Ready(Ok(n)) => this.read_interm.truncate(n),
156 Poll::Ready(Err(err)) => {
157 this.read_interm.clear();
158 return Poll::Ready(Err(err));
159 }
160 Poll::Pending => {
161 this.read_interm.clear();
162 return Poll::Pending;
163 }
164 }
165 }
166 debug_assert!(!this.read_interm.is_empty() || this.inner_read_eof);
167
168 let before_out = this.decompress.total_out();
169 let before_in = this.decompress.total_in();
170 let ret = this.decompress.decompress(
171 &this.read_interm,
172 buf,
173 if this.inner_read_eof {
174 flate2::FlushDecompress::Finish
175 } else {
176 flate2::FlushDecompress::None
177 },
178 )?;
179
180 let consumed = (this.decompress.total_in() - before_in) as usize;
182 this.read_interm = this.read_interm.split_off(consumed);
183
184 let read = (this.decompress.total_out() - before_out) as usize;
185 if read != 0 || ret == flate2::Status::StreamEnd {
186 return Poll::Ready(Ok(read));
187 }
188 }
189 }
190}
191
192impl<S> AsyncWrite for DeflateOutput<S>
193where
194 S: AsyncWrite + Unpin,
195{
196 fn poll_write(
197 mut self: Pin<&mut Self>,
198 cx: &mut Context<'_>,
199 buf: &[u8],
200 ) -> Poll<Result<usize, io::Error>> {
201 let this = &mut *self;
204
205 ready!(this.flush_write_out(cx))?;
208
209 if buf.is_empty() {
211 return Poll::Ready(Ok(0));
212 }
213
214 loop {
218 let before_in = this.compress.total_in();
219 this.write_out.reserve(256); let ret = this.compress.compress_vec(
221 buf,
222 &mut this.write_out,
223 flate2::FlushCompress::None,
224 )?;
225 let written = (this.compress.total_in() - before_in) as usize;
226
227 if written != 0 || ret == flate2::Status::StreamEnd {
228 return Poll::Ready(Ok(written));
229 }
230 }
231 }
232
233 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
234 let this = &mut *self;
237
238 ready!(this.flush_write_out(cx))?;
239 this.compress
240 .compress_vec(&[], &mut this.write_out, flate2::FlushCompress::Sync)?;
241
242 loop {
243 ready!(this.flush_write_out(cx))?;
244
245 debug_assert!(this.write_out.is_empty());
246 this.write_out.reserve(256); this.compress
249 .compress_vec(&[], &mut this.write_out, flate2::FlushCompress::None)?;
250 if this.write_out.is_empty() {
251 break;
252 }
253 }
254
255 AsyncWrite::poll_flush(Pin::new(&mut this.inner), cx)
256 }
257
258 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
259 let this = &mut *self;
262
263 loop {
264 ready!(this.flush_write_out(cx))?;
265
266 debug_assert!(this.write_out.is_empty());
268 this.write_out.reserve(256); this.compress
270 .compress_vec(&[], &mut this.write_out, flate2::FlushCompress::Finish)?;
271 if this.write_out.is_empty() {
272 break;
273 }
274 }
275
276 AsyncWrite::poll_close(Pin::new(&mut this.inner), cx)
277 }
278}