tokio_util/codec/
mod.rs

1//! Adaptors from `AsyncRead`/`AsyncWrite` to Stream/Sink
2//!
3//! Raw I/O objects work with byte sequences, but higher-level code usually
4//! wants to batch these into meaningful chunks, called "frames".
5//!
6//! This module contains adapters to go from streams of bytes, [`AsyncRead`] and
7//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
8//! Framed streams are also known as transports.
9//!
10//! # Example encoding using `LinesCodec`
11//!
12//! The following example demonstrates how to use a codec such as [`LinesCodec`] to
13//! write framed data. [`FramedWrite`] can be used to achieve this. Data sent to
14//! [`FramedWrite`] are first framed according to a specific codec, and then sent to
15//! an implementor of [`AsyncWrite`].
16//!
17//! ```
18//! use futures::sink::SinkExt;
19//! use tokio_util::codec::LinesCodec;
20//! use tokio_util::codec::FramedWrite;
21//!
22//! #[tokio::main]
23//! async fn main() {
24//!     let buffer = Vec::new();
25//!     let messages = vec!["Hello", "World"];
26//!     let encoder = LinesCodec::new();
27//!
28//!     // FramedWrite is a sink which means you can send values into it
29//!     // asynchronously.
30//!     let mut writer = FramedWrite::new(buffer, encoder);
31//!
32//!     // To be able to send values into a FramedWrite, you need to bring the
33//!     // `SinkExt` trait into scope.
34//!     writer.send(messages[0]).await.unwrap();
35//!     writer.send(messages[1]).await.unwrap();
36//!
37//!     let buffer = writer.get_ref();
38//!
39//!     assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes());
40//! }
41//! ```
42//!
43//! # Example decoding using `LinesCodec`
44//! The following example demonstrates how to use a codec such as [`LinesCodec`] to
45//! read a stream of framed data. [`FramedRead`] can be used to achieve this. [`FramedRead`]
46//! will keep reading from an [`AsyncRead`] implementor until a whole frame, according to a codec,
47//! can be parsed.
48//!
49//! ```
50//! use tokio_stream::StreamExt;
51//! use tokio_util::codec::LinesCodec;
52//! use tokio_util::codec::FramedRead;
53//!
54//! #[tokio::main]
55//! async fn main() {
56//!     let message = "Hello\nWorld".as_bytes();
57//!     let decoder = LinesCodec::new();
58//!
59//!     // FramedRead can be used to read a stream of values that are framed according to
60//!     // a codec. FramedRead will read from its input (here `message`) until a whole frame
61//!     // can be parsed.
62//!     let mut reader = FramedRead::new(message, decoder);
63//!
64//!     // To read values from a FramedRead, you need to bring the
65//!     // `StreamExt` trait into scope.
66//!     let frame1 = reader.next().await.unwrap().unwrap();
67//!     let frame2 = reader.next().await.unwrap().unwrap();
68//!
69//!     assert!(reader.next().await.is_none());
70//!     assert_eq!(frame1, "Hello");
71//!     assert_eq!(frame2, "World");
72//! }
73//! ```
74//!
75//! # The Decoder trait
76//!
77//! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an
78//! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify
79//! how sequences of bytes are turned into a sequence of frames, and to
80//! determine where the boundaries between frames are.  The job of the
81//! `FramedRead` is to repeatedly switch between reading more data from the IO
82//! resource, and asking the decoder whether we have received enough data to
83//! decode another frame of data.
84//!
85//! The main method on the `Decoder` trait is the [`decode`] method. This method
86//! takes as argument the data that has been read so far, and when it is called,
87//! it will be in one of the following situations:
88//!
89//!  1. The buffer contains less than a full frame.
90//!  2. The buffer contains exactly a full frame.
91//!  3. The buffer contains more than a full frame.
92//!
93//! In the first situation, the decoder should return `Ok(None)`.
94//!
95//! In the second situation, the decoder should clear the provided buffer and
96//! return `Ok(Some(the_decoded_frame))`.
97//!
98//! In the third situation, the decoder should use a method such as [`split_to`]
99//! or [`advance`] to modify the buffer such that the frame is removed from the
100//! buffer, but any data in the buffer after that frame should still remain in
101//! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in
102//! this case.
103//!
104//! Finally the decoder may return an error if the data is invalid in some way.
105//! The decoder should _not_ return an error just because it has yet to receive
106//! a full frame.
107//!
108//! It is guaranteed that, from one call to `decode` to another, the provided
109//! buffer will contain the exact same data as before, except that if more data
110//! has arrived through the IO resource, that data will have been appended to
111//! the buffer.  This means that reading frames from a `FramedRead` is
112//! essentially equivalent to the following loop:
113//!
114//! ```no_run
115//! use tokio::io::AsyncReadExt;
116//! # // This uses async_stream to create an example that compiles.
117//! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! {
118//! # use tokio_util::codec::Decoder;
119//! # let mut decoder = tokio_util::codec::BytesCodec::new();
120//! # let io_resource = &mut &[0u8, 1, 2, 3][..];
121//!
122//! let mut buf = bytes::BytesMut::new();
123//! loop {
124//!     // The read_buf call will append to buf rather than overwrite existing data.
125//!     let len = io_resource.read_buf(&mut buf).await?;
126//!
127//!     if len == 0 {
128//!         while let Some(frame) = decoder.decode_eof(&mut buf)? {
129//!             yield frame;
130//!         }
131//!         break;
132//!     }
133//!
134//!     while let Some(frame) = decoder.decode(&mut buf)? {
135//!         yield frame;
136//!     }
137//! }
138//! # }}
139//! ```
140//! The example above uses `yield` whenever the `Stream` produces an item.
141//!
142//! ## Example decoder
143//!
144//! As an example, consider a protocol that can be used to send strings where
145//! each frame starts with a four byte little-endian integer containing the length
146//! of the string, followed by that many bytes of string data. The decoder fails
147//! with an error if the string data is not valid UTF-8 or is too long.
148//!
149//! Such a decoder can be written like this:
150//! ```
151//! use tokio_util::codec::Decoder;
152//! use bytes::{BytesMut, Buf};
153//!
154//! struct MyStringDecoder {}
155//!
156//! const MAX: usize = 8 * 1024 * 1024;
157//!
158//! impl Decoder for MyStringDecoder {
159//!     type Item = String;
160//!     type Error = std::io::Error;
161//!
162//!     fn decode(
163//!         &mut self,
164//!         src: &mut BytesMut
165//!     ) -> Result<Option<Self::Item>, Self::Error> {
166//!         if src.len() < 4 {
167//!             // Not enough data to read length marker.
168//!             return Ok(None);
169//!         }
170//!
171//!         // Read length marker.
172//!         let mut length_bytes = [0u8; 4];
173//!         length_bytes.copy_from_slice(&src[..4]);
174//!         let length = u32::from_le_bytes(length_bytes) as usize;
175//!
176//!         // Check that the length is not too large to avoid a denial of
177//!         // service attack where the server runs out of memory.
178//!         if length > MAX {
179//!             return Err(std::io::Error::new(
180//!                 std::io::ErrorKind::InvalidData,
181//!                 format!("Frame of length {} is too large.", length)
182//!             ));
183//!         }
184//!
185//!         if src.len() < 4 + length {
186//!             // The full string has not yet arrived.
187//!             //
188//!             // We reserve more space in the buffer. This is not strictly
189//!             // necessary, but is a good idea performance-wise.
190//!             src.reserve(4 + length - src.len());
191//!
192//!             // We inform the Framed that we need more bytes to form the next
193//!             // frame.
194//!             return Ok(None);
195//!         }
196//!
197//!         // Use advance to modify src such that it no longer contains
198//!         // this frame.
199//!         let data = src[4..4 + length].to_vec();
200//!         src.advance(4 + length);
201//!
202//!         // Convert the data to a string, or fail if it is not valid UTF-8.
203//!         match String::from_utf8(data) {
204//!             Ok(string) => Ok(Some(string)),
205//!             Err(utf8_error) => {
206//!                 Err(std::io::Error::new(
207//!                     std::io::ErrorKind::InvalidData,
208//!                     utf8_error.utf8_error(),
209//!                 ))
210//!             },
211//!         }
212//!     }
213//! }
214//! ```
215//!
216//! # The Encoder trait
217//!
218//! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn
219//! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to
220//! specify how frames are turned into a sequence of bytes.  The job of the
221//! `FramedWrite` is to take the resulting sequence of bytes and write it to the
222//! IO resource.
223//!
224//! The main method on the `Encoder` trait is the [`encode`] method. This method
225//! takes an item that is being written, and a buffer to write the item to. The
226//! buffer may already contain data, and in this case, the encoder should append
227//! the new frame to the buffer rather than overwrite the existing data.
228//!
229//! It is guaranteed that, from one call to `encode` to another, the provided
230//! buffer will contain the exact same data as before, except that some of the
231//! data may have been removed from the front of the buffer. Writing to a
232//! `FramedWrite` is essentially equivalent to the following loop:
233//!
234//! ```no_run
235//! use tokio::io::AsyncWriteExt;
236//! use bytes::Buf; // for advance
237//! # use tokio_util::codec::Encoder;
238//! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() }
239//! # async fn no_more_frames() { }
240//! # #[tokio::main] async fn main() -> std::io::Result<()> {
241//! # let mut io_resource = tokio::io::sink();
242//! # let mut encoder = tokio_util::codec::BytesCodec::new();
243//!
244//! const MAX: usize = 8192;
245//!
246//! let mut buf = bytes::BytesMut::new();
247//! loop {
248//!     tokio::select! {
249//!         num_written = io_resource.write(&buf), if !buf.is_empty() => {
250//!             buf.advance(num_written?);
251//!         },
252//!         frame = next_frame(), if buf.len() < MAX => {
253//!             encoder.encode(frame, &mut buf)?;
254//!         },
255//!         _ = no_more_frames() => {
256//!             io_resource.write_all(&buf).await?;
257//!             io_resource.shutdown().await?;
258//!             return Ok(());
259//!         },
260//!     }
261//! }
262//! # }
263//! ```
264//! Here the `next_frame` function corresponds to any frames you write to the
265//! `FramedWrite`. The `no_more_frames` function corresponds to closing the
266//! `FramedWrite` with [`SinkExt::close`].
267//!
268//! ## Example encoder
269//!
270//! As an example, consider a protocol that can be used to send strings where
271//! each frame starts with a four byte little-endian integer containing the length
272//! of the string, followed by that many bytes of string data. The encoder will
273//! fail if the string is too long.
274//!
275//! Such an encoder can be written like this:
276//! ```
277//! use tokio_util::codec::Encoder;
278//! use bytes::BytesMut;
279//!
280//! struct MyStringEncoder {}
281//!
282//! const MAX: usize = 8 * 1024 * 1024;
283//!
284//! impl Encoder<String> for MyStringEncoder {
285//!     type Error = std::io::Error;
286//!
287//!     fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
288//!         // Don't send a string if it is longer than the other end will
289//!         // accept.
290//!         if item.len() > MAX {
291//!             return Err(std::io::Error::new(
292//!                 std::io::ErrorKind::InvalidData,
293//!                 format!("Frame of length {} is too large.", item.len())
294//!             ));
295//!         }
296//!
297//!         // Convert the length into a byte array.
298//!         // The cast to u32 cannot overflow due to the length check above.
299//!         let len_slice = u32::to_le_bytes(item.len() as u32);
300//!
301//!         // Reserve space in the buffer.
302//!         dst.reserve(4 + item.len());
303//!
304//!         // Write the length and string to the buffer.
305//!         dst.extend_from_slice(&len_slice);
306//!         dst.extend_from_slice(item.as_bytes());
307//!         Ok(())
308//!     }
309//! }
310//! ```
311//!
312//! [`AsyncRead`]: tokio::io::AsyncRead
313//! [`AsyncWrite`]: tokio::io::AsyncWrite
314//! [`Stream`]: futures_core::Stream
315//! [`Sink`]: futures_sink::Sink
316//! [`SinkExt`]: futures::sink::SinkExt
317//! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close
318//! [`FramedRead`]: struct@crate::codec::FramedRead
319//! [`FramedWrite`]: struct@crate::codec::FramedWrite
320//! [`Framed`]: struct@crate::codec::Framed
321//! [`Decoder`]: trait@crate::codec::Decoder
322//! [`decode`]: fn@crate::codec::Decoder::decode
323//! [`encode`]: fn@crate::codec::Encoder::encode
324//! [`split_to`]: fn@bytes::BytesMut::split_to
325//! [`advance`]: fn@bytes::Buf::advance
326
327mod bytes_codec;
328pub use self::bytes_codec::BytesCodec;
329
330mod decoder;
331pub use self::decoder::Decoder;
332
333mod encoder;
334pub use self::encoder::Encoder;
335
336mod framed_impl;
337#[allow(unused_imports)]
338pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
339
340mod framed;
341pub use self::framed::{Framed, FramedParts};
342
343mod framed_read;
344pub use self::framed_read::FramedRead;
345
346mod framed_write;
347pub use self::framed_write::FramedWrite;
348
349pub mod length_delimited;
350pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError};
351
352mod lines_codec;
353pub use self::lines_codec::{LinesCodec, LinesCodecError};
354
355mod any_delimiter_codec;
356pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError};