jsonrpc_tcp_server/
dispatch.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::Poll;
6
7use crate::futures::{channel::mpsc, Stream};
8
9use parking_lot::Mutex;
10
/// Mutex-guarded map from a peer's socket address to the sending half of that
/// peer's unbounded `String` channel.
///
/// `Dispatcher` looks senders up in this map; an address counts as connected
/// for as long as it has an entry here.
pub type SenderChannels = Mutex<HashMap<SocketAddr, mpsc::UnboundedSender<String>>>;
12
13pub struct PeerMessageQueue<S: Stream + Unpin> {
14	up: S,
15	receiver: Option<mpsc::UnboundedReceiver<String>>,
16	_addr: SocketAddr,
17}
18
19impl<S: Stream + Unpin> PeerMessageQueue<S> {
20	pub fn new(response_stream: S, receiver: mpsc::UnboundedReceiver<String>, addr: SocketAddr) -> Self {
21		PeerMessageQueue {
22			up: response_stream,
23			receiver: Some(receiver),
24			_addr: addr,
25		}
26	}
27}
28
/// Error returned when pushing a message to a peer fails.
#[derive(Debug)]
pub enum PushMessageError {
	/// No channel is registered for the requested peer address.
	NoSuchPeer,
	/// The peer's channel rejected the message; wraps the underlying send error.
	Send(mpsc::TrySendError<String>),
}
37
38impl From<mpsc::TrySendError<String>> for PushMessageError {
39	fn from(send_err: mpsc::TrySendError<String>) -> Self {
40		PushMessageError::Send(send_err)
41	}
42}
43
/// Peer-messages dispatcher.
///
/// Clones share the same `SenderChannels` map through the inner `Arc`.
#[derive(Clone)]
pub struct Dispatcher {
	/// Shared, mutex-guarded map of per-peer senders.
	channels: Arc<SenderChannels>,
}
49
50impl Dispatcher {
51	/// Creates a new dispatcher
52	pub fn new(channels: Arc<SenderChannels>) -> Self {
53		Dispatcher { channels }
54	}
55
56	/// Pushes message to given peer
57	pub fn push_message(&self, peer_addr: &SocketAddr, msg: String) -> Result<(), PushMessageError> {
58		let mut channels = self.channels.lock();
59
60		match channels.get_mut(peer_addr) {
61			Some(channel) => {
62				channel.unbounded_send(msg).map_err(PushMessageError::from)?;
63				Ok(())
64			}
65			None => Err(PushMessageError::NoSuchPeer),
66		}
67	}
68
69	/// Returns `true` if the peer is still connnected
70	pub fn is_connected(&self, socket_addr: &SocketAddr) -> bool {
71		self.channels.lock().contains_key(socket_addr)
72	}
73
74	/// Returns current peer count.
75	pub fn peer_count(&self) -> usize {
76		self.channels.lock().len()
77	}
78}
79
80impl<S: Stream<Item = std::io::Result<String>> + Unpin> Stream for PeerMessageQueue<S> {
81	type Item = std::io::Result<String>;
82
83	// The receiver will never return `Ok(Async::Ready(None))`
84	// Because the sender is kept in `SenderChannels` and it will never be dropped until `the stream` is resolved.
85	//
86	// Thus, that is the reason we terminate if `up_closed && receiver == Async::NotReady`.
87	//
88	// However, it is possible to have a race between `poll` and `push_work` if the connection is dropped.
89	// Therefore, the receiver is then dropped when the connection is dropped and an error is propagated when
90	// a `send` attempt is made on that channel.
91	fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
92		// check if we have response pending
93		let this = Pin::into_inner(self);
94
95		let up_closed = match Pin::new(&mut this.up).poll_next(cx) {
96			Poll::Ready(Some(Ok(item))) => return Poll::Ready(Some(Ok(item))),
97			Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
98			Poll::Ready(None) => true,
99			Poll::Pending => false,
100		};
101
102		let mut rx = match &mut this.receiver {
103			None => {
104				debug_assert!(up_closed);
105				return Poll::Ready(None);
106			}
107			Some(rx) => rx,
108		};
109
110		match Pin::new(&mut rx).poll_next(cx) {
111			Poll::Ready(Some(item)) => Poll::Ready(Some(Ok(item))),
112			Poll::Ready(None) | Poll::Pending if up_closed => {
113				this.receiver = None;
114				Poll::Ready(None)
115			}
116			Poll::Ready(None) | Poll::Pending => Poll::Pending,
117		}
118	}
119}