jsonrpc_tcp_server/
dispatch.rs1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::Poll;
6
7use crate::futures::{channel::mpsc, Stream};
8
9use parking_lot::Mutex;
10
/// Mutex-guarded map from a peer's socket address to the unbounded sender used to
/// queue `String` messages for that peer.
pub type SenderChannels = Mutex<HashMap<SocketAddr, mpsc::UnboundedSender<String>>>;
12
13pub struct PeerMessageQueue<S: Stream + Unpin> {
14 up: S,
15 receiver: Option<mpsc::UnboundedReceiver<String>>,
16 _addr: SocketAddr,
17}
18
19impl<S: Stream + Unpin> PeerMessageQueue<S> {
20 pub fn new(response_stream: S, receiver: mpsc::UnboundedReceiver<String>, addr: SocketAddr) -> Self {
21 PeerMessageQueue {
22 up: response_stream,
23 receiver: Some(receiver),
24 _addr: addr,
25 }
26 }
27}
28
29#[derive(Debug)]
31pub enum PushMessageError {
32 NoSuchPeer,
34 Send(mpsc::TrySendError<String>),
36}
37
38impl From<mpsc::TrySendError<String>> for PushMessageError {
39 fn from(send_err: mpsc::TrySendError<String>) -> Self {
40 PushMessageError::Send(send_err)
41 }
42}
43
/// Cloneable handle to the shared map of per-peer sender channels, used to push
/// messages to peers and to query which peers have a channel registered.
#[derive(Clone)]
pub struct Dispatcher {
    /// Shared, mutex-guarded map of peer address to sender channel.
    channels: Arc<SenderChannels>,
}
49
50impl Dispatcher {
51 pub fn new(channels: Arc<SenderChannels>) -> Self {
53 Dispatcher { channels }
54 }
55
56 pub fn push_message(&self, peer_addr: &SocketAddr, msg: String) -> Result<(), PushMessageError> {
58 let mut channels = self.channels.lock();
59
60 match channels.get_mut(peer_addr) {
61 Some(channel) => {
62 channel.unbounded_send(msg).map_err(PushMessageError::from)?;
63 Ok(())
64 }
65 None => Err(PushMessageError::NoSuchPeer),
66 }
67 }
68
69 pub fn is_connected(&self, socket_addr: &SocketAddr) -> bool {
71 self.channels.lock().contains_key(socket_addr)
72 }
73
74 pub fn peer_count(&self) -> usize {
76 self.channels.lock().len()
77 }
78}
79
80impl<S: Stream<Item = std::io::Result<String>> + Unpin> Stream for PeerMessageQueue<S> {
81 type Item = std::io::Result<String>;
82
83 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
92 let this = Pin::into_inner(self);
94
95 let up_closed = match Pin::new(&mut this.up).poll_next(cx) {
96 Poll::Ready(Some(Ok(item))) => return Poll::Ready(Some(Ok(item))),
97 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
98 Poll::Ready(None) => true,
99 Poll::Pending => false,
100 };
101
102 let mut rx = match &mut this.receiver {
103 None => {
104 debug_assert!(up_closed);
105 return Poll::Ready(None);
106 }
107 Some(rx) => rx,
108 };
109
110 match Pin::new(&mut rx).poll_next(cx) {
111 Poll::Ready(Some(item)) => Poll::Ready(Some(Ok(item))),
112 Poll::Ready(None) | Poll::Pending if up_closed => {
113 this.receiver = None;
114 Poll::Ready(None)
115 }
116 Poll::Ready(None) | Poll::Pending => Poll::Pending,
117 }
118 }
119}