alloy_transport_ipc/
lib.rs#![doc = include_str!("../README.md")]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/alloy-rs/core/main/assets/alloy.jpg",
html_favicon_url = "https://raw.githubusercontent.com/alloy-rs/core/main/assets/favicon.ico"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#[macro_use]
extern crate tracing;
use bytes::{Buf, BytesMut};
use futures::{ready, StreamExt};
use interprocess::local_socket::{tokio::prelude::*, Name};
use std::task::Poll::Ready;
use tokio::{
io::{AsyncRead, AsyncWriteExt},
select,
};
use tokio_util::io::poll_read_buf;
mod connect;
pub use connect::IpcConnect;
#[cfg(feature = "mock")]
pub mod mock;
#[cfg(feature = "mock")]
pub use mock::MockIpcServer;
type Result<T> = std::result::Result<T, std::io::Error>;
struct IpcBackend {
pub(crate) stream: LocalSocketStream,
pub(crate) interface: alloy_pubsub::ConnectionInterface,
}
impl IpcBackend {
async fn connect(name: Name<'_>) -> Result<alloy_pubsub::ConnectionHandle> {
let stream = LocalSocketStream::connect(name).await?;
let (handle, interface) = alloy_pubsub::ConnectionHandle::new();
let backend = Self { stream, interface };
backend.spawn();
Ok(handle)
}
fn spawn(mut self) {
let fut = async move {
let (read, mut writer) = self.stream.split();
let mut read = ReadJsonStream::new(read).fuse();
let err = loop {
select! {
biased;
item = self.interface.recv_from_frontend() => {
match item {
Some(msg) => {
let bytes = msg.get();
if let Err(err) = writer.write_all(bytes.as_bytes()).await {
error!(%err, "Failed to write to IPC socket");
break true;
}
},
None => {
debug!("Frontend has gone away");
break false;
},
}
}
item = read.next() => {
match item {
Some(item) => {
if self.interface.send_to_frontend(item).is_err() {
debug!("Frontend has gone away");
break false;
}
}
None => {
error!("Read stream has failed.");
break true;
}
}
}
}
};
if err {
self.interface.close_with_error();
}
};
tokio::spawn(fut);
}
}
const CAPACITY: usize = 4096;
#[derive(Debug)]
#[pin_project::pin_project]
pub struct ReadJsonStream<T> {
#[pin]
reader: T,
buf: BytesMut,
drained: bool,
}
impl<T: AsyncRead> ReadJsonStream<T> {
fn new(reader: T) -> Self {
Self { reader, buf: BytesMut::with_capacity(CAPACITY), drained: true }
}
}
impl<T: AsyncRead> From<T> for ReadJsonStream<T> {
fn from(reader: T) -> Self {
Self::new(reader)
}
}
impl<T: AsyncRead> futures::stream::Stream for ReadJsonStream<T> {
type Item = alloy_json_rpc::PubSubItem;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if !*this.drained {
debug!(buf_len = this.buf.len(), "Deserializing buffered IPC data");
let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter();
let item = de.next();
this.buf.advance(de.byte_offset());
match item {
Some(Ok(response)) => {
return Ready(Some(response));
}
Some(Err(err)) => {
if err.is_data() {
trace!(
buffer = %String::from_utf8_lossy(this.buf.as_ref()),
"IPC buffer contains invalid JSON data",
);
*this.drained = true;
} else if err.is_eof() {
trace!("partial object in IPC buffer");
*this.drained = true;
} else {
error!(%err, "IPC response contained invalid JSON. Buffer contents will be logged at trace level");
trace!(
buffer = %String::from_utf8_lossy(this.buf.as_ref()),
"IPC response contained invalid JSON. NOTE: Buffer contents do not include invalid utf8.",
);
return Ready(None);
}
}
None => {
*this.drained = true;
}
}
}
match ready!(poll_read_buf(this.reader.as_mut(), cx, &mut this.buf)) {
Ok(0) => {
debug!("IPC socket EOF, stream is closed");
return Ready(None);
}
Ok(data_len) => {
debug!(%data_len, "Read data from IPC socket");
*this.drained = false;
}
Err(err) => {
error!(%err, "Failed to read from IPC socket, shutting down");
return Ready(None);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::future::poll_fn;
#[tokio::test]
async fn test_partial_stream() {
let mock = tokio_test::io::Builder::new()
.read(b"{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscription\"")
.wait(std::time::Duration::from_millis(1))
.read(r#", "params": {"subscription": "0xcd0c3e8af590364c09d0fa6a1210faf5", "result": {"difficulty": "0xd9263f42a87", "uncles": []}} }"#.as_bytes())
.build();
let mut reader = ReadJsonStream::new(mock);
poll_fn(|cx| {
let res = reader.poll_next_unpin(cx);
assert!(res.is_pending());
Ready(())
})
.await;
let _obj = reader.next().await.unwrap();
}
#[tokio::test]
async fn test_large_invalid() {
let mock = tokio_test::io::Builder::new()
.read(b"{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscription\"")
.wait(std::time::Duration::from_millis(1))
.read(vec![b'a'; CAPACITY].as_ref())
.build();
let mut reader = ReadJsonStream::new(mock);
poll_fn(|cx| {
let res = reader.poll_next_unpin(cx);
assert!(res.is_pending());
Ready(())
})
.await;
let obj = reader.next().await;
assert!(obj.is_none());
}
#[tokio::test]
async fn test_large_valid() {
let header = b"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0x";
let filling_zeros = header
.iter()
.chain(vec![b'0'; CAPACITY - header.len()].iter())
.copied()
.collect::<Vec<_>>();
let first_page = filling_zeros.as_ref();
let second_page = b"\"}";
let mock = tokio_test::io::Builder::new()
.read(first_page)
.wait(std::time::Duration::from_millis(1))
.read(second_page)
.build();
let mut reader = ReadJsonStream::new(mock);
poll_fn(|cx| {
let res = reader.poll_next_unpin(cx);
assert!(res.is_pending());
Ready(())
})
.await;
let obj = reader.next().await;
assert!(obj.is_some());
}
}