libp2p_request_response/
json.rs1pub type Behaviour<Req, Resp> = crate::Behaviour<codec::Codec<Req, Resp>>;
48
49mod codec {
50 use std::{io, marker::PhantomData};
51
52 use async_trait::async_trait;
53 use futures::prelude::*;
54 use libp2p_swarm::StreamProtocol;
55 use serde::{de::DeserializeOwned, Serialize};
56
57 pub struct Codec<Req, Resp> {
58 request_size_maximum: u64,
60 response_size_maximum: u64,
62 phantom: PhantomData<(Req, Resp)>,
63 }
64
65 impl<Req, Resp> Default for Codec<Req, Resp> {
66 fn default() -> Self {
67 Codec {
68 request_size_maximum: 1024 * 1024,
69 response_size_maximum: 10 * 1024 * 1024,
70 phantom: PhantomData,
71 }
72 }
73 }
74
75 impl<Req, Resp> Clone for Codec<Req, Resp> {
76 fn clone(&self) -> Self {
77 Self {
78 request_size_maximum: self.request_size_maximum,
79 response_size_maximum: self.response_size_maximum,
80 phantom: self.phantom,
81 }
82 }
83 }
84
85 impl<Req, Resp> Codec<Req, Resp> {
86 pub fn set_request_size_maximum(mut self, request_size_maximum: u64) -> Self {
88 self.request_size_maximum = request_size_maximum;
89 self
90 }
91
92 pub fn set_response_size_maximum(mut self, response_size_maximum: u64) -> Self {
94 self.response_size_maximum = response_size_maximum;
95 self
96 }
97 }
98
99 #[async_trait]
100 impl<Req, Resp> crate::Codec for Codec<Req, Resp>
101 where
102 Req: Send + Serialize + DeserializeOwned,
103 Resp: Send + Serialize + DeserializeOwned,
104 {
105 type Protocol = StreamProtocol;
106 type Request = Req;
107 type Response = Resp;
108
109 async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Req>
110 where
111 T: AsyncRead + Unpin + Send,
112 {
113 let mut vec = Vec::new();
114
115 io.take(self.request_size_maximum)
116 .read_to_end(&mut vec)
117 .await?;
118
119 Ok(serde_json::from_slice(vec.as_slice())?)
120 }
121
122 async fn read_response<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Resp>
123 where
124 T: AsyncRead + Unpin + Send,
125 {
126 let mut vec = Vec::new();
127
128 io.take(self.response_size_maximum)
129 .read_to_end(&mut vec)
130 .await?;
131
132 Ok(serde_json::from_slice(vec.as_slice())?)
133 }
134
135 async fn write_request<T>(
136 &mut self,
137 _: &Self::Protocol,
138 io: &mut T,
139 req: Self::Request,
140 ) -> io::Result<()>
141 where
142 T: AsyncWrite + Unpin + Send,
143 {
144 let data = serde_json::to_vec(&req)?;
145
146 io.write_all(data.as_ref()).await?;
147
148 Ok(())
149 }
150
151 async fn write_response<T>(
152 &mut self,
153 _: &Self::Protocol,
154 io: &mut T,
155 resp: Self::Response,
156 ) -> io::Result<()>
157 where
158 T: AsyncWrite + Unpin + Send,
159 {
160 let data = serde_json::to_vec(&resp)?;
161
162 io.write_all(data.as_ref()).await?;
163
164 Ok(())
165 }
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use futures::AsyncWriteExt;
172 use futures_ringbuf::Endpoint;
173 use libp2p_swarm::StreamProtocol;
174 use serde::{Deserialize, Serialize};
175
176 use crate::Codec;
177
178 #[async_std::test]
179 async fn test_codec() {
180 let expected_request = TestRequest {
181 payload: "test_payload".to_string(),
182 };
183 let expected_response = TestResponse {
184 payload: "test_payload".to_string(),
185 };
186 let protocol = StreamProtocol::new("/test_json/1");
187 let mut codec: super::codec::Codec<TestRequest, TestResponse> =
188 super::codec::Codec::default();
189
190 let (mut a, mut b) = Endpoint::pair(124, 124);
191 codec
192 .write_request(&protocol, &mut a, expected_request.clone())
193 .await
194 .expect("Should write request");
195 a.close().await.unwrap();
196
197 let actual_request = codec
198 .read_request(&protocol, &mut b)
199 .await
200 .expect("Should read request");
201 b.close().await.unwrap();
202
203 assert_eq!(actual_request, expected_request);
204
205 let (mut a, mut b) = Endpoint::pair(124, 124);
206 codec
207 .write_response(&protocol, &mut a, expected_response.clone())
208 .await
209 .expect("Should write response");
210 a.close().await.unwrap();
211
212 let actual_response = codec
213 .read_response(&protocol, &mut b)
214 .await
215 .expect("Should read response");
216 b.close().await.unwrap();
217
218 assert_eq!(actual_response, expected_response);
219 }
220
221 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
222 struct TestRequest {
223 payload: String,
224 }
225
226 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
227 struct TestResponse {
228 payload: String,
229 }
230}