tokio_util/codec/
length_delimited.rs

1//! Frame a stream of bytes based on a length prefix
2//!
3//! Many protocols delimit their frames by prefacing frame data with a
4//! frame head that specifies the length of the frame. The
5//! `length_delimited` module provides utilities for handling the length
6//! based framing. This allows the consumer to work with entire frames
7//! without having to worry about buffering or other framing logic.
8//!
9//! # Getting started
10//!
11//! If implementing a protocol from scratch, using length delimited framing
12//! is an easy way to get started. [`LengthDelimitedCodec::new()`] will
13//! return a length delimited codec using default configuration values.
14//! This can then be used to construct a framer to adapt a full-duplex
15//! byte stream into a stream of frames.
16//!
17//! ```
18//! use tokio::io::{AsyncRead, AsyncWrite};
19//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
20//!
21//! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T)
22//!     -> Framed<T, LengthDelimitedCodec>
23//! {
24//!     Framed::new(io, LengthDelimitedCodec::new())
25//! }
26//! # pub fn main() {}
27//! ```
28//!
29//! The returned transport implements `Sink + Stream` for `BytesMut`. It
30//! encodes the frame with a big-endian `u32` header denoting the frame
31//! payload length:
32//!
33//! ```text
34//! +----------+--------------------------------+
35//! | len: u32 |          frame payload         |
36//! +----------+--------------------------------+
37//! ```
38//!
39//! Specifically, given the following:
40//!
41//! ```
42//! use tokio::io::{AsyncRead, AsyncWrite};
43//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
44//!
45//! use futures::SinkExt;
46//! use bytes::Bytes;
47//!
48//! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>>
49//! where
50//!     T: AsyncRead + AsyncWrite + Unpin,
51//! {
52//!     let mut transport = Framed::new(io, LengthDelimitedCodec::new());
53//!     let frame = Bytes::from("hello world");
54//!
55//!     transport.send(frame).await?;
56//!     Ok(())
57//! }
58//! ```
59//!
60//! The encoded frame will look like this:
61//!
62//! ```text
63//! +---- len: u32 ----+---- data ----+
64//! | \x00\x00\x00\x0b |  hello world |
65//! +------------------+--------------+
66//! ```
67//!
68//! # Decoding
69//!
70//! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`],
71//! such that each yielded [`BytesMut`] value contains the contents of an
72//! entire frame. There are many configuration parameters enabling
73//! [`FramedRead`] to handle a wide range of protocols. Here are some
74//! examples that will cover the various options at a high level.
75//!
76//! ## Example 1
77//!
78//! The following will parse a `u16` length field at offset 0, omitting the
79//! frame head in the yielded `BytesMut`.
80//!
81//! ```
82//! # use tokio_stream::StreamExt;
83//! # use tokio_util::codec::LengthDelimitedCodec;
84//! # #[tokio::main]
85//! # async fn main() {
86//! # let io: &[u8] = b"\x00\x0BHello world";
87//! let mut reader = LengthDelimitedCodec::builder()
88//!     .length_field_offset(0) // default value
89//!     .length_field_type::<u16>()
90//!     .length_adjustment(0)   // default value
91//!     .new_read(io);
92//! # let res = reader.next().await.unwrap().unwrap().to_vec();
93//! # assert_eq!(res, b"Hello world");
94//! # }
95//! ```
96//!
97//! The following frame will be decoded as such:
98//!
99//! ```text
100//!          INPUT                        DECODED
101//! +-- len ---+--- Payload ---+     +--- Payload ---+
102//! | \x00\x0B |  Hello world  | --> |  Hello world  |
103//! +----------+---------------+     +---------------+
104//! ```
105//!
106//! The value of the length field is 11 (`\x0B`) which represents the length
107//! of the payload, `hello world`. By default, [`FramedRead`] assumes that
108//! the length field represents the number of bytes that **follows** the
109//! length field. Thus, the entire frame has a length of 13: 2 bytes for the
110//! frame head + 11 bytes for the payload.
111//!
112//! ## Example 2
113//!
114//! The following will parse a `u16` length field at offset 0, including the
115//! frame head in the yielded `BytesMut`.
116//!
117//! ```
118//! # use tokio_stream::StreamExt;
119//! # use tokio_util::codec::LengthDelimitedCodec;
120//! # #[tokio::main]
121//! # async fn main() {
122//! # let io: &[u8] = b"\x00\x0BHello world";
123//! let mut reader = LengthDelimitedCodec::builder()
124//!     .length_field_offset(0) // default value
125//!     .length_field_type::<u16>()
126//!     .length_adjustment(2)   // Add head size to length
127//!     .num_skip(0)            // Do NOT skip the head
128//!     .new_read(io);
129//! # let res = reader.next().await.unwrap().unwrap().to_vec();
130//! # assert_eq!(res, b"\x00\x0BHello world");
131//! # }
132//! ```
133//!
134//! The following frame will be decoded as such:
135//!
136//! ```text
137//!          INPUT                           DECODED
138//! +-- len ---+--- Payload ---+     +-- len ---+--- Payload ---+
139//! | \x00\x0B |  Hello world  | --> | \x00\x0B |  Hello world  |
140//! +----------+---------------+     +----------+---------------+
141//! ```
142//!
143//! This is similar to the first example, the only difference is that the
144//! frame head is **included** in the yielded `BytesMut` value. To achieve
145//! this, we need to add the header size to the length with `length_adjustment`,
146//! and set `num_skip` to `0` to prevent skipping the head.
147//!
148//! ## Example 3
149//!
150//! The following will parse a `u16` length field at offset 0, omitting the
151//! frame head in the yielded `BytesMut`. In this case, the length field
152//! **includes** the frame head length.
153//!
154//! ```
155//! # use tokio_stream::StreamExt;
156//! # use tokio_util::codec::LengthDelimitedCodec;
157//! # #[tokio::main]
158//! # async fn main() {
159//! # let io: &[u8] = b"\x00\x0DHello world";
160//! let mut reader = LengthDelimitedCodec::builder()
161//!     .length_field_offset(0) // default value
162//!     .length_field_type::<u16>()
163//!     .length_adjustment(-2)  // size of head
164//!     .new_read(io);
165//! # let res = reader.next().await.unwrap().unwrap().to_vec();
166//! # assert_eq!(res, b"Hello world");
167//! # }
168//! ```
169//!
170//! The following frame will be decoded as such:
171//!
172//! ```text
173//!          INPUT                           DECODED
174//! +-- len ---+--- Payload ---+     +--- Payload ---+
175//! | \x00\x0D |  Hello world  | --> |  Hello world  |
176//! +----------+---------------+     +---------------+
177//! ```
178//!
179//! In most cases, the length field represents the length of the payload
180//! only, as shown in the previous examples. However, in some protocols the
181//! length field represents the length of the whole frame, including the
182//! head. In such cases, we specify a negative `length_adjustment` to adjust
183//! the value provided in the frame head to represent the payload length.
184//!
185//! ## Example 4
186//!
187//! The following will parse a 3 byte length field at offset 0 in a 5 byte
188//! frame head, including the frame head in the yielded `BytesMut`.
189//!
190//! ```
191//! # use tokio_stream::StreamExt;
192//! # use tokio_util::codec::LengthDelimitedCodec;
193//! # #[tokio::main]
194//! # async fn main() {
195//! # let io: &[u8] = b"\x00\x00\x0B\xCA\xFEHello world";
196//! let mut reader = LengthDelimitedCodec::builder()
197//!     .length_field_offset(0) // default value
198//!     .length_field_length(3)
199//!     .length_adjustment(3 + 2)  // len field and remaining head
200//!     .num_skip(0)
201//!     .new_read(io);
202//! # let res = reader.next().await.unwrap().unwrap().to_vec();
203//! # assert_eq!(res, b"\x00\x00\x0B\xCA\xFEHello world");
204//! # }
205//! ```
206//!
207//! The following frame will be decoded as such:
208//!
209//! ```text
210//!                  INPUT
211//! +---- len -----+- head -+--- Payload ---+
212//! | \x00\x00\x0B | \xCAFE |  Hello world  |
213//! +--------------+--------+---------------+
214//!
215//!                  DECODED
216//! +---- len -----+- head -+--- Payload ---+
217//! | \x00\x00\x0B | \xCAFE |  Hello world  |
218//! +--------------+--------+---------------+
219//! ```
220//!
221//! A more advanced example that shows a case where there is extra frame
222//! head data between the length field and the payload. In such cases, it is
223//! usually desirable to include the frame head as part of the yielded
224//! `BytesMut`. This lets consumers of the length delimited framer to
225//! process the frame head as needed.
226//!
227//! The positive `length_adjustment` value lets `FramedRead` factor in the
228//! additional head into the frame length calculation.
229//!
230//! ## Example 5
231//!
232//! The following will parse a `u16` length field at offset 1 of a 4 byte
233//! frame head. The first byte and the length field will be omitted from the
234//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
235//! included.
236//!
237//! ```
238//! # use tokio_stream::StreamExt;
239//! # use tokio_util::codec::LengthDelimitedCodec;
240//! # #[tokio::main]
241//! # async fn main() {
242//! # let io: &[u8] = b"\xCA\x00\x0B\xFEHello world";
243//! let mut reader = LengthDelimitedCodec::builder()
244//!     .length_field_offset(1) // length of hdr1
245//!     .length_field_type::<u16>()
246//!     .length_adjustment(1)  // length of hdr2
247//!     .num_skip(3) // length of hdr1 + LEN
248//!     .new_read(io);
249//! # let res = reader.next().await.unwrap().unwrap().to_vec();
250//! # assert_eq!(res, b"\xFEHello world");
251//! # }
252//! ```
253//!
254//! The following frame will be decoded as such:
255//!
256//! ```text
257//!                  INPUT
258//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
259//! |  \xCA  | \x00\x0B |  \xFE  |  Hello world  |
260//! +--------+----------+--------+---------------+
261//!
262//!          DECODED
263//! +- hdr2 -+--- Payload ---+
264//! |  \xFE  |  Hello world  |
265//! +--------+---------------+
266//! ```
267//!
268//! The length field is situated in the middle of the frame head. In this
269//! case, the first byte in the frame head could be a version or some other
270//! identifier that is not needed for processing. On the other hand, the
271//! second half of the head is needed.
272//!
273//! `length_field_offset` indicates how many bytes to skip before starting
274//! to read the length field.  `length_adjustment` is the number of bytes to
275//! skip starting at the end of the length field. In this case, it is the
276//! second half of the head.
277//!
278//! ## Example 6
279//!
280//! The following will parse a `u16` length field at offset 1 of a 4 byte
281//! frame head. The first byte and the length field will be omitted from the
282//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
283//! included. In this case, the length field **includes** the frame head
284//! length.
285//!
286//! ```
287//! # use tokio_stream::StreamExt;
288//! # use tokio_util::codec::LengthDelimitedCodec;
289//! # #[tokio::main]
290//! # async fn main() {
291//! # let io: &[u8] = b"\xCA\x00\x0F\xFEHello world";
292//! let mut reader = LengthDelimitedCodec::builder()
293//!     .length_field_offset(1) // length of hdr1
294//!     .length_field_type::<u16>()
295//!     .length_adjustment(-3)  // length of hdr1 + LEN, negative
296//!     .num_skip(3)
297//!     .new_read(io);
298//! # let res = reader.next().await.unwrap().unwrap().to_vec();
299//! # assert_eq!(res, b"\xFEHello world");
300//! # }
301//! ```
302//!
303//! The following frame will be decoded as such:
304//!
305//! ```text
306//!                  INPUT
307//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
308//! |  \xCA  | \x00\x0F |  \xFE  |  Hello world  |
309//! +--------+----------+--------+---------------+
310//!
311//!          DECODED
312//! +- hdr2 -+--- Payload ---+
313//! |  \xFE  |  Hello world  |
314//! +--------+---------------+
315//! ```
316//!
317//! Similar to the example above, the difference is that the length field
318//! represents the length of the entire frame instead of just the payload.
319//! The length of `hdr1` and `len` must be counted in `length_adjustment`.
320//! Note that the length of `hdr2` does **not** need to be explicitly set
321//! anywhere because it already is factored into the total frame length that
322//! is read from the byte stream.
323//!
324//! ## Example 7
325//!
326//! The following will parse a 3 byte length field at offset 0 in a 4 byte
327//! frame head, excluding the 4th byte from the yielded `BytesMut`.
328//!
329//! ```
330//! # use tokio_stream::StreamExt;
331//! # use tokio_util::codec::LengthDelimitedCodec;
332//! # #[tokio::main]
333//! # async fn main() {
334//! # let io: &[u8] = b"\x00\x00\x0B\xFFHello world";
335//! let mut reader = LengthDelimitedCodec::builder()
336//!     .length_field_offset(0) // default value
337//!     .length_field_length(3)
338//!     .length_adjustment(0)  // default value
339//!     .num_skip(4) // skip the first 4 bytes
340//!     .new_read(io);
341//! # let res = reader.next().await.unwrap().unwrap().to_vec();
342//! # assert_eq!(res, b"Hello world");
343//! # }
344//! ```
345//!
346//! The following frame will be decoded as such:
347//!
348//! ```text
349//!                  INPUT                       DECODED
350//! +------- len ------+--- Payload ---+    +--- Payload ---+
351//! | \x00\x00\x0B\xFF |  Hello world  | => |  Hello world  |
352//! +------------------+---------------+    +---------------+
353//! ```
354//!
355//! A simple example where there are unused bytes between the length field
356//! and the payload.
357//!
358//! # Encoding
359//!
360//! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`],
361//! such that each submitted [`BytesMut`] is prefaced by a length field.
362//! There are fewer configuration options than [`FramedRead`]. Given
363//! protocols that have more complex frame heads, an encoder should probably
364//! be written by hand using [`Encoder`].
365//!
366//! Here is a simple example, given a `FramedWrite` with the following
367//! configuration:
368//!
369//! ```
370//! # use tokio::io::AsyncWrite;
371//! # use tokio_util::codec::LengthDelimitedCodec;
372//! # fn write_frame<T: AsyncWrite>(io: T) {
373//! # let _ =
374//! LengthDelimitedCodec::builder()
375//!     .length_field_type::<u16>()
376//!     .new_write(io);
377//! # }
378//! # pub fn main() {}
379//! ```
380//!
381//! A payload of `hello world` will be encoded as:
382//!
383//! ```text
384//! +- len: u16 -+---- data ----+
385//! |  \x00\x0b  |  hello world |
386//! +------------+--------------+
387//! ```
388//!
389//! [`LengthDelimitedCodec::new()`]: method@LengthDelimitedCodec::new
390//! [`FramedRead`]: struct@FramedRead
391//! [`FramedWrite`]: struct@FramedWrite
392//! [`AsyncRead`]: trait@tokio::io::AsyncRead
393//! [`AsyncWrite`]: trait@tokio::io::AsyncWrite
394//! [`Encoder`]: trait@Encoder
395//! [`BytesMut`]: bytes::BytesMut
396
397use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite};
398
399use tokio::io::{AsyncRead, AsyncWrite};
400
401use bytes::{Buf, BufMut, Bytes, BytesMut};
402use std::error::Error as StdError;
403use std::io::{self, Cursor};
404use std::{cmp, fmt, mem};
405
406/// Configure length delimited `LengthDelimitedCodec`s.
407///
408/// `Builder` enables constructing configured length delimited codecs. Note
409/// that not all configuration settings apply to both encoding and decoding. See
410/// the documentation for specific methods for more detail.
411///
412/// Note that the if the value of [`Builder::max_frame_length`] becomes larger than
413/// what can actually fit in [`Builder::length_field_length`], it will be clipped to
414/// the maximum value that can fit.
415#[derive(Debug, Clone, Copy)]
416pub struct Builder {
417    // Maximum frame length
418    max_frame_len: usize,
419
420    // Number of bytes representing the field length
421    length_field_len: usize,
422
423    // Number of bytes in the header before the length field
424    length_field_offset: usize,
425
426    // Adjust the length specified in the header field by this amount
427    length_adjustment: isize,
428
429    // Total number of bytes to skip before reading the payload, if not set,
430    // `length_field_len + length_field_offset`
431    num_skip: Option<usize>,
432
433    // Length field byte order (little or big endian)
434    length_field_is_big_endian: bool,
435}
436
437/// An error when the number of bytes read is more than max frame length.
438pub struct LengthDelimitedCodecError {
439    _priv: (),
440}
441
442/// A codec for frames delimited by a frame head specifying their lengths.
443///
444/// This allows the consumer to work with entire frames without having to worry
445/// about buffering or other framing logic.
446///
447/// See [module level] documentation for more detail.
448///
449/// [module level]: index.html
450#[derive(Debug, Clone)]
451pub struct LengthDelimitedCodec {
452    // Configuration values
453    builder: Builder,
454
455    // Read state
456    state: DecodeState,
457}
458
459#[derive(Debug, Clone, Copy)]
460enum DecodeState {
461    Head,
462    Data(usize),
463}
464
465// ===== impl LengthDelimitedCodec ======
466
467impl LengthDelimitedCodec {
468    /// Creates a new `LengthDelimitedCodec` with the default configuration values.
469    pub fn new() -> Self {
470        Self {
471            builder: Builder::new(),
472            state: DecodeState::Head,
473        }
474    }
475
476    /// Creates a new length delimited codec builder with default configuration
477    /// values.
478    pub fn builder() -> Builder {
479        Builder::new()
480    }
481
482    /// Returns the current max frame setting
483    ///
484    /// This is the largest size this codec will accept from the wire. Larger
485    /// frames will be rejected.
486    pub fn max_frame_length(&self) -> usize {
487        self.builder.max_frame_len
488    }
489
490    /// Updates the max frame setting.
491    ///
492    /// The change takes effect the next time a frame is decoded. In other
493    /// words, if a frame is currently in process of being decoded with a frame
494    /// size greater than `val` but less than the max frame length in effect
495    /// before calling this function, then the frame will be allowed.
496    pub fn set_max_frame_length(&mut self, val: usize) {
497        self.builder.max_frame_length(val);
498    }
499
500    fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> {
501        let head_len = self.builder.num_head_bytes();
502        let field_len = self.builder.length_field_len;
503
504        if src.len() < head_len {
505            // Not enough data
506            return Ok(None);
507        }
508
509        let n = {
510            let mut src = Cursor::new(&mut *src);
511
512            // Skip the required bytes
513            src.advance(self.builder.length_field_offset);
514
515            // match endianness
516            let n = if self.builder.length_field_is_big_endian {
517                src.get_uint(field_len)
518            } else {
519                src.get_uint_le(field_len)
520            };
521
522            if n > self.builder.max_frame_len as u64 {
523                return Err(io::Error::new(
524                    io::ErrorKind::InvalidData,
525                    LengthDelimitedCodecError { _priv: () },
526                ));
527            }
528
529            // The check above ensures there is no overflow
530            let n = n as usize;
531
532            // Adjust `n` with bounds checking
533            let n = if self.builder.length_adjustment < 0 {
534                n.checked_sub(-self.builder.length_adjustment as usize)
535            } else {
536                n.checked_add(self.builder.length_adjustment as usize)
537            };
538
539            // Error handling
540            match n {
541                Some(n) => n,
542                None => {
543                    return Err(io::Error::new(
544                        io::ErrorKind::InvalidInput,
545                        "provided length would overflow after adjustment",
546                    ));
547                }
548            }
549        };
550
551        src.advance(self.builder.get_num_skip());
552
553        // Ensure that the buffer has enough space to read the incoming
554        // payload
555        src.reserve(n.saturating_sub(src.len()));
556
557        Ok(Some(n))
558    }
559
560    fn decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut> {
561        // At this point, the buffer has already had the required capacity
562        // reserved. All there is to do is read.
563        if src.len() < n {
564            return None;
565        }
566
567        Some(src.split_to(n))
568    }
569}
570
571impl Decoder for LengthDelimitedCodec {
572    type Item = BytesMut;
573    type Error = io::Error;
574
575    fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
576        let n = match self.state {
577            DecodeState::Head => match self.decode_head(src)? {
578                Some(n) => {
579                    self.state = DecodeState::Data(n);
580                    n
581                }
582                None => return Ok(None),
583            },
584            DecodeState::Data(n) => n,
585        };
586
587        match self.decode_data(n, src) {
588            Some(data) => {
589                // Update the decode state
590                self.state = DecodeState::Head;
591
592                // Make sure the buffer has enough space to read the next head
593                src.reserve(self.builder.num_head_bytes().saturating_sub(src.len()));
594
595                Ok(Some(data))
596            }
597            None => Ok(None),
598        }
599    }
600}
601
602impl Encoder<Bytes> for LengthDelimitedCodec {
603    type Error = io::Error;
604
605    fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> {
606        let n = data.len();
607
608        if n > self.builder.max_frame_len {
609            return Err(io::Error::new(
610                io::ErrorKind::InvalidInput,
611                LengthDelimitedCodecError { _priv: () },
612            ));
613        }
614
615        // Adjust `n` with bounds checking
616        let n = if self.builder.length_adjustment < 0 {
617            n.checked_add(-self.builder.length_adjustment as usize)
618        } else {
619            n.checked_sub(self.builder.length_adjustment as usize)
620        };
621
622        let n = n.ok_or_else(|| {
623            io::Error::new(
624                io::ErrorKind::InvalidInput,
625                "provided length would overflow after adjustment",
626            )
627        })?;
628
629        // Reserve capacity in the destination buffer to fit the frame and
630        // length field (plus adjustment).
631        dst.reserve(self.builder.length_field_len + n);
632
633        if self.builder.length_field_is_big_endian {
634            dst.put_uint(n as u64, self.builder.length_field_len);
635        } else {
636            dst.put_uint_le(n as u64, self.builder.length_field_len);
637        }
638
639        // Write the frame to the buffer
640        dst.extend_from_slice(&data[..]);
641
642        Ok(())
643    }
644}
645
646impl Default for LengthDelimitedCodec {
647    fn default() -> Self {
648        Self::new()
649    }
650}
651
652// ===== impl Builder =====
653
654mod builder {
655    /// Types that can be used with `Builder::length_field_type`.
656    pub trait LengthFieldType {}
657
658    impl LengthFieldType for u8 {}
659    impl LengthFieldType for u16 {}
660    impl LengthFieldType for u32 {}
661    impl LengthFieldType for u64 {}
662
663    #[cfg(any(
664        target_pointer_width = "16",
665        target_pointer_width = "32",
666        target_pointer_width = "64",
667    ))]
668    impl LengthFieldType for usize {}
669}
670
671impl Builder {
672    /// Creates a new length delimited codec builder with default configuration
673    /// values.
674    ///
675    /// # Examples
676    ///
677    /// ```
678    /// # use tokio::io::AsyncRead;
679    /// use tokio_util::codec::LengthDelimitedCodec;
680    ///
681    /// # fn bind_read<T: AsyncRead>(io: T) {
682    /// LengthDelimitedCodec::builder()
683    ///     .length_field_offset(0)
684    ///     .length_field_type::<u16>()
685    ///     .length_adjustment(0)
686    ///     .num_skip(0)
687    ///     .new_read(io);
688    /// # }
689    /// # pub fn main() {}
690    /// ```
691    pub fn new() -> Builder {
692        Builder {
693            // Default max frame length of 8MB
694            max_frame_len: 8 * 1_024 * 1_024,
695
696            // Default byte length of 4
697            length_field_len: 4,
698
699            // Default to the header field being at the start of the header.
700            length_field_offset: 0,
701
702            length_adjustment: 0,
703
704            // Total number of bytes to skip before reading the payload, if not set,
705            // `length_field_len + length_field_offset`
706            num_skip: None,
707
708            // Default to reading the length field in network (big) endian.
709            length_field_is_big_endian: true,
710        }
711    }
712
713    /// Read the length field as a big endian integer
714    ///
715    /// This is the default setting.
716    ///
717    /// This configuration option applies to both encoding and decoding.
718    ///
719    /// # Examples
720    ///
721    /// ```
722    /// # use tokio::io::AsyncRead;
723    /// use tokio_util::codec::LengthDelimitedCodec;
724    ///
725    /// # fn bind_read<T: AsyncRead>(io: T) {
726    /// LengthDelimitedCodec::builder()
727    ///     .big_endian()
728    ///     .new_read(io);
729    /// # }
730    /// # pub fn main() {}
731    /// ```
732    pub fn big_endian(&mut self) -> &mut Self {
733        self.length_field_is_big_endian = true;
734        self
735    }
736
737    /// Read the length field as a little endian integer
738    ///
739    /// The default setting is big endian.
740    ///
741    /// This configuration option applies to both encoding and decoding.
742    ///
743    /// # Examples
744    ///
745    /// ```
746    /// # use tokio::io::AsyncRead;
747    /// use tokio_util::codec::LengthDelimitedCodec;
748    ///
749    /// # fn bind_read<T: AsyncRead>(io: T) {
750    /// LengthDelimitedCodec::builder()
751    ///     .little_endian()
752    ///     .new_read(io);
753    /// # }
754    /// # pub fn main() {}
755    /// ```
756    pub fn little_endian(&mut self) -> &mut Self {
757        self.length_field_is_big_endian = false;
758        self
759    }
760
761    /// Read the length field as a native endian integer
762    ///
763    /// The default setting is big endian.
764    ///
765    /// This configuration option applies to both encoding and decoding.
766    ///
767    /// # Examples
768    ///
769    /// ```
770    /// # use tokio::io::AsyncRead;
771    /// use tokio_util::codec::LengthDelimitedCodec;
772    ///
773    /// # fn bind_read<T: AsyncRead>(io: T) {
774    /// LengthDelimitedCodec::builder()
775    ///     .native_endian()
776    ///     .new_read(io);
777    /// # }
778    /// # pub fn main() {}
779    /// ```
780    pub fn native_endian(&mut self) -> &mut Self {
781        if cfg!(target_endian = "big") {
782            self.big_endian()
783        } else {
784            self.little_endian()
785        }
786    }
787
788    /// Sets the max frame length in bytes
789    ///
790    /// This configuration option applies to both encoding and decoding. The
791    /// default value is 8MB.
792    ///
793    /// When decoding, the length field read from the byte stream is checked
794    /// against this setting **before** any adjustments are applied. When
795    /// encoding, the length of the submitted payload is checked against this
796    /// setting.
797    ///
798    /// When frames exceed the max length, an `io::Error` with the custom value
799    /// of the `LengthDelimitedCodecError` type will be returned.
800    ///
801    /// # Examples
802    ///
803    /// ```
804    /// # use tokio::io::AsyncRead;
805    /// use tokio_util::codec::LengthDelimitedCodec;
806    ///
807    /// # fn bind_read<T: AsyncRead>(io: T) {
808    /// LengthDelimitedCodec::builder()
809    ///     .max_frame_length(8 * 1024 * 1024)
810    ///     .new_read(io);
811    /// # }
812    /// # pub fn main() {}
813    /// ```
814    pub fn max_frame_length(&mut self, val: usize) -> &mut Self {
815        self.max_frame_len = val;
816        self
817    }
818
819    /// Sets the unsigned integer type used to represent the length field.
820    ///
821    /// The default type is [`u32`]. The max type is [`u64`] (or [`usize`] on
822    /// 64-bit targets).
823    ///
824    /// # Examples
825    ///
826    /// ```
827    /// # use tokio::io::AsyncRead;
828    /// use tokio_util::codec::LengthDelimitedCodec;
829    ///
830    /// # fn bind_read<T: AsyncRead>(io: T) {
831    /// LengthDelimitedCodec::builder()
832    ///     .length_field_type::<u32>()
833    ///     .new_read(io);
834    /// # }
835    /// # pub fn main() {}
836    /// ```
837    ///
838    /// Unlike [`Builder::length_field_length`], this does not fail at runtime
839    /// and instead produces a compile error:
840    ///
841    /// ```compile_fail
842    /// # use tokio::io::AsyncRead;
843    /// # use tokio_util::codec::LengthDelimitedCodec;
844    /// # fn bind_read<T: AsyncRead>(io: T) {
845    /// LengthDelimitedCodec::builder()
846    ///     .length_field_type::<u128>()
847    ///     .new_read(io);
848    /// # }
849    /// # pub fn main() {}
850    /// ```
851    pub fn length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self {
852        self.length_field_length(mem::size_of::<T>())
853    }
854
855    /// Sets the number of bytes used to represent the length field
856    ///
857    /// The default value is `4`. The max value is `8`.
858    ///
859    /// This configuration option applies to both encoding and decoding.
860    ///
861    /// # Examples
862    ///
863    /// ```
864    /// # use tokio::io::AsyncRead;
865    /// use tokio_util::codec::LengthDelimitedCodec;
866    ///
867    /// # fn bind_read<T: AsyncRead>(io: T) {
868    /// LengthDelimitedCodec::builder()
869    ///     .length_field_length(4)
870    ///     .new_read(io);
871    /// # }
872    /// # pub fn main() {}
873    /// ```
874    pub fn length_field_length(&mut self, val: usize) -> &mut Self {
875        assert!(val > 0 && val <= 8, "invalid length field length");
876        self.length_field_len = val;
877        self
878    }
879
880    /// Sets the number of bytes in the header before the length field
881    ///
882    /// This configuration option only applies to decoding.
883    ///
884    /// # Examples
885    ///
886    /// ```
887    /// # use tokio::io::AsyncRead;
888    /// use tokio_util::codec::LengthDelimitedCodec;
889    ///
890    /// # fn bind_read<T: AsyncRead>(io: T) {
891    /// LengthDelimitedCodec::builder()
892    ///     .length_field_offset(1)
893    ///     .new_read(io);
894    /// # }
895    /// # pub fn main() {}
896    /// ```
897    pub fn length_field_offset(&mut self, val: usize) -> &mut Self {
898        self.length_field_offset = val;
899        self
900    }
901
902    /// Delta between the payload length specified in the header and the real
903    /// payload length
904    ///
905    /// # Examples
906    ///
907    /// ```
908    /// # use tokio::io::AsyncRead;
909    /// use tokio_util::codec::LengthDelimitedCodec;
910    ///
911    /// # fn bind_read<T: AsyncRead>(io: T) {
912    /// LengthDelimitedCodec::builder()
913    ///     .length_adjustment(-2)
914    ///     .new_read(io);
915    /// # }
916    /// # pub fn main() {}
917    /// ```
918    pub fn length_adjustment(&mut self, val: isize) -> &mut Self {
919        self.length_adjustment = val;
920        self
921    }
922
923    /// Sets the number of bytes to skip before reading the payload
924    ///
925    /// Default value is `length_field_len + length_field_offset`
926    ///
927    /// This configuration option only applies to decoding
928    ///
929    /// # Examples
930    ///
931    /// ```
932    /// # use tokio::io::AsyncRead;
933    /// use tokio_util::codec::LengthDelimitedCodec;
934    ///
935    /// # fn bind_read<T: AsyncRead>(io: T) {
936    /// LengthDelimitedCodec::builder()
937    ///     .num_skip(4)
938    ///     .new_read(io);
939    /// # }
940    /// # pub fn main() {}
941    /// ```
942    pub fn num_skip(&mut self, val: usize) -> &mut Self {
943        self.num_skip = Some(val);
944        self
945    }
946
947    /// Create a configured length delimited `LengthDelimitedCodec`
948    ///
949    /// # Examples
950    ///
951    /// ```
952    /// use tokio_util::codec::LengthDelimitedCodec;
953    /// # pub fn main() {
954    /// LengthDelimitedCodec::builder()
955    ///     .length_field_offset(0)
956    ///     .length_field_type::<u16>()
957    ///     .length_adjustment(0)
958    ///     .num_skip(0)
959    ///     .new_codec();
960    /// # }
961    /// ```
962    pub fn new_codec(&self) -> LengthDelimitedCodec {
963        let mut builder = *self;
964
965        builder.adjust_max_frame_len();
966
967        LengthDelimitedCodec {
968            builder,
969            state: DecodeState::Head,
970        }
971    }
972
973    /// Create a configured length delimited `FramedRead`
974    ///
975    /// # Examples
976    ///
977    /// ```
978    /// # use tokio::io::AsyncRead;
979    /// use tokio_util::codec::LengthDelimitedCodec;
980    ///
981    /// # fn bind_read<T: AsyncRead>(io: T) {
982    /// LengthDelimitedCodec::builder()
983    ///     .length_field_offset(0)
984    ///     .length_field_type::<u16>()
985    ///     .length_adjustment(0)
986    ///     .num_skip(0)
987    ///     .new_read(io);
988    /// # }
989    /// # pub fn main() {}
990    /// ```
991    pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec>
992    where
993        T: AsyncRead,
994    {
995        FramedRead::new(upstream, self.new_codec())
996    }
997
998    /// Create a configured length delimited `FramedWrite`
999    ///
1000    /// # Examples
1001    ///
1002    /// ```
1003    /// # use tokio::io::AsyncWrite;
1004    /// # use tokio_util::codec::LengthDelimitedCodec;
1005    /// # fn write_frame<T: AsyncWrite>(io: T) {
1006    /// LengthDelimitedCodec::builder()
1007    ///     .length_field_type::<u16>()
1008    ///     .new_write(io);
1009    /// # }
1010    /// # pub fn main() {}
1011    /// ```
1012    pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec>
1013    where
1014        T: AsyncWrite,
1015    {
1016        FramedWrite::new(inner, self.new_codec())
1017    }
1018
1019    /// Create a configured length delimited `Framed`
1020    ///
1021    /// # Examples
1022    ///
1023    /// ```
1024    /// # use tokio::io::{AsyncRead, AsyncWrite};
1025    /// # use tokio_util::codec::LengthDelimitedCodec;
1026    /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
1027    /// # let _ =
1028    /// LengthDelimitedCodec::builder()
1029    ///     .length_field_type::<u16>()
1030    ///     .new_framed(io);
1031    /// # }
1032    /// # pub fn main() {}
1033    /// ```
1034    pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec>
1035    where
1036        T: AsyncRead + AsyncWrite,
1037    {
1038        Framed::new(inner, self.new_codec())
1039    }
1040
1041    fn num_head_bytes(&self) -> usize {
1042        let num = self.length_field_offset + self.length_field_len;
1043        cmp::max(num, self.num_skip.unwrap_or(0))
1044    }
1045
1046    fn get_num_skip(&self) -> usize {
1047        self.num_skip
1048            .unwrap_or(self.length_field_offset + self.length_field_len)
1049    }
1050
1051    fn adjust_max_frame_len(&mut self) {
1052        // Calculate the maximum number that can be represented using `length_field_len` bytes.
1053        let max_number = match 1u64.checked_shl((8 * self.length_field_len) as u32) {
1054            Some(shl) => shl - 1,
1055            None => u64::MAX,
1056        };
1057
1058        let max_allowed_len = max_number.saturating_add_signed(self.length_adjustment as i64);
1059
1060        if self.max_frame_len as u64 > max_allowed_len {
1061            self.max_frame_len = usize::try_from(max_allowed_len).unwrap_or(usize::MAX);
1062        }
1063    }
1064}
1065
1066impl Default for Builder {
1067    fn default() -> Self {
1068        Self::new()
1069    }
1070}
1071
1072// ===== impl LengthDelimitedCodecError =====
1073
1074impl fmt::Debug for LengthDelimitedCodecError {
1075    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1076        f.debug_struct("LengthDelimitedCodecError").finish()
1077    }
1078}
1079
1080impl fmt::Display for LengthDelimitedCodecError {
1081    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1082        f.write_str("frame size too big")
1083    }
1084}
1085
1086impl StdError for LengthDelimitedCodecError {}