tokio_util/codec/length_delimited.rs
1//! Frame a stream of bytes based on a length prefix
2//!
3//! Many protocols delimit their frames by prefacing frame data with a
4//! frame head that specifies the length of the frame. The
5//! `length_delimited` module provides utilities for handling the length
6//! based framing. This allows the consumer to work with entire frames
7//! without having to worry about buffering or other framing logic.
8//!
9//! # Getting started
10//!
11//! If implementing a protocol from scratch, using length delimited framing
12//! is an easy way to get started. [`LengthDelimitedCodec::new()`] will
13//! return a length delimited codec using default configuration values.
14//! This can then be used to construct a framer to adapt a full-duplex
15//! byte stream into a stream of frames.
16//!
17//! ```
18//! use tokio::io::{AsyncRead, AsyncWrite};
19//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
20//!
21//! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T)
22//! -> Framed<T, LengthDelimitedCodec>
23//! {
24//! Framed::new(io, LengthDelimitedCodec::new())
25//! }
26//! # pub fn main() {}
27//! ```
28//!
29//! The returned transport implements `Sink + Stream` for `BytesMut`. It
30//! encodes the frame with a big-endian `u32` header denoting the frame
31//! payload length:
32//!
33//! ```text
34//! +----------+--------------------------------+
35//! | len: u32 | frame payload |
36//! +----------+--------------------------------+
37//! ```
38//!
39//! Specifically, given the following:
40//!
41//! ```
42//! use tokio::io::{AsyncRead, AsyncWrite};
43//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
44//!
45//! use futures::SinkExt;
46//! use bytes::Bytes;
47//!
48//! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>>
49//! where
50//! T: AsyncRead + AsyncWrite + Unpin,
51//! {
52//! let mut transport = Framed::new(io, LengthDelimitedCodec::new());
53//! let frame = Bytes::from("hello world");
54//!
55//! transport.send(frame).await?;
56//! Ok(())
57//! }
58//! ```
59//!
60//! The encoded frame will look like this:
61//!
62//! ```text
63//! +---- len: u32 ----+---- data ----+
64//! | \x00\x00\x00\x0b | hello world |
65//! +------------------+--------------+
66//! ```
67//!
68//! # Decoding
69//!
70//! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`],
71//! such that each yielded [`BytesMut`] value contains the contents of an
72//! entire frame. There are many configuration parameters enabling
73//! [`FramedRead`] to handle a wide range of protocols. Here are some
74//! examples that will cover the various options at a high level.
75//!
76//! ## Example 1
77//!
78//! The following will parse a `u16` length field at offset 0, omitting the
79//! frame head in the yielded `BytesMut`.
80//!
81//! ```
82//! # use tokio_stream::StreamExt;
83//! # use tokio_util::codec::LengthDelimitedCodec;
84//! # #[tokio::main]
85//! # async fn main() {
86//! # let io: &[u8] = b"\x00\x0BHello world";
87//! let mut reader = LengthDelimitedCodec::builder()
88//! .length_field_offset(0) // default value
89//! .length_field_type::<u16>()
90//! .length_adjustment(0) // default value
91//! .new_read(io);
92//! # let res = reader.next().await.unwrap().unwrap().to_vec();
93//! # assert_eq!(res, b"Hello world");
94//! # }
95//! ```
96//!
97//! The following frame will be decoded as such:
98//!
99//! ```text
100//! INPUT DECODED
101//! +-- len ---+--- Payload ---+ +--- Payload ---+
102//! | \x00\x0B | Hello world | --> | Hello world |
103//! +----------+---------------+ +---------------+
104//! ```
105//!
//! The value of the length field is 11 (`\x0B`) which represents the length
//! of the payload, `Hello world`. By default, [`FramedRead`] assumes that
108//! the length field represents the number of bytes that **follows** the
109//! length field. Thus, the entire frame has a length of 13: 2 bytes for the
110//! frame head + 11 bytes for the payload.
111//!
112//! ## Example 2
113//!
114//! The following will parse a `u16` length field at offset 0, including the
115//! frame head in the yielded `BytesMut`.
116//!
117//! ```
118//! # use tokio_stream::StreamExt;
119//! # use tokio_util::codec::LengthDelimitedCodec;
120//! # #[tokio::main]
121//! # async fn main() {
122//! # let io: &[u8] = b"\x00\x0BHello world";
123//! let mut reader = LengthDelimitedCodec::builder()
124//! .length_field_offset(0) // default value
125//! .length_field_type::<u16>()
126//! .length_adjustment(2) // Add head size to length
127//! .num_skip(0) // Do NOT skip the head
128//! .new_read(io);
129//! # let res = reader.next().await.unwrap().unwrap().to_vec();
130//! # assert_eq!(res, b"\x00\x0BHello world");
131//! # }
132//! ```
133//!
134//! The following frame will be decoded as such:
135//!
136//! ```text
137//! INPUT DECODED
138//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+
139//! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world |
140//! +----------+---------------+ +----------+---------------+
141//! ```
142//!
143//! This is similar to the first example, the only difference is that the
144//! frame head is **included** in the yielded `BytesMut` value. To achieve
145//! this, we need to add the header size to the length with `length_adjustment`,
146//! and set `num_skip` to `0` to prevent skipping the head.
147//!
148//! ## Example 3
149//!
150//! The following will parse a `u16` length field at offset 0, omitting the
151//! frame head in the yielded `BytesMut`. In this case, the length field
152//! **includes** the frame head length.
153//!
154//! ```
155//! # use tokio_stream::StreamExt;
156//! # use tokio_util::codec::LengthDelimitedCodec;
157//! # #[tokio::main]
158//! # async fn main() {
159//! # let io: &[u8] = b"\x00\x0DHello world";
160//! let mut reader = LengthDelimitedCodec::builder()
161//! .length_field_offset(0) // default value
162//! .length_field_type::<u16>()
163//! .length_adjustment(-2) // size of head
164//! .new_read(io);
165//! # let res = reader.next().await.unwrap().unwrap().to_vec();
166//! # assert_eq!(res, b"Hello world");
167//! # }
168//! ```
169//!
170//! The following frame will be decoded as such:
171//!
172//! ```text
173//! INPUT DECODED
174//! +-- len ---+--- Payload ---+ +--- Payload ---+
175//! | \x00\x0D | Hello world | --> | Hello world |
176//! +----------+---------------+ +---------------+
177//! ```
178//!
179//! In most cases, the length field represents the length of the payload
180//! only, as shown in the previous examples. However, in some protocols the
181//! length field represents the length of the whole frame, including the
182//! head. In such cases, we specify a negative `length_adjustment` to adjust
183//! the value provided in the frame head to represent the payload length.
184//!
185//! ## Example 4
186//!
187//! The following will parse a 3 byte length field at offset 0 in a 5 byte
188//! frame head, including the frame head in the yielded `BytesMut`.
189//!
190//! ```
191//! # use tokio_stream::StreamExt;
192//! # use tokio_util::codec::LengthDelimitedCodec;
193//! # #[tokio::main]
194//! # async fn main() {
195//! # let io: &[u8] = b"\x00\x00\x0B\xCA\xFEHello world";
196//! let mut reader = LengthDelimitedCodec::builder()
197//! .length_field_offset(0) // default value
198//! .length_field_length(3)
199//! .length_adjustment(3 + 2) // len field and remaining head
200//! .num_skip(0)
201//! .new_read(io);
202//! # let res = reader.next().await.unwrap().unwrap().to_vec();
203//! # assert_eq!(res, b"\x00\x00\x0B\xCA\xFEHello world");
204//! # }
205//! ```
206//!
207//! The following frame will be decoded as such:
208//!
209//! ```text
210//! INPUT
211//! +---- len -----+- head -+--- Payload ---+
212//! | \x00\x00\x0B | \xCAFE | Hello world |
213//! +--------------+--------+---------------+
214//!
215//! DECODED
216//! +---- len -----+- head -+--- Payload ---+
217//! | \x00\x00\x0B | \xCAFE | Hello world |
218//! +--------------+--------+---------------+
219//! ```
220//!
221//! A more advanced example that shows a case where there is extra frame
222//! head data between the length field and the payload. In such cases, it is
//! usually desirable to include the frame head as part of the yielded
//! `BytesMut`. This lets consumers of the length delimited framer
//! process the frame head as needed.
226//!
227//! The positive `length_adjustment` value lets `FramedRead` factor in the
228//! additional head into the frame length calculation.
229//!
230//! ## Example 5
231//!
//! The following will parse a `u16` length field at offset 1 of a 4 byte
//! frame head. The first byte and the length field will be omitted from the
//! yielded `BytesMut`, but the trailing byte of the frame head (`hdr2`) will
//! be included.
236//!
237//! ```
238//! # use tokio_stream::StreamExt;
239//! # use tokio_util::codec::LengthDelimitedCodec;
240//! # #[tokio::main]
241//! # async fn main() {
242//! # let io: &[u8] = b"\xCA\x00\x0B\xFEHello world";
243//! let mut reader = LengthDelimitedCodec::builder()
244//! .length_field_offset(1) // length of hdr1
245//! .length_field_type::<u16>()
246//! .length_adjustment(1) // length of hdr2
247//! .num_skip(3) // length of hdr1 + LEN
248//! .new_read(io);
249//! # let res = reader.next().await.unwrap().unwrap().to_vec();
250//! # assert_eq!(res, b"\xFEHello world");
251//! # }
252//! ```
253//!
254//! The following frame will be decoded as such:
255//!
256//! ```text
257//! INPUT
258//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
259//! | \xCA | \x00\x0B | \xFE | Hello world |
260//! +--------+----------+--------+---------------+
261//!
262//! DECODED
263//! +- hdr2 -+--- Payload ---+
264//! | \xFE | Hello world |
265//! +--------+---------------+
266//! ```
267//!
268//! The length field is situated in the middle of the frame head. In this
269//! case, the first byte in the frame head could be a version or some other
270//! identifier that is not needed for processing. On the other hand, the
271//! second half of the head is needed.
272//!
//! `length_field_offset` indicates how many bytes to skip before starting
//! to read the length field. `length_adjustment` is the number of head
//! bytes following the length field that are added to the frame length so
//! that they are yielded together with the payload. In this case, it is the
//! second half of the head (`hdr2`).
277//!
278//! ## Example 6
279//!
//! The following will parse a `u16` length field at offset 1 of a 4 byte
//! frame head. The first byte and the length field will be omitted from the
//! yielded `BytesMut`, but the trailing byte of the frame head (`hdr2`) will
//! be included. In this case, the length field **includes** the frame head
//! length.
285//!
286//! ```
287//! # use tokio_stream::StreamExt;
288//! # use tokio_util::codec::LengthDelimitedCodec;
289//! # #[tokio::main]
290//! # async fn main() {
291//! # let io: &[u8] = b"\xCA\x00\x0F\xFEHello world";
292//! let mut reader = LengthDelimitedCodec::builder()
293//! .length_field_offset(1) // length of hdr1
294//! .length_field_type::<u16>()
295//! .length_adjustment(-3) // length of hdr1 + LEN, negative
296//! .num_skip(3)
297//! .new_read(io);
298//! # let res = reader.next().await.unwrap().unwrap().to_vec();
299//! # assert_eq!(res, b"\xFEHello world");
300//! # }
301//! ```
302//!
303//! The following frame will be decoded as such:
304//!
305//! ```text
306//! INPUT
307//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
308//! | \xCA | \x00\x0F | \xFE | Hello world |
309//! +--------+----------+--------+---------------+
310//!
311//! DECODED
312//! +- hdr2 -+--- Payload ---+
313//! | \xFE | Hello world |
314//! +--------+---------------+
315//! ```
316//!
317//! Similar to the example above, the difference is that the length field
318//! represents the length of the entire frame instead of just the payload.
319//! The length of `hdr1` and `len` must be counted in `length_adjustment`.
320//! Note that the length of `hdr2` does **not** need to be explicitly set
321//! anywhere because it already is factored into the total frame length that
322//! is read from the byte stream.
323//!
324//! ## Example 7
325//!
326//! The following will parse a 3 byte length field at offset 0 in a 4 byte
327//! frame head, excluding the 4th byte from the yielded `BytesMut`.
328//!
329//! ```
330//! # use tokio_stream::StreamExt;
331//! # use tokio_util::codec::LengthDelimitedCodec;
332//! # #[tokio::main]
333//! # async fn main() {
334//! # let io: &[u8] = b"\x00\x00\x0B\xFFHello world";
335//! let mut reader = LengthDelimitedCodec::builder()
336//! .length_field_offset(0) // default value
337//! .length_field_length(3)
338//! .length_adjustment(0) // default value
339//! .num_skip(4) // skip the first 4 bytes
340//! .new_read(io);
341//! # let res = reader.next().await.unwrap().unwrap().to_vec();
342//! # assert_eq!(res, b"Hello world");
343//! # }
344//! ```
345//!
346//! The following frame will be decoded as such:
347//!
348//! ```text
349//! INPUT DECODED
350//! +------- len ------+--- Payload ---+ +--- Payload ---+
351//! | \x00\x00\x0B\xFF | Hello world | => | Hello world |
352//! +------------------+---------------+ +---------------+
353//! ```
354//!
355//! A simple example where there are unused bytes between the length field
356//! and the payload.
357//!
358//! # Encoding
359//!
//! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of `Bytes`,
//! such that each submitted `Bytes` is prefaced by a length field.
362//! There are fewer configuration options than [`FramedRead`]. Given
363//! protocols that have more complex frame heads, an encoder should probably
364//! be written by hand using [`Encoder`].
365//!
366//! Here is a simple example, given a `FramedWrite` with the following
367//! configuration:
368//!
369//! ```
370//! # use tokio::io::AsyncWrite;
371//! # use tokio_util::codec::LengthDelimitedCodec;
372//! # fn write_frame<T: AsyncWrite>(io: T) {
373//! # let _ =
374//! LengthDelimitedCodec::builder()
375//! .length_field_type::<u16>()
376//! .new_write(io);
377//! # }
378//! # pub fn main() {}
379//! ```
380//!
381//! A payload of `hello world` will be encoded as:
382//!
383//! ```text
384//! +- len: u16 -+---- data ----+
385//! | \x00\x0b | hello world |
386//! +------------+--------------+
387//! ```
388//!
389//! [`LengthDelimitedCodec::new()`]: method@LengthDelimitedCodec::new
390//! [`FramedRead`]: struct@FramedRead
391//! [`FramedWrite`]: struct@FramedWrite
392//! [`AsyncRead`]: trait@tokio::io::AsyncRead
393//! [`AsyncWrite`]: trait@tokio::io::AsyncWrite
394//! [`Encoder`]: trait@Encoder
395//! [`BytesMut`]: bytes::BytesMut
396
397use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite};
398
399use tokio::io::{AsyncRead, AsyncWrite};
400
401use bytes::{Buf, BufMut, Bytes, BytesMut};
402use std::error::Error as StdError;
403use std::io::{self, Cursor};
404use std::{cmp, fmt, mem};
405
/// Configure length delimited `LengthDelimitedCodec`s.
///
/// `Builder` enables constructing configured length delimited codecs. Note
/// that not all configuration settings apply to both encoding and decoding. See
/// the documentation for specific methods for more detail.
///
/// Note that if the value of [`Builder::max_frame_length`] is larger than
/// what can actually fit in [`Builder::length_field_length`], it will be clipped to
/// the maximum value that can fit. The clipping is applied to the copy of the
/// configuration stored in a codec when one is created from the builder; the
/// builder itself keeps the value it was given.
#[derive(Debug, Clone, Copy)]
pub struct Builder {
    // Maximum frame length, in bytes. Checked against the raw length field
    // when decoding and against the payload length when encoding.
    max_frame_len: usize,

