jsonrpc_server_utils/
stream_codec.rs

1use bytes::BytesMut;
2use std::{io, str};
3
/// Separator for enveloping messages in streaming codecs.
///
/// A codec holds one separator for incoming data and one for outgoing data,
/// so the two directions may be framed differently.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Separator {
	/// No envelope is expected between messages. The decoder tries to figure out
	/// message boundaries itself by accumulating incoming bytes until a complete
	/// JSON value is formed. The encoder sends messages without any boundary
	/// between them.
	Empty,
	/// A single byte is used as a sentinel between messages.
	Byte(u8),
}
14
15impl Default for Separator {
16	fn default() -> Self {
17		Separator::Byte(b'\n')
18	}
19}
20
/// Stream codec for streaming protocols (ipc, tcp)
///
/// Keeps one [`Separator`] per direction so that incoming and outgoing
/// messages can be framed independently. The derived `Default` uses
/// `Separator::default()` for both directions.
#[derive(Debug, Default)]
pub struct StreamCodec {
	/// How message boundaries are recognised in the incoming byte stream.
	incoming_separator: Separator,
	/// What (if anything) is appended after every outgoing message.
	outgoing_separator: Separator,
}
27
28impl StreamCodec {
29	/// Default codec with streaming input data. Input can be both enveloped and not.
30	pub fn stream_incoming() -> Self {
31		StreamCodec::new(Separator::Empty, Default::default())
32	}
33
34	/// New custom stream codec
35	pub fn new(incoming_separator: Separator, outgoing_separator: Separator) -> Self {
36		StreamCodec {
37			incoming_separator,
38			outgoing_separator,
39		}
40	}
41}
42
/// Returns `true` for the four whitespace bytes recognised between JSON
/// tokens: carriage return, line feed, space and horizontal tab.
fn is_whitespace(byte: u8) -> bool {
	byte == b'\r' || byte == b'\n' || byte == b' ' || byte == b'\t'
}
46
47impl tokio_util::codec::Decoder for StreamCodec {
48	type Item = String;
49	type Error = io::Error;
50
51	fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
52		if let Separator::Byte(separator) = self.incoming_separator {
53			if let Some(i) = buf.as_ref().iter().position(|&b| b == separator) {
54				let line = buf.split_to(i);
55				let _ = buf.split_to(1);
56
57				match str::from_utf8(&line.as_ref()) {
58					Ok(s) => Ok(Some(s.to_string())),
59					Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid UTF-8")),
60				}
61			} else {
62				Ok(None)
63			}
64		} else {
65			let mut depth = 0;
66			let mut in_str = false;
67			let mut is_escaped = false;
68			let mut start_idx = 0;
69			let mut whitespaces = 0;
70
71			for idx in 0..buf.as_ref().len() {
72				let byte = buf.as_ref()[idx];
73
74				if (byte == b'{' || byte == b'[') && !in_str {
75					if depth == 0 {
76						start_idx = idx;
77					}
78					depth += 1;
79				} else if (byte == b'}' || byte == b']') && !in_str {
80					depth -= 1;
81				} else if byte == b'"' && !is_escaped {
82					in_str = !in_str;
83				} else if is_whitespace(byte) {
84					whitespaces += 1;
85				}
86				if byte == b'\\' && !is_escaped && in_str {
87					is_escaped = true;
88				} else {
89					is_escaped = false;
90				}
91
92				if depth == 0 && idx != start_idx && idx - start_idx + 1 > whitespaces {
93					let bts = buf.split_to(idx + 1);
94					match String::from_utf8(bts.as_ref().to_vec()) {
95						Ok(val) => return Ok(Some(val)),
96						Err(_) => {
97							return Ok(None);
98						} // skip non-utf requests (TODO: log error?)
99					};
100				}
101			}
102			Ok(None)
103		}
104	}
105}
106
107impl tokio_util::codec::Encoder<String> for StreamCodec {
108	type Error = io::Error;
109
110	fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
111		let mut payload = msg.into_bytes();
112		if let Separator::Byte(separator) = self.outgoing_separator {
113			payload.push(separator);
114		}
115		buf.extend_from_slice(&payload);
116		Ok(())
117	}
118}
119
120#[cfg(test)]
121mod tests {
122
123	use super::StreamCodec;
124	use bytes::{BufMut, BytesMut};
125	use tokio_util::codec::Decoder;
126
127	#[test]
128	fn simple_encode() {
129		let mut buf = BytesMut::with_capacity(2048);
130		buf.put_slice(b"{ test: 1 }{ test: 2 }{ test: 3 }");
131
132		let mut codec = StreamCodec::stream_incoming();
133
134		let request = codec
135			.decode(&mut buf)
136			.expect("There should be no error in simple test")
137			.expect("There should be at least one request in simple test");
138
139		assert_eq!(request, "{ test: 1 }");
140	}
141
142	#[test]
143	fn escape() {
144		let mut buf = BytesMut::with_capacity(2048);
145		buf.put_slice(br#"{ test: "\"\\" }{ test: "\ " }{ test: "\}" }[ test: "\]" ]"#);
146
147		let mut codec = StreamCodec::stream_incoming();
148
149		let request = codec
150			.decode(&mut buf)
151			.expect("There should be no error in first escape test")
152			.expect("There should be a request in first escape test");
153
154		assert_eq!(request, r#"{ test: "\"\\" }"#);
155
156		let request2 = codec
157			.decode(&mut buf)
158			.expect("There should be no error in 2nd escape test")
159			.expect("There should be a request in 2nd escape test");
160		assert_eq!(request2, r#"{ test: "\ " }"#);
161
162		let request3 = codec
163			.decode(&mut buf)
164			.expect("There should be no error in 3rd escape test")
165			.expect("There should be a request in 3rd escape test");
166		assert_eq!(request3, r#"{ test: "\}" }"#);
167
168		let request4 = codec
169			.decode(&mut buf)
170			.expect("There should be no error in 4th escape test")
171			.expect("There should be a request in 4th escape test");
172		assert_eq!(request4, r#"[ test: "\]" ]"#);
173	}
174
175	#[test]
176	fn whitespace() {
177		let mut buf = BytesMut::with_capacity(2048);
178		buf.put_slice(b"{ test: 1 }\n\n\n\n{ test: 2 }\n\r{\n test: 3 }  ");
179
180		let mut codec = StreamCodec::stream_incoming();
181
182		let request = codec
183			.decode(&mut buf)
184			.expect("There should be no error in first whitespace test")
185			.expect("There should be a request in first whitespace test");
186
187		assert_eq!(request, "{ test: 1 }");
188
189		let request2 = codec
190			.decode(&mut buf)
191			.expect("There should be no error in first 2nd test")
192			.expect("There should be aa request in 2nd whitespace test");
193		// TODO: maybe actually trim it out
194		assert_eq!(request2, "\n\n\n\n{ test: 2 }");
195
196		let request3 = codec
197			.decode(&mut buf)
198			.expect("There should be no error in first 3rd test")
199			.expect("There should be a request in 3rd whitespace test");
200		assert_eq!(request3, "\n\r{\n test: 3 }");
201
202		let request4 = codec
203			.decode(&mut buf)
204			.expect("There should be no error in first 4th test");
205		assert!(
206			request4.is_none(),
207			"There should be no 4th request because it contains only whitespaces"
208		);
209	}
210
211	#[test]
212	fn fragmented_encode() {
213		let mut buf = BytesMut::with_capacity(2048);
214		buf.put_slice(b"{ test: 1 }{ test: 2 }{ tes");
215
216		let mut codec = StreamCodec::stream_incoming();
217
218		let request = codec
219			.decode(&mut buf)
220			.expect("There should be no error in first fragmented test")
221			.expect("There should be at least one request in first fragmented test");
222		assert_eq!(request, "{ test: 1 }");
223		codec
224			.decode(&mut buf)
225			.expect("There should be no error in second fragmented test")
226			.expect("There should be at least one request in second fragmented test");
227		assert_eq!(String::from_utf8(buf.as_ref().to_vec()).unwrap(), "{ tes");
228
229		buf.put_slice(b"t: 3 }");
230		let request = codec
231			.decode(&mut buf)
232			.expect("There should be no error in third fragmented test")
233			.expect("There should be at least one request in third fragmented test");
234		assert_eq!(request, "{ test: 3 }");
235	}
236
237	#[test]
238	fn huge() {
239		let request = r#"
240		{
241			"jsonrpc":"2.0",
242			"method":"say_hello",
243			"params": [
244				42,
245				0,
246				{
247					"from":"0xb60e8dd61c5d32be8058bb8eb970870f07233155",
248					"gas":"0x2dc6c0",
249					"data":"0x606060405260003411156010576002565b6001805433600160a060020a0319918216811790925560028054909116909117905561291f806100406000396000f3606060405236156100e55760e060020a600035046304029f2381146100ed5780630a1273621461015f57806317c1dd87146102335780631f9ea25d14610271578063266fa0e91461029357806349593f5314610429578063569aa0d8146104fc57806359a4669f14610673578063647a4d5f14610759578063656104f5146108095780636e9febfe1461082b57806370de8c6e1461090d57806371bde852146109ed5780638f30435d14610ab4578063916dbc1714610da35780639f5a7cd414610eef578063c91540f614610fe6578063eae99e1c146110b5578063fedc2a281461115a575b61122d610002565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435600154600090600160a060020a03908116339091161461233357610002565b61122f6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505093359350506044359150506064355b60006000600060005086604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905042816005016000508560ff1660028110156100025760040201835060010154604060020a90046001604060020a0316116115df576115d6565b6112416004355b604080516001604060020a038316408152606060020a33600160a060020a031602602082015290519081900360340190205b919050565b61122d600435600254600160a060020a0390811633909116146128e357610002565b61125e6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050505060006000600060006000600060005087604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905080600001600050600087600160a060020a0316815260200190815260200160002060005060000160059054906101000a90046001604060020a03169450845080600001600050600087600160a060020a03168152602001908152602001600020600050600001600d9054906101000a900460
01604060020a03169350835080600001600050600087600160a060020a0316815260200190815260200160002060005060000160009054906101000a900460ff169250825080600001600050600087600160a060020a0316815260200190815260200160002060005060000160019054906101000a900463ffffffff16915081505092959194509250565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435608435600060006000600060005088604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509250346000141515611c0e5760405133600160a060020a0316908290349082818181858883f193505050501515611c1a57610002565b6112996004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050600060006000600060006000600060006000508a604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050806001016000508960ff16600281101561000257600160a060020a038a168452828101600101602052604084205463ffffffff1698506002811015610002576040842054606060020a90046001604060020a031697506002811015610002576040842054640100000000900463ffffffff169650600281101561000257604084206001015495506002811015610002576040842054604060020a900463ffffffff169450600281101561000257505060409091205495999498509296509094509260a060020a90046001604060020a0316919050565b61122d6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505050505050506000600060005082604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050348160050160005082600d0160009054906101000a900460ff1660ff16600281101561000257600402830160070180546001608060020a0381169093016001608060020a03199390931692909217909155505b5050565b6112e26004808035906020019082018035906020019191908080601f0160208091000342342309473498710349
8712093847102938740192387401349857109487501938475"
250				}
251			]
252		}"#;
253
254		let mut buf = BytesMut::with_capacity(65536);
255		buf.put_slice(request.as_bytes());
256
257		let mut codec = StreamCodec::stream_incoming();
258
259		let parsed_request = codec
260			.decode(&mut buf)
261			.expect("There should be no error in huge test")
262			.expect("There should be at least one request huge test");
263		assert_eq!(request, parsed_request);
264	}
265
266	#[test]
267	fn simple_line_codec() {
268		let mut buf = BytesMut::with_capacity(2048);
269		buf.put_slice(b"{ test: 1 }\n{ test: 2 }\n{ test: 3 }");
270
271		let mut codec = StreamCodec::default();
272
273		let request = codec
274			.decode(&mut buf)
275			.expect("There should be no error in simple test")
276			.expect("There should be at least one request in simple test");
277		let request2 = codec
278			.decode(&mut buf)
279			.expect("There should be no error in simple test")
280			.expect("There should be at least one request in simple test");
281
282		assert_eq!(request, "{ test: 1 }");
283		assert_eq!(request2, "{ test: 2 }");
284	}
285}