// jsonrpc_tcp_server/dispatch.rs
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use crate::futures::{channel::mpsc, Stream};
use parking_lot::Mutex;
/// Mutex-guarded map from a peer's socket address to the unbounded sender
/// used to push `String` messages towards that peer.
pub type SenderChannels = Mutex<HashMap<SocketAddr, mpsc::UnboundedSender<String>>>;
/// Stream adapter that pairs an upstream stream with an unbounded receiver of
/// `String` messages, so both can be drained through a single stream.
pub struct PeerMessageQueue<S: Stream + Unpin> {
// Upstream stream; it is polled before the receiver on every `poll_next`.
up: S,
// Receiving half for pushed messages; set to `None` once the queue has terminated.
receiver: Option<mpsc::UnboundedReceiver<String>>,
// Address passed to `new`; stored but not read anywhere in this file.
_addr: SocketAddr,
}
impl<S: Stream + Unpin> PeerMessageQueue<S> {
    /// Builds a queue that merges `response_stream` with messages arriving on
    /// `receiver`.
    ///
    /// `addr` is stored alongside the two sources; nothing in this block reads it.
    pub fn new(response_stream: S, receiver: mpsc::UnboundedReceiver<String>, addr: SocketAddr) -> Self {
        let receiver = Some(receiver);
        Self {
            up: response_stream,
            receiver,
            _addr: addr,
        }
    }
}
/// Errors returned when pushing a message to a peer fails.
#[derive(Debug)]
pub enum PushMessageError {
/// No channel is registered for the requested peer address.
NoSuchPeer,
/// The channel for the peer rejected the message; wraps the channel's send error.
Send(mpsc::TrySendError<String>),
}
impl From<mpsc::TrySendError<String>> for PushMessageError {
    /// Wraps a channel send failure in the `Send` variant so it can be
    /// propagated with `?`.
    fn from(send_err: mpsc::TrySendError<String>) -> Self {
        Self::Send(send_err)
    }
}
/// Cloneable handle for pushing messages to peers by socket address.
///
/// Clones share the same underlying channel map through the `Arc`.
#[derive(Clone)]
pub struct Dispatcher {
// Shared, mutex-guarded map of per-peer senders.
channels: Arc<SenderChannels>,
}
impl Dispatcher {
    /// Creates a dispatcher backed by the given shared channel map.
    pub fn new(channels: Arc<SenderChannels>) -> Self {
        Self { channels }
    }

    /// Queues `msg` on the channel registered for `peer_addr`.
    ///
    /// The channel map stays locked for the duration of the lookup and send.
    ///
    /// # Errors
    ///
    /// * `PushMessageError::NoSuchPeer` if no channel is registered for `peer_addr`.
    /// * `PushMessageError::Send` if the channel refuses the message.
    pub fn push_message(&self, peer_addr: &SocketAddr, msg: String) -> Result<(), PushMessageError> {
        let senders = self.channels.lock();
        let sender = senders.get(peer_addr).ok_or(PushMessageError::NoSuchPeer)?;
        sender.unbounded_send(msg).map_err(PushMessageError::Send)
    }

    /// Returns `true` if a channel is currently registered for `socket_addr`.
    pub fn is_connected(&self, socket_addr: &SocketAddr) -> bool {
        let senders = self.channels.lock();
        senders.contains_key(socket_addr)
    }

    /// Returns the number of peers that currently have a registered channel.
    pub fn peer_count(&self) -> usize {
        let senders = self.channels.lock();
        senders.len()
    }
}
impl<S: Stream<Item = std::io::Result<String>> + Unpin> Stream for PeerMessageQueue<S> {
    type Item = std::io::Result<String>;

    /// Polls the upstream first, then the pushed-message receiver.
    ///
    /// * Any item (success or error) that the upstream has ready is returned as is.
    /// * Otherwise a ready pushed message is returned wrapped in `Ok`.
    /// * The stream ends once the upstream has reported its end and the receiver
    ///   has no message ready during that same poll. The receiver is dropped at
    ///   that point, and later polls keep returning `None`.
    /// * In every other case the result is `Pending`.
    ///
    /// NOTE(review): the upstream is polled on every call, including calls made
    /// after it has already yielded `None`. This presumably relies on the upstream
    /// tolerating polls after completion — confirm.
    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
        let queue = self.get_mut();

        // Upstream responses take priority over pushed messages.
        let upstream_done = match Pin::new(&mut queue.up).poll_next(cx) {
            Poll::Ready(Some(response)) => return Poll::Ready(Some(response)),
            Poll::Ready(None) => true,
            Poll::Pending => false,
        };

        let receiver = match queue.receiver.as_mut() {
            Some(receiver) => receiver,
            None => {
                // The receiver is only cleared after the upstream has finished.
                debug_assert!(upstream_done);
                return Poll::Ready(None);
            }
        };

        if let Poll::Ready(Some(pushed)) = Pin::new(receiver).poll_next(cx) {
            return Poll::Ready(Some(Ok(pushed)));
        }

        // Nothing pushed is ready (receiver pending or exhausted): finish only
        // if the upstream is done too, otherwise keep waiting.
        if upstream_done {
            queue.receiver = None;
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}