use crate::protocol::{
KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg,
KademliaProtocolConfig,
};
use crate::record::Record;
use futures::prelude::*;
use libp2p_core::protocols_handler::{
KeepAlive,
SubstreamProtocol,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId, upgrade::Negotiated};
use multihash::Multihash;
use std::{borrow::Cow, error, fmt, io, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
use wasm_timer::Instant;
pub struct KademliaHandler<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite,
{
config: KademliaProtocolConfig,
allow_listening: bool,
next_connec_unique_id: UniqueConnecId,
substreams: Vec<SubstreamState<Negotiated<TSubstream>, TUserData>>,
keep_alive: KeepAlive,
}
enum SubstreamState<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite,
{
OutPendingOpen(KadRequestMsg, Option<TUserData>),
OutPendingSend(
KadOutStreamSink<TSubstream>,
KadRequestMsg,
Option<TUserData>,
),
OutPendingFlush(KadOutStreamSink<TSubstream>, Option<TUserData>),
OutWaitingAnswer(KadOutStreamSink<TSubstream>, TUserData),
OutReportError(KademliaHandlerQueryErr, TUserData),
OutClosing(KadOutStreamSink<TSubstream>),
InWaitingMessage(UniqueConnecId, KadInStreamSink<TSubstream>),
InWaitingUser(UniqueConnecId, KadInStreamSink<TSubstream>),
InPendingSend(UniqueConnecId, KadInStreamSink<TSubstream>, KadResponseMsg),
InPendingFlush(UniqueConnecId, KadInStreamSink<TSubstream>),
InClosing(KadInStreamSink<TSubstream>),
}
impl<TSubstream, TUserData> SubstreamState<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite,
{
fn try_close(self) -> AsyncSink<Self> {
match self {
SubstreamState::OutPendingOpen(_, _)
| SubstreamState::OutReportError(_, _) => AsyncSink::Ready,
SubstreamState::OutPendingSend(mut stream, _, _)
| SubstreamState::OutPendingFlush(mut stream, _)
| SubstreamState::OutWaitingAnswer(mut stream, _)
| SubstreamState::OutClosing(mut stream) => match stream.close() {
Ok(Async::Ready(())) | Err(_) => AsyncSink::Ready,
Ok(Async::NotReady) => AsyncSink::NotReady(SubstreamState::OutClosing(stream)),
},
SubstreamState::InWaitingMessage(_, mut stream)
| SubstreamState::InWaitingUser(_, mut stream)
| SubstreamState::InPendingSend(_, mut stream, _)
| SubstreamState::InPendingFlush(_, mut stream)
| SubstreamState::InClosing(mut stream) => match stream.close() {
Ok(Async::Ready(())) | Err(_) => AsyncSink::Ready,
Ok(Async::NotReady) => AsyncSink::NotReady(SubstreamState::InClosing(stream)),
},
}
}
}
#[derive(Debug)]
pub enum KademliaHandlerEvent<TUserData> {
FindNodeReq {
key: PeerId,
request_id: KademliaRequestId,
},
FindNodeRes {
closer_peers: Vec<KadPeer>,
user_data: TUserData,
},
GetProvidersReq {
key: Multihash,
request_id: KademliaRequestId,
},
GetProvidersRes {
closer_peers: Vec<KadPeer>,
provider_peers: Vec<KadPeer>,
user_data: TUserData,
},
QueryError {
error: KademliaHandlerQueryErr,
user_data: TUserData,
},
AddProvider {
key: Multihash,
provider_peer: KadPeer,
},
GetValue {
key: Multihash,
request_id: KademliaRequestId,
},
GetValueRes {
result: Option<Record>,
closer_peers: Vec<KadPeer>,
user_data: TUserData,
},
PutValue {
key: Multihash,
value: Vec<u8>,
request_id: KademliaRequestId,
},
PutValueRes {
key: Multihash,
user_data: TUserData,
}
}
#[derive(Debug)]
pub enum KademliaHandlerQueryErr {
Upgrade(ProtocolsHandlerUpgrErr<io::Error>),
UnexpectedMessage,
Io(io::Error),
}
impl fmt::Display for KademliaHandlerQueryErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
KademliaHandlerQueryErr::Upgrade(err) => {
write!(f, "Error while performing Kademlia query: {}", err)
},
KademliaHandlerQueryErr::UnexpectedMessage => {
write!(f, "Remote answered our Kademlia RPC query with the wrong message type")
},
KademliaHandlerQueryErr::Io(err) => {
write!(f, "I/O error during a Kademlia RPC query: {}", err)
},
}
}
}
impl error::Error for KademliaHandlerQueryErr {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
KademliaHandlerQueryErr::Upgrade(err) => Some(err),
KademliaHandlerQueryErr::UnexpectedMessage => None,
KademliaHandlerQueryErr::Io(err) => Some(err),
}
}
}
impl From<ProtocolsHandlerUpgrErr<io::Error>> for KademliaHandlerQueryErr {
#[inline]
fn from(err: ProtocolsHandlerUpgrErr<io::Error>) -> Self {
KademliaHandlerQueryErr::Upgrade(err)
}
}
#[derive(Debug)]
pub enum KademliaHandlerIn<TUserData> {
FindNodeReq {
key: Multihash,
user_data: TUserData,
},
FindNodeRes {
closer_peers: Vec<KadPeer>,
request_id: KademliaRequestId,
},
GetProvidersReq {
key: Multihash,
user_data: TUserData,
},
GetProvidersRes {
closer_peers: Vec<KadPeer>,
provider_peers: Vec<KadPeer>,
request_id: KademliaRequestId,
},
AddProvider {
key: Multihash,
provider_peer: KadPeer,
},
GetValue {
key: Multihash,
user_data: TUserData,
},
GetValueRes {
result: Option<Record>,
closer_peers: Vec<KadPeer>,
request_id: KademliaRequestId,
},
PutValue {
key: Multihash,
value: Vec<u8>,
user_data: TUserData,
},
PutValueRes {
key: Multihash,
value: Vec<u8>,
request_id: KademliaRequestId,
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct KademliaRequestId {
connec_unique_id: UniqueConnecId,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct UniqueConnecId(u64);
impl<TSubstream, TUserData> KademliaHandler<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite,
{
pub fn dial_only() -> Self {
KademliaHandler::with_allow_listening(false)
}
pub fn dial_and_listen() -> Self {
KademliaHandler::with_allow_listening(true)
}
fn with_allow_listening(allow_listening: bool) -> Self {
KademliaHandler {
config: Default::default(),
allow_listening,
next_connec_unique_id: UniqueConnecId(0),
substreams: Vec::new(),
keep_alive: KeepAlive::Yes,
}
}
pub fn with_protocol_name(mut self, name: impl Into<Cow<'static, [u8]>>) -> Self {
self.config = self.config.with_protocol_name(name);
self
}
}
impl<TSubstream, TUserData> Default for KademliaHandler<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite,
{
#[inline]
fn default() -> Self {
KademliaHandler::dial_and_listen()
}
}
impl<TSubstream, TUserData> ProtocolsHandler for KademliaHandler<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite,
TUserData: Clone,
{
type InEvent = KademliaHandlerIn<TUserData>;
type OutEvent = KademliaHandlerEvent<TUserData>;
type Error = io::Error;
type Substream = TSubstream;
type InboundProtocol = upgrade::EitherUpgrade<KademliaProtocolConfig, upgrade::DeniedUpgrade>;
type OutboundProtocol = KademliaProtocolConfig;
type OutboundOpenInfo = (KadRequestMsg, Option<TUserData>);
#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
if self.allow_listening {
SubstreamProtocol::new(self.config.clone()).map_upgrade(upgrade::EitherUpgrade::A)
} else {
SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade))
}
}
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
(msg, user_data): Self::OutboundOpenInfo,
) {
self.substreams
.push(SubstreamState::OutPendingSend(protocol, msg, user_data));
}
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<TSubstream>>::Output,
) {
let protocol = match protocol {
EitherOutput::First(p) => p,
EitherOutput::Second(p) => void::unreachable(p),
};
debug_assert!(self.allow_listening);
let connec_unique_id = self.next_connec_unique_id;
self.next_connec_unique_id.0 += 1;
self.substreams
.push(SubstreamState::InWaitingMessage(connec_unique_id, protocol));
}
#[inline]
fn inject_event(&mut self, message: KademliaHandlerIn<TUserData>) {
match message {
KademliaHandlerIn::FindNodeReq { key, user_data } => {
match PeerId::from_multihash(key.clone()) {
Ok(key) => {
let msg = KadRequestMsg::FindNode { key };
self.substreams
.push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone())));
},
Err(_) => (),
}
}
KademliaHandlerIn::FindNodeRes {
closer_peers,
request_id,
} => {
let pos = self.substreams.iter().position(|state| match state {
SubstreamState::InWaitingUser(ref conn_id, _) =>
conn_id == &request_id.connec_unique_id,
_ => false,
});
if let Some(pos) = pos {
let (conn_id, substream) = match self.substreams.remove(pos) {
SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
_ => unreachable!(),
};
let msg = KadResponseMsg::FindNode {
closer_peers: closer_peers.clone(),
};
self.substreams
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
}
}
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
let msg = KadRequestMsg::GetProviders { key: key.clone() };
self.substreams
.push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone())));
}
KademliaHandlerIn::GetProvidersRes {
closer_peers,
provider_peers,
request_id,
} => {
let pos = self.substreams.iter().position(|state| match state {
SubstreamState::InWaitingUser(ref conn_id, _)
if conn_id == &request_id.connec_unique_id =>
{
true
}
_ => false,
});
if let Some(pos) = pos {
let (conn_id, substream) = match self.substreams.remove(pos) {
SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
_ => unreachable!(),
};
let msg = KadResponseMsg::GetProviders {
closer_peers: closer_peers.clone(),
provider_peers: provider_peers.clone(),
};
self.substreams
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
}
}
KademliaHandlerIn::AddProvider { key, provider_peer } => {
let msg = KadRequestMsg::AddProvider {
key: key.clone(),
provider_peer: provider_peer.clone(),
};
self.substreams
.push(SubstreamState::OutPendingOpen(msg, None));
}
KademliaHandlerIn::GetValue { key, user_data } => {
let msg = KadRequestMsg::GetValue { key };
self.substreams
.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
}
KademliaHandlerIn::PutValue { key, value, user_data } => {
let msg = KadRequestMsg::PutValue {
key,
value,
};
self.substreams
.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
}
KademliaHandlerIn::GetValueRes {
result,
closer_peers,
request_id,
} => {
let pos = self.substreams.iter().position(|state| match state {
SubstreamState::InWaitingUser(ref conn_id, _)
=> conn_id == &request_id.connec_unique_id,
_ => false,
});
if let Some(pos) = pos {
let (conn_id, substream) = match self.substreams.remove(pos) {
SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
_ => unreachable!(),
};
let msg = KadResponseMsg::GetValue {
result,
closer_peers: closer_peers.clone(),
};
self.substreams
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
}
}
KademliaHandlerIn::PutValueRes {
key,
request_id,
value,
} => {
let pos = self.substreams.iter().position(|state| match state {
SubstreamState::InWaitingUser(ref conn_id, _)
if conn_id == &request_id.connec_unique_id =>
{
true
}
_ => false,
});
if let Some(pos) = pos {
let (conn_id, substream) = match self.substreams.remove(pos) {
SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
_ => unreachable!(),
};
let msg = KadResponseMsg::PutValue {
key,
value,
};
self.substreams
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
}
}
}
}
#[inline]
fn inject_dial_upgrade_error(
&mut self,
(_, user_data): Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<io::Error>,
) {
if let Some(user_data) = user_data {
self.substreams
.push(SubstreamState::OutReportError(error.into(), user_data));
}
}
#[inline]
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn poll(
&mut self,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
io::Error,
> {
for n in (0..self.substreams.len()).rev() {
let mut substream = self.substreams.swap_remove(n);
loop {
match advance_substream(substream, self.config.clone()) {
(Some(new_state), Some(event), _) => {
self.substreams.push(new_state);
return Ok(Async::Ready(event));
}
(None, Some(event), _) => {
return Ok(Async::Ready(event));
}
(Some(new_state), None, false) => {
self.substreams.push(new_state);
break;
}
(Some(new_state), None, true) => {
substream = new_state;
continue;
}
(None, None, _) => {
break;
}
}
}
}
if self.substreams.is_empty() {
self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10));
} else {
self.keep_alive = KeepAlive::Yes;
}
Ok(Async::NotReady)
}
}
fn advance_substream<TSubstream, TUserData>(
state: SubstreamState<TSubstream, TUserData>,
upgrade: KademliaProtocolConfig,
) -> (
Option<SubstreamState<TSubstream, TUserData>>,
Option<
ProtocolsHandlerEvent<
KademliaProtocolConfig,
(KadRequestMsg, Option<TUserData>),
KademliaHandlerEvent<TUserData>,
>,
>,
bool,
)
where
TSubstream: AsyncRead + AsyncWrite,
{
match state {
SubstreamState::OutPendingOpen(msg, user_data) => {
let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(upgrade),
info: (msg, user_data),
};
(None, Some(ev), false)
}
SubstreamState::OutPendingSend(mut substream, msg, user_data) => {
match substream.start_send(msg) {
Ok(AsyncSink::Ready) => (
Some(SubstreamState::OutPendingFlush(substream, user_data)),
None,
true,
),
Ok(AsyncSink::NotReady(msg)) => (
Some(SubstreamState::OutPendingSend(substream, msg, user_data)),
None,
false,
),
Err(error) => {
let event = if let Some(user_data) = user_data {
Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data
}))
} else {
None
};
(None, event, false)
}
}
}
SubstreamState::OutPendingFlush(mut substream, user_data) => {
match substream.poll_complete() {
Ok(Async::Ready(())) => {
if let Some(user_data) = user_data {
(
Some(SubstreamState::OutWaitingAnswer(substream, user_data)),
None,
true,
)
} else {
(Some(SubstreamState::OutClosing(substream)), None, true)
}
}
Ok(Async::NotReady) => (
Some(SubstreamState::OutPendingFlush(substream, user_data)),
None,
false,
),
Err(error) => {
let event = if let Some(user_data) = user_data {
Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
}))
} else {
None
};
(None, event, false)
}
}
}
SubstreamState::OutWaitingAnswer(mut substream, user_data) => match substream.poll() {
Ok(Async::Ready(Some(msg))) => {
let new_state = SubstreamState::OutClosing(substream);
let event = process_kad_response(msg, user_data);
(
Some(new_state),
Some(ProtocolsHandlerEvent::Custom(event)),
true,
)
}
Ok(Async::NotReady) => (
Some(SubstreamState::OutWaitingAnswer(substream, user_data)),
None,
false,
),
Err(error) => {
let event = KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
};
(None, Some(ProtocolsHandlerEvent::Custom(event)), false)
}
Ok(Async::Ready(None)) => {
let event = KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()),
user_data,
};
(None, Some(ProtocolsHandlerEvent::Custom(event)), false)
}
},
SubstreamState::OutReportError(error, user_data) => {
let event = KademliaHandlerEvent::QueryError { error, user_data };
(None, Some(ProtocolsHandlerEvent::Custom(event)), false)
}
SubstreamState::OutClosing(mut stream) => match stream.close() {
Ok(Async::Ready(())) => (None, None, false),
Ok(Async::NotReady) => (Some(SubstreamState::OutClosing(stream)), None, false),
Err(_) => (None, None, false),
},
SubstreamState::InWaitingMessage(id, mut substream) => match substream.poll() {
Ok(Async::Ready(Some(msg))) => {
if let Ok(ev) = process_kad_request(msg, id) {
(
Some(SubstreamState::InWaitingUser(id, substream)),
Some(ProtocolsHandlerEvent::Custom(ev)),
false,
)
} else {
(Some(SubstreamState::InClosing(substream)), None, true)
}
}
Ok(Async::NotReady) => (
Some(SubstreamState::InWaitingMessage(id, substream)),
None,
false,
),
Ok(Async::Ready(None)) | Err(_) => (None, None, false),
},
SubstreamState::InWaitingUser(id, substream) => (
Some(SubstreamState::InWaitingUser(id, substream)),
None,
false,
),
SubstreamState::InPendingSend(id, mut substream, msg) => match substream.start_send(msg) {
Ok(AsyncSink::Ready) => (
Some(SubstreamState::InPendingFlush(id, substream)),
None,
true,
),
Ok(AsyncSink::NotReady(msg)) => (
Some(SubstreamState::InPendingSend(id, substream, msg)),
None,
false,
),
Err(_) => (None, None, false),
},
SubstreamState::InPendingFlush(id, mut substream) => match substream.poll_complete() {
Ok(Async::Ready(())) => (
Some(SubstreamState::InWaitingMessage(id, substream)),
None,
true,
),
Ok(Async::NotReady) => (
Some(SubstreamState::InPendingFlush(id, substream)),
None,
false,
),
Err(_) => (None, None, false),
},
SubstreamState::InClosing(mut stream) => match stream.close() {
Ok(Async::Ready(())) => (None, None, false),
Ok(Async::NotReady) => (Some(SubstreamState::InClosing(stream)), None, false),
Err(_) => (None, None, false),
},
}
}
fn process_kad_request<TUserData>(
event: KadRequestMsg,
connec_unique_id: UniqueConnecId,
) -> Result<KademliaHandlerEvent<TUserData>, io::Error> {
match event {
KadRequestMsg::Ping => {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"the PING Kademlia message is not implemented",
))
}
KadRequestMsg::FindNode { key } => Ok(KademliaHandlerEvent::FindNodeReq {
key,
request_id: KademliaRequestId { connec_unique_id },
}),
KadRequestMsg::GetProviders { key } => Ok(KademliaHandlerEvent::GetProvidersReq {
key,
request_id: KademliaRequestId { connec_unique_id },
}),
KadRequestMsg::AddProvider { key, provider_peer } => {
Ok(KademliaHandlerEvent::AddProvider { key, provider_peer })
}
KadRequestMsg::GetValue { key } => Ok(KademliaHandlerEvent::GetValue {
key,
request_id: KademliaRequestId { connec_unique_id },
}),
KadRequestMsg::PutValue { key, value } => Ok(KademliaHandlerEvent::PutValue {
key,
value,
request_id: KademliaRequestId { connec_unique_id },
})
}
}
fn process_kad_response<TUserData>(
event: KadResponseMsg,
user_data: TUserData,
) -> KademliaHandlerEvent<TUserData> {
match event {
KadResponseMsg::Pong => {
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::UnexpectedMessage,
user_data,
}
}
KadResponseMsg::FindNode { closer_peers } => {
KademliaHandlerEvent::FindNodeRes {
closer_peers,
user_data,
}
},
KadResponseMsg::GetProviders {
closer_peers,
provider_peers,
} => KademliaHandlerEvent::GetProvidersRes {
closer_peers,
provider_peers,
user_data,
},
KadResponseMsg::GetValue {
result,
closer_peers,
} => KademliaHandlerEvent::GetValueRes {
result,
closer_peers,
user_data,
},
KadResponseMsg::PutValue { key, .. } => {
KademliaHandlerEvent::PutValueRes {
key,
user_data,
}
}
}
}