use alloy_json_rpc::PubSubItem;
use serde_json::value::RawValue;
use tokio::sync::{
mpsc,
oneshot::{self, error::TryRecvError},
};
/// One half of the channel set created by [`ConnectionHandle::new`]; the
/// other half is the [`ConnectionInterface`] returned alongside it.
///
/// All four channels are wired pairwise to the matching fields of the
/// interface: two unbounded mpsc channels carry data in each direction and
/// two oneshot channels carry the error and shutdown signals.
#[derive(Debug)]
pub struct ConnectionHandle {
/// Sender for raw JSON values; the receiving end is the interface's
/// `from_frontend` field.
pub(crate) to_socket: mpsc::UnboundedSender<Box<RawValue>>,
/// Receiver for [`PubSubItem`]s; the sending end is the interface's
/// `to_frontend` field (see [`ConnectionInterface::send_to_frontend`]).
pub(crate) from_socket: mpsc::UnboundedReceiver<PubSubItem>,
/// Resolves when the interface side calls
/// [`ConnectionInterface::close_with_error`].
pub(crate) error: oneshot::Receiver<()>,
/// One-shot sender fired by [`ConnectionHandle::shutdown`].
pub(crate) shutdown: oneshot::Sender<()>,
}
impl ConnectionHandle {
    /// Builds a connected pair: the handle and its matching
    /// [`ConnectionInterface`].
    ///
    /// Four channels are created and split between the two values: an
    /// unbounded channel for raw JSON values (handle -> interface), an
    /// unbounded channel for [`PubSubItem`]s (interface -> handle), a oneshot
    /// for the error signal (interface -> handle) and a oneshot for the
    /// shutdown signal (handle -> interface).
    pub fn new() -> (Self, ConnectionInterface) {
        // Raw JSON values travel from the handle to the interface.
        let (request_tx, request_rx) = mpsc::unbounded_channel();
        // Pubsub items travel from the interface back to the handle.
        let (item_tx, item_rx) = mpsc::unbounded_channel();
        // One-shot signals, one in each direction.
        let (error_tx, error_rx) = oneshot::channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        (
            Self {
                to_socket: request_tx,
                from_socket: item_rx,
                error: error_rx,
                shutdown: shutdown_tx,
            },
            ConnectionInterface {
                from_frontend: request_rx,
                to_frontend: item_tx,
                error: error_tx,
                shutdown: shutdown_rx,
            },
        )
    }

    /// Consumes the handle and fires the shutdown signal.
    ///
    /// The send result is deliberately discarded: a failed send only means
    /// the receiving half has already been dropped.
    pub fn shutdown(self) {
        let _ = self.shutdown.send(());
    }
}
/// Counterpart of [`ConnectionHandle`], created together with it by
/// [`ConnectionHandle::new`].
///
/// It holds the opposite end of each of the handle's four channels.
#[derive(Debug)]
pub struct ConnectionInterface {
/// Receiver for raw JSON values sent through the handle's `to_socket`
/// sender; drained by [`ConnectionInterface::recv_from_frontend`].
pub(crate) from_frontend: mpsc::UnboundedReceiver<Box<RawValue>>,
/// Sender for [`PubSubItem`]s; the receiving end is the handle's
/// `from_socket` field.
pub(crate) to_frontend: mpsc::UnboundedSender<PubSubItem>,
/// One-shot sender fired by [`ConnectionInterface::close_with_error`].
pub(crate) error: oneshot::Sender<()>,
/// Receives the signal sent by [`ConnectionHandle::shutdown`]; polled by
/// [`ConnectionInterface::recv_from_frontend`] before it waits for data.
pub(crate) shutdown: oneshot::Receiver<()>,
}
impl ConnectionInterface {
    /// Sends `item` to the handle side of the connection.
    ///
    /// # Errors
    ///
    /// Returns the unsent item wrapped in [`mpsc::error::SendError`] when the
    /// receiving half of the channel has been closed or dropped.
    pub fn send_to_frontend(
        &self,
        item: PubSubItem,
    ) -> Result<(), mpsc::error::SendError<PubSubItem>> {
        self.to_frontend.send(item)
    }

    /// Waits for the next raw JSON value sent from the handle side.
    ///
    /// Returns `None` without waiting when the shutdown signal has already
    /// been sent or its sender has been dropped. Otherwise it awaits the
    /// request channel, which itself yields `None` once that channel is
    /// closed and drained.
    ///
    /// The shutdown state is sampled once, before awaiting; a signal that
    /// arrives while this call is parked on the request channel is not
    /// observed by this call.
    pub async fn recv_from_frontend(&mut self) -> Option<Box<RawValue>> {
        // A single poll is enough: an immediate second `try_recv` could only
        // differ if the signal landed in the few instructions between the two
        // calls, and the same window exists right after any number of polls.
        match self.shutdown.try_recv() {
            Ok(()) | Err(TryRecvError::Closed) => return None,
            Err(TryRecvError::Empty) => {}
        }

        self.from_frontend.recv().await
    }

    /// Consumes the interface and fires the error signal.
    ///
    /// The send result is deliberately discarded: a failed send only means
    /// the receiving half has already been dropped.
    pub fn close_with_error(self) {
        let _ = self.error.send(());
    }
}