libp2p_request_response/
json.rs

1// Copyright 2023 Protocol Labs
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21/// A request-response behaviour using [`serde_json`] for serializing and deserializing the
22/// messages.
23///
24/// # Example
25///
26/// ```
27/// # use libp2p_request_response::{json, ProtocolSupport, self as request_response};
28/// # use libp2p_swarm::{StreamProtocol};
29/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
30/// struct GreetRequest {
31///     name: String,
32/// }
33///
34/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
35/// struct GreetResponse {
36///     message: String,
37/// }
38///
39/// let behaviour = json::Behaviour::<GreetRequest, GreetResponse>::new(
40///     [(
41///         StreamProtocol::new("/my-json-protocol"),
42///         ProtocolSupport::Full,
43///     )],
44///     request_response::Config::default(),
45/// );
46/// ```
47pub type Behaviour<Req, Resp> = crate::Behaviour<codec::Codec<Req, Resp>>;
48
49mod codec {
50    use std::{io, marker::PhantomData};
51
52    use async_trait::async_trait;
53    use futures::prelude::*;
54    use libp2p_swarm::StreamProtocol;
55    use serde::{de::DeserializeOwned, Serialize};
56
57    pub struct Codec<Req, Resp> {
58        /// Max request size in bytes
59        request_size_maximum: u64,
60        /// Max response size in bytes
61        response_size_maximum: u64,
62        phantom: PhantomData<(Req, Resp)>,
63    }
64
65    impl<Req, Resp> Default for Codec<Req, Resp> {
66        fn default() -> Self {
67            Codec {
68                request_size_maximum: 1024 * 1024,
69                response_size_maximum: 10 * 1024 * 1024,
70                phantom: PhantomData,
71            }
72        }
73    }
74
75    impl<Req, Resp> Clone for Codec<Req, Resp> {
76        fn clone(&self) -> Self {
77            Self {
78                request_size_maximum: self.request_size_maximum,
79                response_size_maximum: self.response_size_maximum,
80                phantom: self.phantom,
81            }
82        }
83    }
84
85    impl<Req, Resp> Codec<Req, Resp> {
86        /// Sets the limit for request size in bytes.
87        pub fn set_request_size_maximum(mut self, request_size_maximum: u64) -> Self {
88            self.request_size_maximum = request_size_maximum;
89            self
90        }
91
92        /// Sets the limit for response size in bytes.
93        pub fn set_response_size_maximum(mut self, response_size_maximum: u64) -> Self {
94            self.response_size_maximum = response_size_maximum;
95            self
96        }
97    }
98
99    #[async_trait]
100    impl<Req, Resp> crate::Codec for Codec<Req, Resp>
101    where
102        Req: Send + Serialize + DeserializeOwned,
103        Resp: Send + Serialize + DeserializeOwned,
104    {
105        type Protocol = StreamProtocol;
106        type Request = Req;
107        type Response = Resp;
108
109        async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Req>
110        where
111            T: AsyncRead + Unpin + Send,
112        {
113            let mut vec = Vec::new();
114
115            io.take(self.request_size_maximum)
116                .read_to_end(&mut vec)
117                .await?;
118
119            Ok(serde_json::from_slice(vec.as_slice())?)
120        }
121
122        async fn read_response<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Resp>
123        where
124            T: AsyncRead + Unpin + Send,
125        {
126            let mut vec = Vec::new();
127
128            io.take(self.response_size_maximum)
129                .read_to_end(&mut vec)
130                .await?;
131
132            Ok(serde_json::from_slice(vec.as_slice())?)
133        }
134
135        async fn write_request<T>(
136            &mut self,
137            _: &Self::Protocol,
138            io: &mut T,
139            req: Self::Request,
140        ) -> io::Result<()>
141        where
142            T: AsyncWrite + Unpin + Send,
143        {
144            let data = serde_json::to_vec(&req)?;
145
146            io.write_all(data.as_ref()).await?;
147
148            Ok(())
149        }
150
151        async fn write_response<T>(
152            &mut self,
153            _: &Self::Protocol,
154            io: &mut T,
155            resp: Self::Response,
156        ) -> io::Result<()>
157        where
158            T: AsyncWrite + Unpin + Send,
159        {
160            let data = serde_json::to_vec(&resp)?;
161
162            io.write_all(data.as_ref()).await?;
163
164            Ok(())
165        }
166    }
167}
168
#[cfg(test)]
mod tests {
    use futures::AsyncWriteExt;
    use futures_ringbuf::Endpoint;
    use libp2p_swarm::StreamProtocol;
    use serde::{Deserialize, Serialize};

    use crate::Codec;

    /// Request message used for the round trip.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestRequest {
        payload: String,
    }

    /// Response message used for the round trip.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestResponse {
        payload: String,
    }

    /// Writes a request and then a response through an in-memory endpoint pair and checks
    /// that reading each one back yields a value equal to the one written.
    #[async_std::test]
    async fn test_codec() {
        let protocol = StreamProtocol::new("/test_json/1");
        let mut codec = super::codec::Codec::<TestRequest, TestResponse>::default();

        // Request round trip.
        let sent_request = TestRequest {
            payload: String::from("test_payload"),
        };
        let (mut writer, mut reader) = Endpoint::pair(124, 124);
        codec
            .write_request(&protocol, &mut writer, sent_request.clone())
            .await
            .expect("Should write request");
        // The writing end is closed before the other end is read to completion.
        writer.close().await.unwrap();

        let received_request = codec
            .read_request(&protocol, &mut reader)
            .await
            .expect("Should read request");
        reader.close().await.unwrap();

        assert_eq!(received_request, sent_request);

        // Response round trip over a fresh endpoint pair.
        let sent_response = TestResponse {
            payload: String::from("test_payload"),
        };
        let (mut writer, mut reader) = Endpoint::pair(124, 124);
        codec
            .write_response(&protocol, &mut writer, sent_response.clone())
            .await
            .expect("Should write response");
        writer.close().await.unwrap();

        let received_response = codec
            .read_response(&protocol, &mut reader)
            .await
            .expect("Should read response");
        reader.close().await.unwrap();

        assert_eq!(received_response, sent_response);
    }
}