use bytes::BytesMut;
use codec::UviBytes;
use crate::dht_proto as proto;
use crate::record::{self, Record};
use futures::prelude::*;
use futures_codec::Framed;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use prost::Message;
use std::{borrow::Cow, convert::TryFrom, time::Duration};
use std::{io, iter};
use unsigned_varint::codec;
use wasm_timer::Instant;
/// Protocol name used by `KademliaProtocolConfig::default()`.
pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0";
/// Maximum packet size used by `KademliaProtocolConfig::default()` (16 * 1024);
/// the upgrades pass it to the length-prefix codec as its maximum frame length.
pub const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024;
/// Connection status of a peer, as carried in the `connection` field of a
/// protobuf `Peer`.
///
/// Each variant converts to and from the identically named variant of
/// `proto::message::ConnectionType`.
// NOTE(review): the precise meaning of each status is defined by the Kademlia
// wire specification, not by this file — confirm there before relying on it.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub enum KadConnectionType {
/// Counterpart of the wire value `NotConnected`.
NotConnected = 0,
/// Counterpart of the wire value `Connected`.
Connected = 1,
/// Counterpart of the wire value `CanConnect`.
CanConnect = 2,
/// Counterpart of the wire value `CannotConnect`.
CannotConnect = 3,
}
impl From<proto::message::ConnectionType> for KadConnectionType {
    /// Maps a protobuf connection type onto the variant of the same name.
    fn from(raw: proto::message::ConnectionType) -> KadConnectionType {
        use proto::message::ConnectionType as Wire;

        match raw {
            Wire::NotConnected => KadConnectionType::NotConnected,
            Wire::Connected => KadConnectionType::Connected,
            Wire::CanConnect => KadConnectionType::CanConnect,
            Wire::CannotConnect => KadConnectionType::CannotConnect,
        }
    }
}
// Implemented as `From` rather than `Into`: the standard library's blanket
// impl then provides `Into<proto::message::ConnectionType>` for
// `KadConnectionType`, so existing `.into()` callers are unaffected.
impl From<KadConnectionType> for proto::message::ConnectionType {
    /// Maps a [`KadConnectionType`] onto the protobuf variant of the same name.
    fn from(ty: KadConnectionType) -> proto::message::ConnectionType {
        use proto::message::ConnectionType::*;

        match ty {
            KadConnectionType::NotConnected => NotConnected,
            KadConnectionType::Connected => Connected,
            KadConnectionType::CanConnect => CanConnect,
            KadConnectionType::CannotConnect => CannotConnect,
        }
    }
}
/// Information about a peer, as exchanged in Kademlia messages.
///
/// Converts to and from the protobuf type `proto::message::Peer`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KadPeer {
/// Identifier of the peer (the protobuf `id` field).
pub node_id: PeerId,
/// Addresses listed for the peer (the protobuf `addrs` field).
pub multiaddrs: Vec<Multiaddr>,
/// Connection status reported for the peer (the protobuf `connection` field).
pub connection_ty: KadConnectionType,
}
impl TryFrom<proto::message::Peer> for KadPeer {
    type Error = io::Error;

    /// Builds a [`KadPeer`] from its protobuf representation.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` of kind `InvalidData` when the peer id cannot be
    /// parsed, when any address is not a valid multiaddr, or when the
    /// connection type is not a known value. Checks run in that order and the
    /// first failure is returned.
    fn try_from(peer: proto::message::Peer) -> Result<KadPeer, Self::Error> {
        let node_id = PeerId::from_bytes(peer.id)
            .map_err(|_| invalid_data("invalid peer id"))?;

        // Collecting into `Result` stops at the first malformed address. This
        // replaces a manual push loop whose trailing
        // `debug_assert_eq!(len, capacity)` relied on `Vec::with_capacity`
        // allocating exactly the requested capacity, which it only guarantees
        // as a lower bound.
        let multiaddrs = peer.addrs
            .into_iter()
            .map(|addr| Multiaddr::try_from(addr).map_err(invalid_data))
            .collect::<Result<Vec<_>, _>>()?;

        let connection_ty = proto::message::ConnectionType::from_i32(peer.connection)
            .ok_or_else(|| invalid_data("unknown connection type"))?
            .into();

        Ok(KadPeer {
            node_id,
            multiaddrs,
            connection_ty,
        })
    }
}
// Implemented as `From` rather than `Into`: the standard library's blanket
// impl then provides `Into<proto::message::Peer>` for `KadPeer`, so existing
// `.into()` and `KadPeer::into` callers are unaffected.
impl From<KadPeer> for proto::message::Peer {
    /// Converts a [`KadPeer`] into its protobuf representation.
    fn from(peer: KadPeer) -> proto::message::Peer {
        // The protobuf field stores the connection type as its `i32` value.
        let connection: proto::message::ConnectionType = peer.connection_ty.into();

        proto::message::Peer {
            id: peer.node_id.into_bytes(),
            addrs: peer.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
            connection: connection as i32,
        }
    }
}
/// Configuration for a Kademlia connection upgrade.
///
/// Holds the protocol name and the maximum packet size.
#[derive(Debug, Clone)]
pub struct KademliaProtocolConfig {
    /// Protocol name, as raw bytes.
    protocol_name: Cow<'static, [u8]>,
    /// Maximum packet size.
    max_packet_size: usize,
}

