1use bytes::{Buf, BufMut, Bytes, BytesMut};
23use crate::{encode, decode::{self, Error}};
24use std::{io, marker::PhantomData, usize};
25
/// Zero-sized codec handle parameterised by the value type `T`.
///
/// The struct stores nothing but a [`PhantomData<T>`]; `T` only selects which
/// set of codec implementations applies.
pub struct Uvi<T>(PhantomData<T>);

// Implemented by hand instead of `#[derive(Default)]`: the derive would add a
// `T: Default` bound, although constructing a `PhantomData<T>` never needs one.
impl<T> Default for Uvi<T> {
    fn default() -> Self {
        Uvi(PhantomData)
    }
}
29
30macro_rules! encoder_decoder_impls {
31 ($typ:ident, $arr:ident) => {
32 impl Uvi<$typ> {
33 fn serialise(&mut self, item: $typ, dst: &mut BytesMut) {
34 let mut buf = encode::$arr();
35 dst.extend_from_slice(encode::$typ(item, &mut buf))
36 }
37
38 fn deserialise(&mut self, src: &mut BytesMut) -> Result<Option<$typ>, io::Error> {
39 let (number, consumed) =
40 match decode::$typ(src.as_ref()) {
41 Ok((n, rem)) => (n, src.len() - rem.len()),
42 Err(Error::Insufficient) => return Ok(None),
43 Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e))
44 };
45 src.advance(consumed);
46 Ok(Some(number))
47 }
48 }
49
50 #[cfg(feature = "codec")]
51 impl tokio_util::codec::Encoder<$typ> for Uvi<$typ> {
52 type Error = io::Error;
53
54 fn encode(&mut self, item: $typ, dst: &mut BytesMut) -> Result<(), Self::Error> {
55 self.serialise(item, dst);
56 Ok(())
57 }
58 }
59
60 #[cfg(feature = "codec")]
61 impl tokio_util::codec::Decoder for Uvi<$typ> {
62 type Item = $typ;
63 type Error = io::Error;
64
65 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
66 self.deserialise(src)
67 }
68 }
69
70 #[cfg(feature = "asynchronous_codec")]
71 impl asynchronous_codec::Encoder for Uvi<$typ> {
72 type Item<'a> = $typ;
73 type Error = io::Error;
74
75 fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
76 self.serialise(item, dst);
77 Ok(())
78 }
79 }
80
81 #[cfg(feature = "asynchronous_codec")]
82 impl asynchronous_codec::Decoder for Uvi<$typ> {
83 type Item = $typ;
84 type Error = io::Error;
85
86 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
87 self.deserialise(src)
88 }
89 }
90 }
91}
92
93encoder_decoder_impls!(u8, u8_buffer);
94encoder_decoder_impls!(u16, u16_buffer);
95encoder_decoder_impls!(u32, u32_buffer);
96encoder_decoder_impls!(u64, u64_buffer);
97encoder_decoder_impls!(u128, u128_buffer);
98encoder_decoder_impls!(usize, usize_buffer);
99
100pub struct UviBytes<T = Bytes> {
102 varint_codec: Uvi<usize>,
104 len: Option<usize>,
106 max: usize,
108 _ty: PhantomData<T>
109}
110
111impl<T> Default for UviBytes<T> {
112 fn default() -> Self {
113 Self {
114 varint_codec: Default::default(),
115 len: None,
116 max: 128 * 1024 * 1024,
117 _ty: PhantomData
118 }
119 }
120}
121
122impl<T> UviBytes<T> {
123 pub fn set_max_len(&mut self, val: usize) {
125 self.max = val
126 }
127
128 pub fn max_len(&self) -> usize {
130 self.max
131 }
132
133 fn deserialise(&mut self, src: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
134 if self.len.is_none() {
135 self.len = self.varint_codec.deserialise(src)?
136 }
137 if let Some(n) = self.len.take() {
138 if n > self.max {
139 return Err(io::Error::new(io::ErrorKind::PermissionDenied, "len > max"))
140 }
141 if n <= src.len() {
142 return Ok(Some(src.split_to(n)))
143 }
144 let add = n - src.len();
145 src.reserve(add);
146 self.len = Some(n)
147 }
148 Ok(None)
149 }
150}
151
152impl<T: Buf> UviBytes<T> {
153 fn serialise(&mut self, item: T, dst: &mut BytesMut) -> Result<(), io::Error> {
154 if item.remaining() > self.max {
155 return Err(io::Error::new(io::ErrorKind::PermissionDenied, "len > max when encoding"));
156 }
157 self.varint_codec.serialise(item.remaining(), dst);
158 dst.reserve(item.remaining());
159 dst.put(item);
160 Ok(())
161 }
162}
163
164
/// `tokio_util` encoder: forwards to the inherent `serialise` helper.
#[cfg(feature = "codec")]
impl<T: Buf> tokio_util::codec::Encoder<T> for UviBytes<T> {
    type Error = io::Error;

    fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> {
        Self::serialise(self, item, dst)
    }
}
173
/// `tokio_util` decoder: forwards to the inherent `deserialise` helper and
/// yields each frame as a `BytesMut`.
#[cfg(feature = "codec")]
impl<T> tokio_util::codec::Decoder for UviBytes<T> {
    type Item = BytesMut;
    type Error = io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        Self::deserialise(self, src)
    }
}
183
/// `asynchronous_codec` encoder: forwards to the inherent `serialise` helper.
#[cfg(feature = "asynchronous_codec")]
impl<T: Buf> asynchronous_codec::Encoder for UviBytes<T> {
    type Item<'a> = T;
    type Error = io::Error;

    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
        Self::serialise(self, item, dst)
    }
}
193
/// `asynchronous_codec` decoder: forwards to the inherent `deserialise` helper
/// and yields each frame as a `BytesMut`.
#[cfg(feature = "asynchronous_codec")]
impl<T> asynchronous_codec::Decoder for UviBytes<T> {
    type Item = BytesMut;
    type Error = io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        Self::deserialise(self, src)
    }
}
203