// libp2p_floodsub/protocol.rs
use std::{io, iter, pin::Pin};
22
23use asynchronous_codec::Framed;
24use bytes::Bytes;
25use futures::{
26 io::{AsyncRead, AsyncWrite},
27 Future, SinkExt, StreamExt,
28};
29use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
30use libp2p_identity::PeerId;
31use libp2p_swarm::StreamProtocol;
32
33use crate::{proto, topic::Topic};
34
/// Length limit (in bytes, per the name) handed to the protobuf codec on both the
/// inbound and the outbound upgrade paths.
const MAX_MESSAGE_LEN_BYTES: usize = 2048;

/// Protocol name returned by `protocol_info` for both upgrade types in this file.
const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/floodsub/1.0.0");
38
/// Stateless handle for the floodsub upgrade; the type has no fields.
#[derive(Debug, Clone, Default)]
pub struct FloodsubProtocol {}

impl FloodsubProtocol {
    /// Creates a new, empty `FloodsubProtocol`.
    pub fn new() -> Self {
        Self {}
    }
}
49
50impl UpgradeInfo for FloodsubProtocol {
51 type Info = StreamProtocol;
52 type InfoIter = iter::Once<Self::Info>;
53
54 fn protocol_info(&self) -> Self::InfoIter {
55 iter::once(PROTOCOL_NAME)
56 }
57}
58
59impl<TSocket> InboundUpgrade<TSocket> for FloodsubProtocol
60where
61 TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
62{
63 type Output = FloodsubRpc;
64 type Error = FloodsubError;
65 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
66
67 fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
68 Box::pin(async move {
69 let mut framed = Framed::new(
70 socket,
71 quick_protobuf_codec::Codec::<proto::RPC>::new(MAX_MESSAGE_LEN_BYTES),
72 );
73
74 let rpc = framed
75 .next()
76 .await
77 .ok_or_else(|| FloodsubError::ReadError(io::ErrorKind::UnexpectedEof.into()))?
78 .map_err(CodecError)?;
79
80 let mut messages = Vec::with_capacity(rpc.publish.len());
81 for publish in rpc.publish.into_iter() {
82 messages.push(FloodsubMessage {
83 source: PeerId::from_bytes(&publish.from.unwrap_or_default())
84 .map_err(|_| FloodsubError::InvalidPeerId)?,
85 data: publish.data.unwrap_or_default().into(),
86 sequence_number: publish.seqno.unwrap_or_default(),
87 topics: publish.topic_ids.into_iter().map(Topic::new).collect(),
88 });
89 }
90
91 Ok(FloodsubRpc {
92 messages,
93 subscriptions: rpc
94 .subscriptions
95 .into_iter()
96 .map(|sub| FloodsubSubscription {
97 action: if Some(true) == sub.subscribe {
98 FloodsubSubscriptionAction::Subscribe
99 } else {
100 FloodsubSubscriptionAction::Unsubscribe
101 },
102 topic: Topic::new(sub.topic_id.unwrap_or_default()),
103 })
104 .collect(),
105 })
106 })
107 }
108}
109
/// Reasons reading or decoding an inbound floodsub RPC can fail.
#[derive(thiserror::Error, Debug)]
pub enum FloodsubError {
    /// A published message's `from` field could not be parsed as a `PeerId`.
    #[error("Failed to decode PeerId from message")]
    InvalidPeerId,
    /// The protobuf codec returned an error while reading the frame.
    #[error("Failed to decode protobuf")]
    ProtobufError(#[from] CodecError),
    /// An I/O error; the inbound upgrade in this file builds it with kind `UnexpectedEof`
    /// when the stream ends before any frame is received.
    #[error("Failed to read from socket")]
    ReadError(#[from] io::Error),
}
123
/// Newtype around `quick_protobuf_codec::Error`.
///
/// `#[error(transparent)]` forwards `Display` and `source` to the wrapped error, and
/// `#[from]` generates the `From<quick_protobuf_codec::Error>` conversion used by `?`.
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
pub struct CodecError(#[from] quick_protobuf_codec::Error);
127
/// A floodsub RPC: the decoded result of an inbound upgrade, and the value sent by the
/// outbound upgrade.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FloodsubRpc {
    /// Messages carried in the RPC (wire field `publish`).
    pub messages: Vec<FloodsubMessage>,
    /// Subscription changes carried in the RPC (wire field `subscriptions`).
    pub subscriptions: Vec<FloodsubSubscription>,
}
136
137impl UpgradeInfo for FloodsubRpc {
138 type Info = StreamProtocol;
139 type InfoIter = iter::Once<Self::Info>;
140
141 fn protocol_info(&self) -> Self::InfoIter {
142 iter::once(PROTOCOL_NAME)
143 }
144}
145
146impl<TSocket> OutboundUpgrade<TSocket> for FloodsubRpc
147where
148 TSocket: AsyncWrite + AsyncRead + Send + Unpin + 'static,
149{
150 type Output = ();
151 type Error = CodecError;
152 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
153
154 fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
155 Box::pin(async move {
156 let mut framed = Framed::new(
157 socket,
158 quick_protobuf_codec::Codec::<proto::RPC>::new(MAX_MESSAGE_LEN_BYTES),
159 );
160 framed.send(self.into_rpc()).await?;
161 framed.close().await?;
162 Ok(())
163 })
164 }
165}
166
167impl FloodsubRpc {
168 fn into_rpc(self) -> proto::RPC {
170 proto::RPC {
171 publish: self
172 .messages
173 .into_iter()
174 .map(|msg| proto::Message {
175 from: Some(msg.source.to_bytes()),
176 data: Some(msg.data.to_vec()),
177 seqno: Some(msg.sequence_number),
178 topic_ids: msg.topics.into_iter().map(|topic| topic.into()).collect(),
179 })
180 .collect(),
181
182 subscriptions: self
183 .subscriptions
184 .into_iter()
185 .map(|topic| proto::SubOpts {
186 subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe),
187 topic_id: Some(topic.topic.into()),
188 })
189 .collect(),
190 }
191 }
192}
193
/// A single published message carried inside a `FloodsubRpc`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FloodsubMessage {
    /// Peer identity decoded from the wire field `from`.
    pub source: PeerId,

    /// Message payload (wire field `data`).
    pub data: Bytes,

    /// Sequence-number bytes (wire field `seqno`); this file treats them as opaque.
    pub sequence_number: Vec<u8>,

    /// Topics this message is addressed to (wire field `topic_ids`).
    pub topics: Vec<Topic>,
}
211
/// A subscription change carried inside a `FloodsubRpc`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FloodsubSubscription {
    /// Whether the topic is being subscribed to or unsubscribed from.
    pub action: FloodsubSubscriptionAction,
    /// The topic the action applies to (wire field `topic_id`).
    pub topic: Topic,
}
220
/// Direction of a subscription change.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FloodsubSubscriptionAction {
    /// Encoded on the wire as `subscribe = Some(true)`.
    Subscribe,
    /// Encoded on the wire as `subscribe = Some(false)`; when decoding, any value other
    /// than `Some(true)` maps to this variant.
    Unsubscribe,
}