impl KademliaProtocolConfig {
    /// Returns the currently configured protocol name.
    pub fn protocol_name(&self) -> &[u8] {
        self.protocol_name.as_ref()
    }

    /// Replaces the protocol name with `name`.
    pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
        let name: Cow<'static, [u8]> = name.into();
        self.protocol_name = name;
    }

    /// Replaces the maximum packet size with `size`.
    pub fn set_max_packet_size(&mut self, size: usize) {
        self.max_packet_size = size;
    }
}
impl Default for KademliaProtocolConfig {
    /// Creates a configuration using `DEFAULT_PROTO_NAME` and
    /// `DEFAULT_MAX_PACKET_SIZE`.
    fn default() -> Self {
        let protocol_name = Cow::Borrowed(DEFAULT_PROTO_NAME);
        let max_packet_size = DEFAULT_MAX_PACKET_SIZE;
        KademliaProtocolConfig { protocol_name, max_packet_size }
    }
}
impl UpgradeInfo for KademliaProtocolConfig {
    type Info = Cow<'static, [u8]>;
    type InfoIter = iter::Once<Self::Info>;

    /// Yields exactly one protocol: a clone of the configured protocol name.
    fn protocol_info(&self) -> Self::InfoIter {
        let name = self.protocol_name.clone();
        iter::once(name)
    }
}
impl<C> InboundUpgrade<C> for KademliaProtocolConfig
where
C: AsyncRead + AsyncWrite + Unpin,
{
type Output = KadInStreamSink<C>;
type Future = future::Ready<Result<Self::Output, io::Error>>;
type Error = io::Error;
fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default();
codec.set_max_len(self.max_packet_size);
future::ok(
Framed::new(incoming, codec)
.err_into()
.with::<_, _, fn(_) -> _, _>(|response| {
let proto_struct = resp_msg_to_proto(response);
let mut buf = Vec::with_capacity(proto_struct.encoded_len());
proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
future::ready(Ok(io::Cursor::new(buf)))
})
.and_then::<_, fn(_) -> _>(|bytes| {
let request = match proto::Message::decode(bytes) {
Ok(r) => r,
Err(err) => return future::ready(Err(err.into()))
};
future::ready(proto_to_req_msg(request))
}),
)
}
}
impl<C> OutboundUpgrade<C> for KademliaProtocolConfig
where
C: AsyncRead + AsyncWrite + Unpin,
{
type Output = KadOutStreamSink<C>;
type Future = future::Ready<Result<Self::Output, io::Error>>;
type Error = io::Error;
fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default();
codec.set_max_len(self.max_packet_size);
future::ok(
Framed::new(incoming, codec)
.err_into()
.with::<_, _, fn(_) -> _, _>(|request| {
let proto_struct = req_msg_to_proto(request);
let mut buf = Vec::with_capacity(proto_struct.encoded_len());
proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
future::ready(Ok(io::Cursor::new(buf)))
})
.and_then::<_, fn(_) -> _>(|bytes| {
let response = match proto::Message::decode(bytes) {
Ok(r) => r,
Err(err) => return future::ready(Err(err.into()))
};
future::ready(proto_to_resp_msg(response))
}),
)
}
}
/// Sink of responses and stream of requests; the output of the inbound upgrade.
pub type KadInStreamSink<S> = KadStreamSink<S, KadResponseMsg, KadRequestMsg>;
/// Sink of requests and stream of responses; the output of the outbound upgrade.
pub type KadOutStreamSink<S> = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;
/// Framed substream over `S`: a sink accepting values of type `A` and a stream
/// yielding values of type `B`.
///
/// The conversion steps are typed as `fn` pointers so that the full combinator
/// type can be named here.
pub type KadStreamSink<S, A, B> = stream::AndThen<
sink::With<
stream::ErrInto<Framed<S, UviBytes<io::Cursor<Vec<u8>>>>, io::Error>,
io::Cursor<Vec<u8>>,
A,
future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
fn(A) -> future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
>,
future::Ready<Result<B, io::Error>>,
fn(BytesMut) -> future::Ready<Result<B, io::Error>>,
>;
/// Request sent to a peer (outbound substream) or received from one (inbound
/// substream).
///
/// Each variant is encoded with the protobuf message type of the same name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KadRequestMsg {
/// Ping request; carries no payload.
Ping,
/// `FindNode` request for `key`.
FindNode {
/// Raw key bytes, placed verbatim in the message's `key` field.
key: Vec<u8>,
},
/// `GetProviders` request for `key`.
GetProviders {
/// Key the request is about.
key: record::Key,
},
/// `AddProvider` request announcing `provider` for `key`.
AddProvider {
/// Key the provider is announced for.
key: record::Key,
/// The announced provider; sent as the single entry of `provider_peers`.
provider: KadPeer,
},
/// `GetValue` request for the record stored under `key`.
GetValue {
/// Key of the requested record.
key: record::Key,
},
/// `PutValue` request carrying the record to store.
PutValue {
/// The record being put.
record: Record,
}
}
/// Response sent to a peer (inbound substream) or received from one (outbound
/// substream).
///
/// Each variant is encoded with the protobuf message type of the matching
/// request; `Pong` uses the `Ping` message type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KadResponseMsg {
/// Ping response; carries no payload.
Pong,
/// `FindNode` response.
FindNode {
/// Peers from the message's `closer_peers` field.
closer_peers: Vec<KadPeer>,
},
/// `GetProviders` response.
GetProviders {
/// Peers from the message's `closer_peers` field.
closer_peers: Vec<KadPeer>,
/// Peers from the message's `provider_peers` field.
provider_peers: Vec<KadPeer>,
},
/// `GetValue` response.
GetValue {
/// The record, if the message carries one.
record: Option<Record>,
/// Peers from the message's `closer_peers` field.
closer_peers: Vec<KadPeer>,
},
/// `PutValue` response.
PutValue {
/// Key of the record; sent both as the message key and as the record key.
key: record::Key,
/// Value of the record.
value: Vec<u8>,
},
}
/// Converts a [`KadRequestMsg`] into the protobuf message sent on the wire.
///
/// Starts from `proto::Message::default()` and fills in only the fields the
/// given request uses. `Ping` and `PutValue` leave `cluster_level_raw` at its
/// default; every other request sets it to 10.
fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
    use proto::message::MessageType;

    let mut msg = proto::Message::default();
    match kad_msg {
        KadRequestMsg::Ping => {
            msg.r#type = MessageType::Ping as i32;
        }
        KadRequestMsg::FindNode { key } => {
            msg.r#type = MessageType::FindNode as i32;
            msg.key = key;
            msg.cluster_level_raw = 10;
        }
        KadRequestMsg::GetProviders { key } => {
            msg.r#type = MessageType::GetProviders as i32;
            msg.key = key.to_vec();
            msg.cluster_level_raw = 10;
        }
        KadRequestMsg::AddProvider { key, provider } => {
            msg.r#type = MessageType::AddProvider as i32;
            msg.cluster_level_raw = 10;
            msg.key = key.to_vec();
            msg.provider_peers = vec![provider.into()];
        }
        KadRequestMsg::GetValue { key } => {
            msg.r#type = MessageType::GetValue as i32;
            msg.cluster_level_raw = 10;
            msg.key = key.to_vec();
        }
        KadRequestMsg::PutValue { record } => {
            msg.r#type = MessageType::PutValue as i32;
            msg.record = Some(record_to_proto(record));
        }
    }
    msg
}
fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
match kad_msg {
KadResponseMsg::Pong => proto::Message {
r#type: proto::message::MessageType::Ping as i32,
.. proto::Message::default()
},
KadResponseMsg::FindNode { closer_peers } => proto::Message {
r#type: proto::message::MessageType::FindNode as i32,
cluster_level_raw: 9,
closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
.. proto::Message::default()
},
KadResponseMsg::GetProviders { closer_peers, provider_peers } => proto::Message {
r#type: proto::message::MessageType::GetProviders as i32,
cluster_level_raw: 9,
closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
provider_peers: provider_peers.into_iter().map(KadPeer::into).collect(),
.. proto::Message::default()
},
KadResponseMsg::GetValue { record, closer_peers } => proto::Message {
r#type: proto::message::MessageType::GetValue as i32,
cluster_level_raw: 9,
closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
record: record.map(record_to_proto),
.. proto::Message::default()
},
KadResponseMsg::PutValue { key, value } => proto::Message {
r#type: proto::message::MessageType::PutValue as i32,
key: key.to_vec(),
record: Some(proto::Record {
key: key.to_vec(),
value,
.. proto::Record::default()
}),
.. proto::Message::default()
}
}
}
/// Interprets a decoded protobuf message as a [`KadRequestMsg`].
///
/// # Errors
///
/// Returns an `io::Error` of kind `InvalidData` when the message type is
/// unknown, when a `PutValue` record fails to convert, or when an
/// `AddProvider` message contains no peer that parses successfully.
fn proto_to_req_msg(message: proto::Message) -> Result<KadRequestMsg, io::Error> {
    use proto::message::MessageType;

    let msg_type = match MessageType::from_i32(message.r#type) {
        Some(ty) => ty,
        None => {
            return Err(invalid_data(format!("unknown message type: {}", message.r#type)));
        }
    };

    match msg_type {
        MessageType::Ping => Ok(KadRequestMsg::Ping),
        MessageType::PutValue => {
            // NOTE(review): a missing record is replaced by a default (empty)
            // one rather than rejected — confirm this leniency is intended.
            let record = record_from_proto(message.record.unwrap_or_default())?;
            Ok(KadRequestMsg::PutValue { record })
        }
        MessageType::GetValue => {
            let key = record::Key::from(message.key);
            Ok(KadRequestMsg::GetValue { key })
        }
        MessageType::FindNode => {
            let key = message.key;
            Ok(KadRequestMsg::FindNode { key })
        }
        MessageType::GetProviders => {
            let key = record::Key::from(message.key);
            Ok(KadRequestMsg::GetProviders { key })
        }
        MessageType::AddProvider => {
            // Only the first peer that parses successfully is kept; entries
            // that fail to parse are skipped.
            let raw_key = message.key;
            message.provider_peers
                .into_iter()
                .find_map(|peer| KadPeer::try_from(peer).ok())
                .map(|provider| KadRequestMsg::AddProvider {
                    key: record::Key::from(raw_key),
                    provider,
                })
                .ok_or_else(|| invalid_data("AddProvider message with no valid peer."))
        }
    }
}
fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Error> {
let msg_type = proto::message::MessageType::from_i32(message.r#type)
.ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
match msg_type {
proto::message::MessageType::Ping => Ok(KadResponseMsg::Pong),
proto::message::MessageType::GetValue => {
let record =
if let Some(r) = message.record {
Some(record_from_proto(r)?)
} else {
None
};
let closer_peers = message.closer_peers.into_iter()
.filter_map(|peer| KadPeer::try_from(peer).ok())
.collect();
Ok(KadResponseMsg::GetValue { record, closer_peers })
}
proto::message::MessageType::FindNode => {
let closer_peers = message.closer_peers.into_iter()
.filter_map(|peer| KadPeer::try_from(peer).ok())
.collect();
Ok(KadResponseMsg::FindNode { closer_peers })
}
proto::message::MessageType::GetProviders => {
let closer_peers = message.closer_peers.into_iter()
.filter_map(|peer| KadPeer::try_from(peer).ok())
.collect();
let provider_peers = message.provider_peers.into_iter()
.filter_map(|peer| KadPeer::try_from(peer).ok())
.collect();
Ok(KadResponseMsg::GetProviders {
closer_peers,
provider_peers,
})
}
proto::message::MessageType::PutValue => {
let key = record::Key::from(message.key);
let rec = message.record.ok_or_else(|| {
invalid_data("received PutValue message with no record")
})?;
Ok(KadResponseMsg::PutValue {
key,
value: rec.value
})
}
proto::message::MessageType::AddProvider =>
Err(invalid_data("received an unexpected AddProvider message"))
}
}
/// Converts a protobuf record into a [`Record`].
///
/// An empty `publisher` field means "no publisher". A `ttl` of zero means the
/// record does not expire; any other value is taken as a number of seconds
/// from now.
///
/// # Errors
///
/// Returns an `io::Error` of kind `InvalidData` when a non-empty publisher is
/// not a valid peer id.
fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
    let key = record::Key::from(record.key);
    let value = record.value;

    let publisher = if record.publisher.is_empty() {
        None
    } else {
        let id = PeerId::from_bytes(record.publisher)
            .map_err(|_| invalid_data("Invalid publisher peer ID."))?;
        Some(id)
    };

    let expires = match record.ttl {
        0 => None,
        secs => Some(Instant::now() + Duration::from_secs(u64::from(secs))),
    };

    Ok(Record { key, value, publisher, expires })
}
fn record_to_proto(record: Record) -> proto::Record {
proto::Record {
key: record.key.to_vec(),
value: record.value,
publisher: record.publisher.map(PeerId::into_bytes).unwrap_or_default(),
ttl: record.expires
.map(|t| {
let now = Instant::now();
if t > now {
(t - now).as_secs() as u32
} else {
1
}
})
.unwrap_or(0),
time_received: String::new()
}
}
/// Wraps `e` in an [`io::Error`] of kind [`io::ErrorKind::InvalidData`].
///
/// Accepts anything convertible into a boxed error, such as a string message
/// or another error value.
fn invalid_data<E>(e: E) -> io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>
{
    let kind = io::ErrorKind::InvalidData;
    io::Error::new(kind, e)
}
// Unit-test module; currently empty.
#[cfg(test)]
mod tests {
}