libp2p_floodsub/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{io, iter, pin::Pin};
22
23use asynchronous_codec::Framed;
24use bytes::Bytes;
25use futures::{
26    io::{AsyncRead, AsyncWrite},
27    Future, SinkExt, StreamExt,
28};
29use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
30use libp2p_identity::PeerId;
31use libp2p_swarm::StreamProtocol;
32
33use crate::{proto, topic::Topic};
34
/// Size limit, in bytes, handed to the protobuf codec that frames floodsub RPCs
/// in both the inbound and the outbound upgrade.
const MAX_MESSAGE_LEN_BYTES: usize = 2 * 1024;
36
37const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/floodsub/1.0.0");
38
/// Upgrade type for the floodsub protocol.
///
/// The struct has no fields, so every instance is interchangeable; it can be
/// obtained through [`FloodsubProtocol::new`] or `Default`.
#[derive(Clone, Debug, Default)]
pub struct FloodsubProtocol {}

impl FloodsubProtocol {
    /// Creates a `FloodsubProtocol`.
    pub fn new() -> Self {
        Self {}
    }
}
49
50impl UpgradeInfo for FloodsubProtocol {
51    type Info = StreamProtocol;
52    type InfoIter = iter::Once<Self::Info>;
53
54    fn protocol_info(&self) -> Self::InfoIter {
55        iter::once(PROTOCOL_NAME)
56    }
57}
58
59impl<TSocket> InboundUpgrade<TSocket> for FloodsubProtocol
60where
61    TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
62{
63    type Output = FloodsubRpc;
64    type Error = FloodsubError;
65    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
66
67    fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
68        Box::pin(async move {
69            let mut framed = Framed::new(
70                socket,
71                quick_protobuf_codec::Codec::<proto::RPC>::new(MAX_MESSAGE_LEN_BYTES),
72            );
73
74            let rpc = framed
75                .next()
76                .await
77                .ok_or_else(|| FloodsubError::ReadError(io::ErrorKind::UnexpectedEof.into()))?
78                .map_err(CodecError)?;
79
80            let mut messages = Vec::with_capacity(rpc.publish.len());
81            for publish in rpc.publish.into_iter() {
82                messages.push(FloodsubMessage {
83                    source: PeerId::from_bytes(&publish.from.unwrap_or_default())
84                        .map_err(|_| FloodsubError::InvalidPeerId)?,
85                    data: publish.data.unwrap_or_default().into(),
86                    sequence_number: publish.seqno.unwrap_or_default(),
87                    topics: publish.topic_ids.into_iter().map(Topic::new).collect(),
88                });
89            }
90
91            Ok(FloodsubRpc {
92                messages,
93                subscriptions: rpc
94                    .subscriptions
95                    .into_iter()
96                    .map(|sub| FloodsubSubscription {
97                        action: if Some(true) == sub.subscribe {
98                            FloodsubSubscriptionAction::Subscribe
99                        } else {
100                            FloodsubSubscriptionAction::Unsubscribe
101                        },
102                        topic: Topic::new(sub.topic_id.unwrap_or_default()),
103                    })
104                    .collect(),
105            })
106        })
107    }
108}
109
110/// Reach attempt interrupt errors.
111#[derive(thiserror::Error, Debug)]
112pub enum FloodsubError {
113    /// Error when parsing the `PeerId` in the message.
114    #[error("Failed to decode PeerId from message")]
115    InvalidPeerId,
116    /// Error when decoding the raw buffer into a protobuf.
117    #[error("Failed to decode protobuf")]
118    ProtobufError(#[from] CodecError),
119    /// Error when reading the packet from the socket.
120    #[error("Failed to read from socket")]
121    ReadError(#[from] io::Error),
122}
123
124#[derive(thiserror::Error, Debug)]
125#[error(transparent)]
126pub struct CodecError(#[from] quick_protobuf_codec::Error);
127
128/// An RPC received by the floodsub system.
129#[derive(Debug, Clone, PartialEq, Eq, Hash)]
130pub struct FloodsubRpc {
131    /// List of messages that were part of this RPC query.
132    pub messages: Vec<FloodsubMessage>,
133    /// List of subscriptions.
134    pub subscriptions: Vec<FloodsubSubscription>,
135}
136
137impl UpgradeInfo for FloodsubRpc {
138    type Info = StreamProtocol;
139    type InfoIter = iter::Once<Self::Info>;
140
141    fn protocol_info(&self) -> Self::InfoIter {
142        iter::once(PROTOCOL_NAME)
143    }
144}
145
146impl<TSocket> OutboundUpgrade<TSocket> for FloodsubRpc
147where
148    TSocket: AsyncWrite + AsyncRead + Send + Unpin + 'static,
149{
150    type Output = ();
151    type Error = CodecError;
152    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
153
154    fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
155        Box::pin(async move {
156            let mut framed = Framed::new(
157                socket,
158                quick_protobuf_codec::Codec::<proto::RPC>::new(MAX_MESSAGE_LEN_BYTES),
159            );
160            framed.send(self.into_rpc()).await?;
161            framed.close().await?;
162            Ok(())
163        })
164    }
165}
166
167impl FloodsubRpc {
168    /// Turns this `FloodsubRpc` into a message that can be sent to a substream.
169    fn into_rpc(self) -> proto::RPC {
170        proto::RPC {
171            publish: self
172                .messages
173                .into_iter()
174                .map(|msg| proto::Message {
175                    from: Some(msg.source.to_bytes()),
176                    data: Some(msg.data.to_vec()),
177                    seqno: Some(msg.sequence_number),
178                    topic_ids: msg.topics.into_iter().map(|topic| topic.into()).collect(),
179                })
180                .collect(),
181
182            subscriptions: self
183                .subscriptions
184                .into_iter()
185                .map(|topic| proto::SubOpts {
186                    subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe),
187                    topic_id: Some(topic.topic.into()),
188                })
189                .collect(),
190        }
191    }
192}
193
194/// A message received by the floodsub system.
195#[derive(Debug, Clone, PartialEq, Eq, Hash)]
196pub struct FloodsubMessage {
197    /// Id of the peer that published this message.
198    pub source: PeerId,
199
200    /// Content of the message. Its meaning is out of scope of this library.
201    pub data: Bytes,
202
203    /// An incrementing sequence number.
204    pub sequence_number: Vec<u8>,
205
206    /// List of topics this message belongs to.
207    ///
208    /// Each message can belong to multiple topics at once.
209    pub topics: Vec<Topic>,
210}
211
212/// A subscription received by the floodsub system.
213#[derive(Debug, Clone, PartialEq, Eq, Hash)]
214pub struct FloodsubSubscription {
215    /// Action to perform.
216    pub action: FloodsubSubscriptionAction,
217    /// The topic from which to subscribe or unsubscribe.
218    pub topic: Topic,
219}
220
/// Action that a subscription wants to perform.
///
/// The enum has no payload, so it also derives `Copy`: values can be passed and
/// compared without moving them or calling `clone()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FloodsubSubscriptionAction {
    /// The remote wants to subscribe to the given topic.
    Subscribe,
    /// The remote wants to unsubscribe from the given topic.
    Unsubscribe,
}