tokio_util/codec/framed.rs
1use crate::codec::decoder::Decoder;
2use crate::codec::encoder::Encoder;
3use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
4
5use futures_core::Stream;
6use tokio::io::{AsyncRead, AsyncWrite};
7
8use bytes::BytesMut;
9use futures_sink::Sink;
10use pin_project_lite::pin_project;
11use std::fmt;
12use std::io;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pin_project! {
17 /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18 /// the `Encoder` and `Decoder` traits to encode and decode frames.
19 ///
20 /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
21 /// by using the `new` function seen below.
22 ///
23 /// # Cancellation safety
24 ///
25 /// * [`futures_util::sink::SinkExt::send`]: if send is used as the event in a
26 /// `tokio::select!` statement and some other branch completes first, then it is
27 /// guaranteed that the message was not sent, but the message itself is lost.
28 /// * [`tokio_stream::StreamExt::next`]: This method is cancel safe. The returned
29 /// future only holds onto a reference to the underlying stream, so dropping it will
30 /// never lose a value.
31 ///
32 /// [`Stream`]: futures_core::Stream
33 /// [`Sink`]: futures_sink::Sink
34 /// [`AsyncRead`]: tokio::io::AsyncRead
35 /// [`Decoder::framed`]: crate::codec::Decoder::framed()
36 /// [`futures_util::sink::SinkExt::send`]: futures_util::sink::SinkExt::send
37 /// [`tokio_stream::StreamExt::next`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.next
38 pub struct Framed<T, U> {
39 #[pin]
40 inner: FramedImpl<T, U, RWFrames>
41 }
42}
43
44impl<T, U> Framed<T, U>
45where
46 T: AsyncRead + AsyncWrite,
47{
48 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
49 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
50 ///
51 /// Raw I/O objects work with byte sequences, but higher-level code usually
52 /// wants to batch these into meaningful chunks, called "frames". This
53 /// method layers framing on top of an I/O object, by using the codec
54 /// traits to handle encoding and decoding of messages frames. Note that
55 /// the incoming and outgoing frame types may be distinct.
56 ///
57 /// This function returns a *single* object that is both [`Stream`] and
58 /// [`Sink`]; grouping this into a single object is often useful for layering
59 /// things like gzip or TLS, which require both read and write access to the
60 /// underlying object.
61 ///
62 /// If you want to work more directly with the streams and sink, consider
63 /// calling [`split`] on the `Framed` returned by this method, which will
64 /// break them into separate objects, allowing them to interact more easily.
65 ///
66 /// Note that, for some byte sources, the stream can be resumed after an EOF
67 /// by reading from it, even after it has returned `None`. Repeated attempts
68 /// to do so, without new data available, continue to return `None` without
69 /// creating more (closing) frames.
70 ///
71 /// [`Stream`]: futures_core::Stream
72 /// [`Sink`]: futures_sink::Sink
73 /// [`Decode`]: crate::codec::Decoder
74 /// [`Encoder`]: crate::codec::Encoder
75 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
76 pub fn new(inner: T, codec: U) -> Framed<T, U> {
77 Framed {
78 inner: FramedImpl {
79 inner,
80 codec,
81 state: Default::default(),
82 },
83 }
84 }
85
86 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
87 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data,
88 /// with a specific read buffer initial capacity.
89 ///
90 /// Raw I/O objects work with byte sequences, but higher-level code usually
91 /// wants to batch these into meaningful chunks, called "frames". This
92 /// method layers framing on top of an I/O object, by using the codec
93 /// traits to handle encoding and decoding of messages frames. Note that
94 /// the incoming and outgoing frame types may be distinct.
95 ///
96 /// This function returns a *single* object that is both [`Stream`] and
97 /// [`Sink`]; grouping this into a single object is often useful for layering
98 /// things like gzip or TLS, which require both read and write access to the
99 /// underlying object.
100 ///
101 /// If you want to work more directly with the streams and sink, consider
102 /// calling [`split`] on the `Framed` returned by this method, which will
103 /// break them into separate objects, allowing them to interact more easily.
104 ///
105 /// [`Stream`]: futures_core::Stream
106 /// [`Sink`]: futures_sink::Sink
107 /// [`Decode`]: crate::codec::Decoder
108 /// [`Encoder`]: crate::codec::Encoder
109 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
110 pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
111 Framed {
112 inner: FramedImpl {
113 inner,
114 codec,
115 state: RWFrames {
116 read: ReadFrame {
117 eof: false,
118 is_readable: false,
119 buffer: BytesMut::with_capacity(capacity),
120 has_errored: false,
121 },
122 write: WriteFrame::default(),
123 },
124 },
125 }
126 }
127}
128
129impl<T, U> Framed<T, U> {
130 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
131 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
132 ///
133 /// Raw I/O objects work with byte sequences, but higher-level code usually
134 /// wants to batch these into meaningful chunks, called "frames". This
135 /// method layers framing on top of an I/O object, by using the `Codec`
136 /// traits to handle encoding and decoding of messages frames. Note that
137 /// the incoming and outgoing frame types may be distinct.
138 ///
139 /// This function returns a *single* object that is both [`Stream`] and
140 /// [`Sink`]; grouping this into a single object is often useful for layering
141 /// things like gzip or TLS, which require both read and write access to the
142 /// underlying object.
143 ///
144 /// This objects takes a stream and a `readbuffer` and a `writebuffer`. These field
145 /// can be obtained from an existing `Framed` with the [`into_parts`] method.
146 ///
147 /// If you want to work more directly with the streams and sink, consider
148 /// calling [`split`] on the `Framed` returned by this method, which will
149 /// break them into separate objects, allowing them to interact more easily.
150 ///
151 /// [`Stream`]: futures_core::Stream
152 /// [`Sink`]: futures_sink::Sink
153 /// [`Decoder`]: crate::codec::Decoder
154 /// [`Encoder`]: crate::codec::Encoder
155 /// [`into_parts`]: crate::codec::Framed::into_parts()
156 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
157 pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
158 Framed {
159 inner: FramedImpl {
160 inner: parts.io,
161 codec: parts.codec,
162 state: RWFrames {
163 read: parts.read_buf.into(),
164 write: parts.write_buf.into(),
165 },
166 },
167 }
168 }
169
170 /// Returns a reference to the underlying I/O stream wrapped by
171 /// `Framed`.
172 ///
173 /// Note that care should be taken to not tamper with the underlying stream
174 /// of data coming in as it may corrupt the stream of frames otherwise
175 /// being worked with.
176 pub fn get_ref(&self) -> &T {
177 &self.inner.inner
178 }
179
180 /// Returns a mutable reference to the underlying I/O stream wrapped by
181 /// `Framed`.
182 ///
183 /// Note that care should be taken to not tamper with the underlying stream
184 /// of data coming in as it may corrupt the stream of frames otherwise
185 /// being worked with.
186 pub fn get_mut(&mut self) -> &mut T {
187 &mut self.inner.inner
188 }
189
190 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
191 /// `Framed`.
192 ///
193 /// Note that care should be taken to not tamper with the underlying stream
194 /// of data coming in as it may corrupt the stream of frames otherwise
195 /// being worked with.
196 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
197 self.project().inner.project().inner
198 }
199
200 /// Returns a reference to the underlying codec wrapped by
201 /// `Framed`.
202 ///
203 /// Note that care should be taken to not tamper with the underlying codec
204 /// as it may corrupt the stream of frames otherwise being worked with.
205 pub fn codec(&self) -> &U {
206 &self.inner.codec
207 }
208
209 /// Returns a mutable reference to the underlying codec wrapped by
210 /// `Framed`.
211 ///
212 /// Note that care should be taken to not tamper with the underlying codec
213 /// as it may corrupt the stream of frames otherwise being worked with.
214 pub fn codec_mut(&mut self) -> &mut U {
215 &mut self.inner.codec
216 }
217
218 /// Maps the codec `U` to `C`, preserving the read and write buffers
219 /// wrapped by `Framed`.
220 ///
221 /// Note that care should be taken to not tamper with the underlying codec
222 /// as it may corrupt the stream of frames otherwise being worked with.
223 pub fn map_codec<C, F>(self, map: F) -> Framed<T, C>
224 where
225 F: FnOnce(U) -> C,
226 {
227 // This could be potentially simplified once rust-lang/rust#86555 hits stable
228 let parts = self.into_parts();
229 Framed::from_parts(FramedParts {
230 io: parts.io,
231 codec: map(parts.codec),
232 read_buf: parts.read_buf,
233 write_buf: parts.write_buf,
234 _priv: (),
235 })
236 }
237
238 /// Returns a mutable reference to the underlying codec wrapped by
239 /// `Framed`.
240 ///
241 /// Note that care should be taken to not tamper with the underlying codec
242 /// as it may corrupt the stream of frames otherwise being worked with.
243 pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U {
244 self.project().inner.project().codec
245 }
246
247 /// Returns a reference to the read buffer.
248 pub fn read_buffer(&self) -> &BytesMut {
249 &self.inner.state.read.buffer
250 }
251
252 /// Returns a mutable reference to the read buffer.
253 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
254 &mut self.inner.state.read.buffer
255 }
256
257 /// Returns a reference to the write buffer.
258 pub fn write_buffer(&self) -> &BytesMut {
259 &self.inner.state.write.buffer
260 }
261
262 /// Returns a mutable reference to the write buffer.
263 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
264 &mut self.inner.state.write.buffer
265 }
266
267 /// Returns backpressure boundary
268 pub fn backpressure_boundary(&self) -> usize {
269 self.inner.state.write.backpressure_boundary
270 }
271
272 /// Updates backpressure boundary
273 pub fn set_backpressure_boundary(&mut self, boundary: usize) {
274 self.inner.state.write.backpressure_boundary = boundary;
275 }
276
277 /// Consumes the `Framed`, returning its underlying I/O stream.
278 ///
279 /// Note that care should be taken to not tamper with the underlying stream
280 /// of data coming in as it may corrupt the stream of frames otherwise
281 /// being worked with.
282 pub fn into_inner(self) -> T {
283 self.inner.inner
284 }
285
286 /// Consumes the `Framed`, returning its underlying I/O stream, the buffer
287 /// with unprocessed data, and the codec.
288 ///
289 /// Note that care should be taken to not tamper with the underlying stream
290 /// of data coming in as it may corrupt the stream of frames otherwise
291 /// being worked with.
292 pub fn into_parts(self) -> FramedParts<T, U> {
293 FramedParts {
294 io: self.inner.inner,
295 codec: self.inner.codec,
296 read_buf: self.inner.state.read.buffer,
297 write_buf: self.inner.state.write.buffer,
298 _priv: (),
299 }
300 }
301}
302
303// This impl just defers to the underlying FramedImpl
304impl<T, U> Stream for Framed<T, U>
305where
306 T: AsyncRead,
307 U: Decoder,
308{
309 type Item = Result<U::Item, U::Error>;
310
311 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312 self.project().inner.poll_next(cx)
313 }
314}
315
316// This impl just defers to the underlying FramedImpl
317impl<T, I, U> Sink<I> for Framed<T, U>
318where
319 T: AsyncWrite,
320 U: Encoder<I>,
321 U::Error: From<io::Error>,
322{
323 type Error = U::Error;
324
325 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
326 self.project().inner.poll_ready(cx)
327 }
328
329 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
330 self.project().inner.start_send(item)
331 }
332
333 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
334 self.project().inner.poll_flush(cx)
335 }
336
337 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
338 self.project().inner.poll_close(cx)
339 }
340}
341
342impl<T, U> fmt::Debug for Framed<T, U>
343where
344 T: fmt::Debug,
345 U: fmt::Debug,
346{
347 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
348 f.debug_struct("Framed")
349 .field("io", self.get_ref())
350 .field("codec", self.codec())
351 .finish()
352 }
353}
354
355/// `FramedParts` contains an export of the data of a Framed transport.
356/// It can be used to construct a new [`Framed`] with a different codec.
357/// It contains all current buffers and the inner transport.
358///
359/// [`Framed`]: crate::codec::Framed
360#[derive(Debug)]
361#[allow(clippy::manual_non_exhaustive)]
362pub struct FramedParts<T, U> {
363 /// The inner transport used to read bytes to and write bytes to
364 pub io: T,
365
366 /// The codec
367 pub codec: U,
368
369 /// The buffer with read but unprocessed data.
370 pub read_buf: BytesMut,
371
372 /// A buffer with unprocessed data which are not written yet.
373 pub write_buf: BytesMut,
374
375 /// This private field allows us to add additional fields in the future in a
376 /// backwards compatible way.
377 _priv: (),
378}
379
380impl<T, U> FramedParts<T, U> {
381 /// Create a new, default, `FramedParts`
382 pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
383 where
384 U: Encoder<I>,
385 {
386 FramedParts {
387 io,
388 codec,
389 read_buf: BytesMut::new(),
390 write_buf: BytesMut::new(),
391 _priv: (),
392 }
393 }
394}