    // Number of bytes used to represent the length field (1 to 8)
    length_field_len: usize,

    // Number of bytes in the header before the length field
    length_field_offset: usize,

    // Adjust the length specified in the header field by this amount
    // (may be negative)
    length_adjustment: isize,

    // Total number of bytes to skip before reading the payload, if not set,
    // `length_field_len + length_field_offset`
    num_skip: Option<usize>,

    // Length field byte order: `true` for big endian, `false` for little
    // endian
    length_field_is_big_endian: bool,
}
436
/// An error when the number of bytes read is more than max frame length.
///
/// The codec wraps this value in an `io::Error`: when decoding, if the length
/// field read from the stream exceeds the max frame length, and when
/// encoding, if the submitted payload is longer than the max frame length.
pub struct LengthDelimitedCodecError {
    // Private unit field: keeps the type from being constructed outside of
    // this module.
    _priv: (),
}
441
/// A codec for frames delimited by a frame head specifying their lengths.
///
/// This allows the consumer to work with entire frames without having to worry
/// about buffering or other framing logic.
///
/// See [module level] documentation for more detail.
///
/// [module level]: index.html
#[derive(Debug, Clone)]
pub struct LengthDelimitedCodec {
    // Configuration values (a copy of the builder the codec was created from)
    builder: Builder,

