// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use instant::Instant;
use rand::{distributions, prelude::*};
use std::{io, time::Duration};
/// Identifier of the ping protocol handled by this module
/// (`/ipfs/ping/1.0.0`), as a raw byte string.
pub const PROTOCOL_NAME: &[u8] = b"/ipfs/ping/1.0.0";
/// The `Ping` protocol upgrade.
///
/// The ping protocol sends 32 bytes of random data in configurable
/// intervals over a single outbound substream, expecting to receive
/// the same bytes as a response. At the same time, incoming pings
/// on inbound substreams are answered by sending back the received bytes.
///
/// At most a single inbound and outbound substream is kept open at
/// any time. In case of a ping timeout or another error on a substream, the
/// substream is dropped. If a configurable number of consecutive
/// outbound pings fail, the connection is closed.
///
/// Successful pings report the round-trip time.
///
/// A single request/response exchange is implemented by [`send_ping`]
/// (outbound side) and [`recv_ping`] (inbound side).
///
/// > **Note**: The round-trip time of a ping may be subject to delays induced
/// > by the underlying transport, e.g. in the case of TCP there is
/// > Nagle's algorithm, delayed acks and similar configuration options
/// > which can affect latencies especially on otherwise low-volume
/// > connections.
#[derive(Default, Debug, Copy, Clone)]
pub struct Ping;
/// Length in bytes of the payload that is written for each ping and read
/// back for each pong.
const PING_SIZE: usize = 32;
/// Performs one outbound ping on `stream` and waits for the echoed pong.
///
/// A fresh random payload of `PING_SIZE` bytes is written and flushed, then
/// exactly `PING_SIZE` bytes are read back and compared with what was sent.
///
/// On success the stream is handed back together with the time that elapsed
/// between the completed flush and the successful comparison of the reply.
///
/// # Errors
///
/// * Any I/O error raised while writing, flushing or reading is propagated.
/// * [`io::ErrorKind::InvalidData`] is returned when the bytes read back
///   differ from the bytes that were sent.
pub async fn send_ping<S>(mut stream: S) -> io::Result<(S, Duration)>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    // The RNG handle is only a temporary of this statement, so it is not
    // kept alive across any of the `.await` points below.
    let sent: [u8; PING_SIZE] = thread_rng().sample(distributions::Standard);

    stream.write_all(&sent).await?;
    stream.flush().await?;

    // The clock starts only once the request has been flushed.
    let clock = Instant::now();

    let mut echoed = [0u8; PING_SIZE];
    stream.read_exact(&mut echoed).await?;

    if echoed != sent {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Ping payload mismatch",
        ));
    }

    Ok((stream, clock.elapsed()))
}
/// Answers one inbound ping on `stream`.
///
/// Reads exactly `PING_SIZE` bytes, writes the very same bytes back,
/// flushes, and then returns the stream to the caller.
///
/// # Errors
///
/// Any I/O error raised while reading, writing or flushing is propagated.
pub async fn recv_ping<S>(mut stream: S) -> io::Result<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    // Incoming request; echoed back unchanged as the pong.
    let mut echo = [0u8; PING_SIZE];
    stream.read_exact(&mut echo).await?;

    stream.write_all(&echo).await?;
    stream.flush().await?;

    Ok(stream)
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use libp2p_core::{
        multiaddr::multiaddr,
        transport::{memory::MemoryTransport, Transport},
    };
    use rand::{thread_rng, Rng};
    use std::time::Duration;

    /// Runs one full ping/pong exchange over an in-memory transport:
    /// a spawned task answers with `recv_ping` while the test body dials
    /// the listener and drives `send_ping`.
    #[test]
    fn ping_pong() {
        // Listening side: bind a memory address with a random port number.
        let port = thread_rng().gen::<u64>();
        let mut listener = MemoryTransport::new().boxed();
        listener.listen_on(multiaddr![Memory(port)]).unwrap();

        // The first transport event must already be available and has to
        // carry the address the listener ended up on.
        let dial_addr = listener
            .select_next_some()
            .now_or_never()
            .and_then(|event| event.into_new_address())
            .expect("MemoryTransport not listening on an address!");

        // Responder: accept a single incoming connection and answer one ping.
        async_std::task::spawn(async move {
            let event = listener.next().await.unwrap();
            let (upgrade, _) = event.into_incoming().unwrap();
            let socket = upgrade.await.unwrap();
            recv_ping(socket).await.unwrap();
        });

        // Initiator: dial the listener, send one ping and check the RTT.
        async_std::task::block_on(async move {
            let dialing = MemoryTransport::new().dial(dial_addr).unwrap();
            let socket = dialing.await.unwrap();
            let (_, rtt) = send_ping(socket).await.unwrap();
            assert!(rtt > Duration::from_secs(0));
        });
    }
}