use crate::proto;
use crate::protocol::{self, MAX_MESSAGE_SIZE};
use asynchronous_codec::{Framed, FramedParts};
use bytes::Bytes;
use futures::prelude::*;
use libp2p_identity::PeerId;
use libp2p_swarm::Stream;
use thiserror::Error;
/// Reads the first [`proto::StopMessage`] from `io` and turns a `CONNECT`
/// request into a pending [`Circuit`].
///
/// The stream is wrapped in a length-limited protobuf codec
/// (`MAX_MESSAGE_SIZE`) and kept inside the returned [`Circuit`] so the
/// response can later be written with [`Circuit::accept`] or
/// [`Circuit::deny`].
///
/// # Errors
///
/// * [`FatalUpgradeError::StreamClosed`] if the stream ends before a message
///   is received.
/// * [`FatalUpgradeError::Codec`] if the frame cannot be read or decoded.
/// * [`FatalUpgradeError::UnexpectedTypeStatus`] if the message type is
///   `STATUS` rather than `CONNECT`.
/// * [`FatalUpgradeError::MissingPeer`] if the `peer` field is absent.
/// * [`FatalUpgradeError::ParsePeerId`] if the peer id bytes are not a valid
///   [`PeerId`].
pub(crate) async fn handle_open_circuit(io: Stream) -> Result<Circuit, FatalUpgradeError> {
    let codec = quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE);
    let mut substream = Framed::new(io, codec);

    // Exactly one message is expected here; end-of-stream is an error.
    let request: proto::StopMessage = match substream.next().await {
        Some(frame) => frame?,
        None => return Err(FatalUpgradeError::StreamClosed),
    };

    // Only `CONNECT` opens a circuit. The match stays exhaustive so that a new
    // message type forces this code to be revisited.
    match request.type_pb {
        proto::StopMessageType::STATUS => return Err(FatalUpgradeError::UnexpectedTypeStatus),
        proto::StopMessageType::CONNECT => {}
    }

    let peer = request.peer.ok_or(FatalUpgradeError::MissingPeer)?;
    let src_peer_id =
        PeerId::from_bytes(&peer.id).map_err(|_| FatalUpgradeError::ParsePeerId)?;
    let limit = request.limit.map(Into::into);

    Ok(Circuit {
        substream,
        src_peer_id,
        limit,
    })
}
/// Error returned by [`Circuit::accept`] and [`Circuit::deny`].
///
/// Currently only wraps [`FatalUpgradeError`].
#[derive(Debug, Error)]
pub enum UpgradeError {
/// An unrecoverable failure; see [`FatalUpgradeError`] for the cause.
#[error("Fatal")]
Fatal(#[from] FatalUpgradeError),
}
impl From<quick_protobuf_codec::Error> for UpgradeError {
fn from(error: quick_protobuf_codec::Error) -> Self {
Self::Fatal(error.into())
}
}
/// Unrecoverable failures while handling an inbound stop request.
#[derive(Debug, Error)]
pub enum FatalUpgradeError {
/// Reading or writing a protobuf frame failed; displays as the inner error.
#[error(transparent)]
Codec(#[from] quick_protobuf_codec::Error),
/// The stream ended before the initial message was received.
#[error("Stream closed")]
StreamClosed,
/// The message type field could not be parsed.
// NOTE(review): not constructed anywhere in the code visible here.
#[error("Failed to parse response type field.")]
ParseTypeField,
/// The bytes in the `peer` field are not a valid `PeerId`.
#[error("Failed to parse peer id.")]
ParsePeerId,
/// The initial message did not carry a `peer` field.
#[error("Expected 'peer' field to be set.")]
MissingPeer,
/// The initial message had type `STATUS` instead of `CONNECT`.
#[error("Unexpected message type 'status'")]
UnexpectedTypeStatus,
}
/// An inbound circuit request that has been parsed but not yet answered.
///
/// Consume it with [`Circuit::accept`] or [`Circuit::deny`] to send the
/// response.
pub(crate) struct Circuit {
/// Framed stream the request was read from; the response is written to it.
substream: Framed<Stream, quick_protobuf_codec::Codec<proto::StopMessage>>,
/// Peer id taken from the `peer` field of the request.
src_peer_id: PeerId,
/// Limit taken from the request's `limit` field, if it was set.
limit: Option<protocol::Limit>,
}
impl Circuit {
pub(crate) fn src_peer_id(&self) -> PeerId {
self.src_peer_id
}
pub(crate) fn limit(&self) -> Option<protocol::Limit> {
self.limit
}
pub(crate) async fn accept(mut self) -> Result<(Stream, Bytes), UpgradeError> {
let msg = proto::StopMessage {
type_pb: proto::StopMessageType::STATUS,
peer: None,
limit: None,
status: Some(proto::Status::OK),
};
self.send(msg).await?;
let FramedParts {
io,
read_buffer,
write_buffer,
..
} = self.substream.into_parts();
assert!(
write_buffer.is_empty(),
"Expect a flushed Framed to have an empty write buffer."
);
Ok((io, read_buffer.freeze()))
}
pub(crate) async fn deny(mut self, status: proto::Status) -> Result<(), UpgradeError> {
let msg = proto::StopMessage {
type_pb: proto::StopMessageType::STATUS,
peer: None,
limit: None,
status: Some(status),
};
self.send(msg).await.map_err(Into::into)
}
async fn send(&mut self, msg: proto::StopMessage) -> Result<(), quick_protobuf_codec::Error> {
self.substream.send(msg).await?;
self.substream.flush().await?;
Ok(())
}
}