    // Read state: whether the decoder is waiting for a frame head or for the
    // data of a frame whose head was already parsed
    state: DecodeState,
}
458
// Position of the decoder within the current frame.
#[derive(Debug, Clone, Copy)]
enum DecodeState {
    // The next bytes in the buffer are a frame head that still has to be
    // parsed.
    Head,
    // The head has been parsed; the value is the (adjusted) number of bytes
    // that make up the frame to yield.
    Data(usize),
}
464
465// ===== impl LengthDelimitedCodec ======
466
467impl LengthDelimitedCodec {
468 /// Creates a new `LengthDelimitedCodec` with the default configuration values.
469 pub fn new() -> Self {
470 Self {
471 builder: Builder::new(),
472 state: DecodeState::Head,
473 }
474 }
475
476 /// Creates a new length delimited codec builder with default configuration
477 /// values.
478 pub fn builder() -> Builder {
479 Builder::new()
480 }
481
482 /// Returns the current max frame setting
483 ///
484 /// This is the largest size this codec will accept from the wire. Larger
485 /// frames will be rejected.
486 pub fn max_frame_length(&self) -> usize {
487 self.builder.max_frame_len
488 }
489
490 /// Updates the max frame setting.
491 ///
492 /// The change takes effect the next time a frame is decoded. In other
493 /// words, if a frame is currently in process of being decoded with a frame
494 /// size greater than `val` but less than the max frame length in effect
495 /// before calling this function, then the frame will be allowed.
496 pub fn set_max_frame_length(&mut self, val: usize) {
497 self.builder.max_frame_length(val);
498 }
499
500 fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> {
501 let head_len = self.builder.num_head_bytes();
502 let field_len = self.builder.length_field_len;
503
504 if src.len() < head_len {
505 // Not enough data
506 return Ok(None);
507 }
508
509 let n = {
510 let mut src = Cursor::new(&mut *src);
511
512 // Skip the required bytes
513 src.advance(self.builder.length_field_offset);
514
515 // match endianness
516 let n = if self.builder.length_field_is_big_endian {
517 src.get_uint(field_len)
518 } else {
519 src.get_uint_le(field_len)
520 };
521
522 if n > self.builder.max_frame_len as u64 {
523 return Err(io::Error::new(
524 io::ErrorKind::InvalidData,
525 LengthDelimitedCodecError { _priv: () },
526 ));
527 }
528
529 // The check above ensures there is no overflow
530 let n = n as usize;
531
532 // Adjust `n` with bounds checking
533 let n = if self.builder.length_adjustment < 0 {
534 n.checked_sub(-self.builder.length_adjustment as usize)
535 } else {
536 n.checked_add(self.builder.length_adjustment as usize)
537 };
538
539 // Error handling
540 match n {
541 Some(n) => n,
542 None => {
543 return Err(io::Error::new(
544 io::ErrorKind::InvalidInput,
545 "provided length would overflow after adjustment",
546 ));
547 }
548 }
549 };
550
551 src.advance(self.builder.get_num_skip());
552
553 // Ensure that the buffer has enough space to read the incoming
554 // payload
555 src.reserve(n.saturating_sub(src.len()));
556
557 Ok(Some(n))
558 }
559
560 fn decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut> {
561 // At this point, the buffer has already had the required capacity
562 // reserved. All there is to do is read.
563 if src.len() < n {
564 return None;
565 }
566
567 Some(src.split_to(n))
568 }
569}
570
571impl Decoder for LengthDelimitedCodec {
572 type Item = BytesMut;
573 type Error = io::Error;
574
575 fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
576 let n = match self.state {
577 DecodeState::Head => match self.decode_head(src)? {
578 Some(n) => {
579 self.state = DecodeState::Data(n);
580 n
581 }
582 None => return Ok(None),
583 },
584 DecodeState::Data(n) => n,
585 };
586
587 match self.decode_data(n, src) {
588 Some(data) => {
589 // Update the decode state
590 self.state = DecodeState::Head;
591
592 // Make sure the buffer has enough space to read the next head
593 src.reserve(self.builder.num_head_bytes().saturating_sub(src.len()));
594
595 Ok(Some(data))
596 }
597 None => Ok(None),
598 }
599 }
600}
601
602impl Encoder<Bytes> for LengthDelimitedCodec {
603 type Error = io::Error;
604
605 fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> {
606 let n = data.len();
607
608 if n > self.builder.max_frame_len {
609 return Err(io::Error::new(
610 io::ErrorKind::InvalidInput,
611 LengthDelimitedCodecError { _priv: () },
612 ));
613 }
614
615 // Adjust `n` with bounds checking
616 let n = if self.builder.length_adjustment < 0 {
617 n.checked_add(-self.builder.length_adjustment as usize)
618 } else {
619 n.checked_sub(self.builder.length_adjustment as usize)
620 };
621
622 let n = n.ok_or_else(|| {
623 io::Error::new(
624 io::ErrorKind::InvalidInput,
625 "provided length would overflow after adjustment",
626 )
627 })?;
628
629 // Reserve capacity in the destination buffer to fit the frame and
630 // length field (plus adjustment).
631 dst.reserve(self.builder.length_field_len + n);
632
633 if self.builder.length_field_is_big_endian {
634 dst.put_uint(n as u64, self.builder.length_field_len);
635 } else {
636 dst.put_uint_le(n as u64, self.builder.length_field_len);
637 }
638
639 // Write the frame to the buffer
640 dst.extend_from_slice(&data[..]);
641
642 Ok(())
643 }
644}
645
646impl Default for LengthDelimitedCodec {
647 fn default() -> Self {
648 Self::new()
649 }
650}
651
652// ===== impl Builder =====
653
mod builder {
    /// Types that can be used with `Builder::length_field_type`.
    pub trait LengthFieldType {}

    // Generates the marker impl for every listed type.
    macro_rules! impl_length_field_type {
        ($($int:ty),+ $(,)?) => {
            $(impl LengthFieldType for $int {})+
        };
    }

    impl_length_field_type!(u8, u16, u32, u64);

    // `usize` is only accepted on targets whose pointer width is 16, 32 or
    // 64 bits.
    #[cfg(any(
        target_pointer_width = "16",
        target_pointer_width = "32",
        target_pointer_width = "64",
    ))]
    impl LengthFieldType for usize {}
}
670
671impl Builder {
672 /// Creates a new length delimited codec builder with default configuration
673 /// values.
674 ///
675 /// # Examples
676 ///
677 /// ```
678 /// # use tokio::io::AsyncRead;
679 /// use tokio_util::codec::LengthDelimitedCodec;
680 ///
681 /// # fn bind_read<T: AsyncRead>(io: T) {
682 /// LengthDelimitedCodec::builder()
683 /// .length_field_offset(0)
684 /// .length_field_type::<u16>()
685 /// .length_adjustment(0)
686 /// .num_skip(0)
687 /// .new_read(io);
688 /// # }
689 /// # pub fn main() {}
690 /// ```
691 pub fn new() -> Builder {
692 Builder {
693 // Default max frame length of 8MB
694 max_frame_len: 8 * 1_024 * 1_024,
695
696 // Default byte length of 4
697 length_field_len: 4,
698
699 // Default to the header field being at the start of the header.
700 length_field_offset: 0,
701
702 length_adjustment: 0,
703
704 // Total number of bytes to skip before reading the payload, if not set,
705 // `length_field_len + length_field_offset`
706 num_skip: None,
707
708 // Default to reading the length field in network (big) endian.
709 length_field_is_big_endian: true,
710 }
711 }
712
713 /// Read the length field as a big endian integer
714 ///
715 /// This is the default setting.
716 ///
717 /// This configuration option applies to both encoding and decoding.
718 ///
719 /// # Examples
720 ///
721 /// ```
722 /// # use tokio::io::AsyncRead;
723 /// use tokio_util::codec::LengthDelimitedCodec;
724 ///
725 /// # fn bind_read<T: AsyncRead>(io: T) {
726 /// LengthDelimitedCodec::builder()
727 /// .big_endian()
728 /// .new_read(io);
729 /// # }
730 /// # pub fn main() {}
731 /// ```
732 pub fn big_endian(&mut self) -> &mut Self {
733 self.length_field_is_big_endian = true;
734 self
735 }
736
737 /// Read the length field as a little endian integer
738 ///
739 /// The default setting is big endian.
740 ///
741 /// This configuration option applies to both encoding and decoding.
742 ///
743 /// # Examples
744 ///
745 /// ```
746 /// # use tokio::io::AsyncRead;
747 /// use tokio_util::codec::LengthDelimitedCodec;
748 ///
749 /// # fn bind_read<T: AsyncRead>(io: T) {
750 /// LengthDelimitedCodec::builder()
751 /// .little_endian()
752 /// .new_read(io);
753 /// # }
754 /// # pub fn main() {}
755 /// ```
756 pub fn little_endian(&mut self) -> &mut Self {
757 self.length_field_is_big_endian = false;
758 self
759 }
760
761 /// Read the length field as a native endian integer
762 ///
763 /// The default setting is big endian.
764 ///
765 /// This configuration option applies to both encoding and decoding.
766 ///
767 /// # Examples
768 ///
769 /// ```
770 /// # use tokio::io::AsyncRead;
771 /// use tokio_util::codec::LengthDelimitedCodec;
772 ///
773 /// # fn bind_read<T: AsyncRead>(io: T) {
774 /// LengthDelimitedCodec::builder()
775 /// .native_endian()
776 /// .new_read(io);
777 /// # }
778 /// # pub fn main() {}
779 /// ```
780 pub fn native_endian(&mut self) -> &mut Self {
781 if cfg!(target_endian = "big") {
782 self.big_endian()
783 } else {
784 self.little_endian()
785 }
786 }
787
788 /// Sets the max frame length in bytes
789 ///
790 /// This configuration option applies to both encoding and decoding. The
791 /// default value is 8MB.
792 ///
793 /// When decoding, the length field read from the byte stream is checked
794 /// against this setting **before** any adjustments are applied. When
795 /// encoding, the length of the submitted payload is checked against this
796 /// setting.
797 ///
798 /// When frames exceed the max length, an `io::Error` with the custom value
799 /// of the `LengthDelimitedCodecError` type will be returned.
800 ///
801 /// # Examples
802 ///
803 /// ```
804 /// # use tokio::io::AsyncRead;
805 /// use tokio_util::codec::LengthDelimitedCodec;
806 ///
807 /// # fn bind_read<T: AsyncRead>(io: T) {
808 /// LengthDelimitedCodec::builder()
809 /// .max_frame_length(8 * 1024 * 1024)
810 /// .new_read(io);
811 /// # }
812 /// # pub fn main() {}
813 /// ```
814 pub fn max_frame_length(&mut self, val: usize) -> &mut Self {
815 self.max_frame_len = val;
816 self
817 }
818
819 /// Sets the unsigned integer type used to represent the length field.
820 ///
821 /// The default type is [`u32`]. The max type is [`u64`] (or [`usize`] on
822 /// 64-bit targets).
823 ///
824 /// # Examples
825 ///
826 /// ```
827 /// # use tokio::io::AsyncRead;
828 /// use tokio_util::codec::LengthDelimitedCodec;
829 ///
830 /// # fn bind_read<T: AsyncRead>(io: T) {
831 /// LengthDelimitedCodec::builder()
832 /// .length_field_type::<u32>()
833 /// .new_read(io);
834 /// # }
835 /// # pub fn main() {}
836 /// ```
837 ///
838 /// Unlike [`Builder::length_field_length`], this does not fail at runtime
839 /// and instead produces a compile error:
840 ///
841 /// ```compile_fail
842 /// # use tokio::io::AsyncRead;
843 /// # use tokio_util::codec::LengthDelimitedCodec;
844 /// # fn bind_read<T: AsyncRead>(io: T) {
845 /// LengthDelimitedCodec::builder()
846 /// .length_field_type::<u128>()
847 /// .new_read(io);
848 /// # }
849 /// # pub fn main() {}
850 /// ```
851 pub fn length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self {
852 self.length_field_length(mem::size_of::<T>())
853 }
854
855 /// Sets the number of bytes used to represent the length field
856 ///
857 /// The default value is `4`. The max value is `8`.
858 ///
859 /// This configuration option applies to both encoding and decoding.
860 ///
861 /// # Examples
862 ///
863 /// ```
864 /// # use tokio::io::AsyncRead;
865 /// use tokio_util::codec::LengthDelimitedCodec;
866 ///
867 /// # fn bind_read<T: AsyncRead>(io: T) {
868 /// LengthDelimitedCodec::builder()
869 /// .length_field_length(4)
870 /// .new_read(io);
871 /// # }
872 /// # pub fn main() {}
873 /// ```
874 pub fn length_field_length(&mut self, val: usize) -> &mut Self {
875 assert!(val > 0 && val <= 8, "invalid length field length");
876 self.length_field_len = val;
877 self
878 }
879
880 /// Sets the number of bytes in the header before the length field
881 ///
882 /// This configuration option only applies to decoding.
883 ///
884 /// # Examples
885 ///
886 /// ```
887 /// # use tokio::io::AsyncRead;
888 /// use tokio_util::codec::LengthDelimitedCodec;
889 ///
890 /// # fn bind_read<T: AsyncRead>(io: T) {
891 /// LengthDelimitedCodec::builder()
892 /// .length_field_offset(1)
893 /// .new_read(io);
894 /// # }
895 /// # pub fn main() {}
896 /// ```
897 pub fn length_field_offset(&mut self, val: usize) -> &mut Self {
898 self.length_field_offset = val;
899 self
900 }
901
902 /// Delta between the payload length specified in the header and the real
903 /// payload length
904 ///
905 /// # Examples
906 ///
907 /// ```
908 /// # use tokio::io::AsyncRead;
909 /// use tokio_util::codec::LengthDelimitedCodec;
910 ///
911 /// # fn bind_read<T: AsyncRead>(io: T) {
912 /// LengthDelimitedCodec::builder()
913 /// .length_adjustment(-2)
914 /// .new_read(io);
915 /// # }
916 /// # pub fn main() {}
917 /// ```
918 pub fn length_adjustment(&mut self, val: isize) -> &mut Self {
919 self.length_adjustment = val;
920 self
921 }
922
923 /// Sets the number of bytes to skip before reading the payload
924 ///
925 /// Default value is `length_field_len + length_field_offset`
926 ///
927 /// This configuration option only applies to decoding
928 ///
929 /// # Examples
930 ///
931 /// ```
932 /// # use tokio::io::AsyncRead;
933 /// use tokio_util::codec::LengthDelimitedCodec;
934 ///
935 /// # fn bind_read<T: AsyncRead>(io: T) {
936 /// LengthDelimitedCodec::builder()
937 /// .num_skip(4)
938 /// .new_read(io);
939 /// # }
940 /// # pub fn main() {}
941 /// ```
942 pub fn num_skip(&mut self, val: usize) -> &mut Self {
943 self.num_skip = Some(val);
944 self
945 }
946
947 /// Create a configured length delimited `LengthDelimitedCodec`
948 ///
949 /// # Examples
950 ///
951 /// ```
952 /// use tokio_util::codec::LengthDelimitedCodec;
953 /// # pub fn main() {
954 /// LengthDelimitedCodec::builder()
955 /// .length_field_offset(0)
956 /// .length_field_type::<u16>()
957 /// .length_adjustment(0)
958 /// .num_skip(0)
959 /// .new_codec();
960 /// # }
961 /// ```
962 pub fn new_codec(&self) -> LengthDelimitedCodec {
963 let mut builder = *self;
964
965 builder.adjust_max_frame_len();
966
967 LengthDelimitedCodec {
968 builder,
969 state: DecodeState::Head,
970 }
971 }
972
973 /// Create a configured length delimited `FramedRead`
974 ///
975 /// # Examples
976 ///
977 /// ```
978 /// # use tokio::io::AsyncRead;
979 /// use tokio_util::codec::LengthDelimitedCodec;
980 ///
981 /// # fn bind_read<T: AsyncRead>(io: T) {
982 /// LengthDelimitedCodec::builder()
983 /// .length_field_offset(0)
984 /// .length_field_type::<u16>()
985 /// .length_adjustment(0)
986 /// .num_skip(0)
987 /// .new_read(io);
988 /// # }
989 /// # pub fn main() {}
990 /// ```
991 pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec>
992 where
993 T: AsyncRead,
994 {
995 FramedRead::new(upstream, self.new_codec())
996 }
997
998 /// Create a configured length delimited `FramedWrite`
999 ///
1000 /// # Examples
1001 ///
1002 /// ```
1003 /// # use tokio::io::AsyncWrite;
1004 /// # use tokio_util::codec::LengthDelimitedCodec;
1005 /// # fn write_frame<T: AsyncWrite>(io: T) {
1006 /// LengthDelimitedCodec::builder()
1007 /// .length_field_type::<u16>()
1008 /// .new_write(io);
1009 /// # }
1010 /// # pub fn main() {}
1011 /// ```
1012 pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec>
1013 where
1014 T: AsyncWrite,
1015 {
1016 FramedWrite::new(inner, self.new_codec())
1017 }
1018
1019 /// Create a configured length delimited `Framed`
1020 ///
1021 /// # Examples
1022 ///
1023 /// ```
1024 /// # use tokio::io::{AsyncRead, AsyncWrite};
1025 /// # use tokio_util::codec::LengthDelimitedCodec;
1026 /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
1027 /// # let _ =
1028 /// LengthDelimitedCodec::builder()
1029 /// .length_field_type::<u16>()
1030 /// .new_framed(io);
1031 /// # }
1032 /// # pub fn main() {}
1033 /// ```
1034 pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec>
1035 where
1036 T: AsyncRead + AsyncWrite,
1037 {
1038 Framed::new(inner, self.new_codec())
1039 }
1040
1041 fn num_head_bytes(&self) -> usize {
1042 let num = self.length_field_offset + self.length_field_len;
1043 cmp::max(num, self.num_skip.unwrap_or(0))
1044 }
1045
1046 fn get_num_skip(&self) -> usize {
1047 self.num_skip
1048 .unwrap_or(self.length_field_offset + self.length_field_len)
1049 }
1050
1051 fn adjust_max_frame_len(&mut self) {
1052 // Calculate the maximum number that can be represented using `length_field_len` bytes.
1053 let max_number = match 1u64.checked_shl((8 * self.length_field_len) as u32) {
1054 Some(shl) => shl - 1,
1055 None => u64::MAX,
1056 };
1057
1058 let max_allowed_len = max_number.saturating_add_signed(self.length_adjustment as i64);
1059
1060 if self.max_frame_len as u64 > max_allowed_len {
1061 self.max_frame_len = usize::try_from(max_allowed_len).unwrap_or(usize::MAX);
1062 }
1063 }
1064}
1065
1066impl Default for Builder {
1067 fn default() -> Self {
1068 Self::new()
1069 }
1070}
1071
1072// ===== impl LengthDelimitedCodecError =====
1073
1074impl fmt::Debug for LengthDelimitedCodecError {
1075 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1076 f.debug_struct("LengthDelimitedCodecError").finish()
1077 }
1078}
1079
1080impl fmt::Display for LengthDelimitedCodecError {
1081 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1082 f.write_str("frame size too big")
1083 }
1084}
1085
// No `Error` methods are overridden: the trait's default implementations are
// used as they are.
impl StdError for LengthDelimitedCodecError {}