jsonrpc_tcp_server/
server.rs

1use std::io;
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use tower_service::Service as _;
7
8use crate::futures::{self, future};
9use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
10use crate::server_utils::tokio_stream::wrappers::TcpListenerStream;
11use crate::server_utils::{codecs, reactor, tokio, tokio_util::codec::Framed, SuspendableStream};
12
13use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels};
14use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
15use crate::service::Service;
16
17/// TCP server builder
18pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
19	executor: reactor::UninitializedExecutor,
20	handler: Arc<MetaIoHandler<M, S>>,
21	meta_extractor: Arc<dyn MetaExtractor<M>>,
22	channels: Arc<SenderChannels>,
23	incoming_separator: codecs::Separator,
24	outgoing_separator: codecs::Separator,
25}
26
27impl<M: Metadata + Default, S: Middleware<M> + 'static> ServerBuilder<M, S>
28where
29	S::Future: Unpin,
30	S::CallFuture: Unpin,
31{
32	/// Creates new `ServerBuilder` wih given `IoHandler`
33	pub fn new<T>(handler: T) -> Self
34	where
35		T: Into<MetaIoHandler<M, S>>,
36	{
37		Self::with_meta_extractor(handler, NoopExtractor)
38	}
39}
40
41impl<M: Metadata, S: Middleware<M> + 'static> ServerBuilder<M, S>
42where
43	S::Future: Unpin,
44	S::CallFuture: Unpin,
45{
46	/// Creates new `ServerBuilder` wih given `IoHandler`
47	pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self
48	where
49		T: Into<MetaIoHandler<M, S>>,
50		E: MetaExtractor<M> + 'static,
51	{
52		ServerBuilder {
53			executor: reactor::UninitializedExecutor::Unspawned,
54			handler: Arc::new(handler.into()),
55			meta_extractor: Arc::new(extractor),
56			channels: Default::default(),
57			incoming_separator: Default::default(),
58			outgoing_separator: Default::default(),
59		}
60	}
61
62	/// Utilize existing event loop executor.
63	pub fn event_loop_executor(mut self, handle: reactor::TaskExecutor) -> Self {
64		self.executor = reactor::UninitializedExecutor::Shared(handle);
65		self
66	}
67
68	/// Sets session meta extractor
69	pub fn session_meta_extractor<T: MetaExtractor<M> + 'static>(mut self, meta_extractor: T) -> Self {
70		self.meta_extractor = Arc::new(meta_extractor);
71		self
72	}
73
74	/// Sets the incoming and outgoing requests separator
75	pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
76		self.incoming_separator = incoming;
77		self.outgoing_separator = outgoing;
78		self
79	}
80
81	/// Starts a new server
82	pub fn start(self, addr: &SocketAddr) -> io::Result<Server> {
83		let meta_extractor = self.meta_extractor.clone();
84		let rpc_handler = self.handler.clone();
85		let channels = self.channels.clone();
86		let incoming_separator = self.incoming_separator;
87		let outgoing_separator = self.outgoing_separator;
88		let address = addr.to_owned();
89		let (tx, rx) = std::sync::mpsc::channel();
90		let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
91
92		let executor = self.executor.initialize()?;
93
94		use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt};
95		executor.executor().spawn(async move {
96			let start = async {
97				let listener = tokio::net::TcpListener::bind(&address).await?;
98				let listener = TcpListenerStream::new(listener);
99				let connections = SuspendableStream::new(listener);
100
101				let server = connections.map(|socket| {
102					let peer_addr = match socket.peer_addr() {
103						Ok(addr) => addr,
104						Err(e) => {
105							warn!(target: "tcp", "Unable to determine socket peer address, ignoring connection {}", e);
106							return future::Either::Left(async { io::Result::Ok(()) });
107						}
108					};
109					trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr);
110					let (sender, receiver) = futures::channel::mpsc::unbounded();
111
112					let context = RequestContext {
113						peer_addr,
114						sender: sender.clone(),
115					};
116
117					let meta = meta_extractor.extract(&context);
118					let mut service = Service::new(peer_addr, rpc_handler.clone(), meta);
119					let (mut writer, reader) = Framed::new(
120						socket,
121						codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone()),
122					)
123					.split();
124
125					// Work around https://github.com/rust-lang/rust/issues/64552 by boxing the stream type
126					let responses: Pin<Box<dyn futures::Stream<Item = io::Result<String>> + Send>> =
127						Box::pin(reader.and_then(move |req| {
128							service.call(req).then(|response| match response {
129								Err(e) => {
130									warn!(target: "tcp", "Error while processing request: {:?}", e);
131									future::ok(String::new())
132								}
133								Ok(None) => {
134									trace!(target: "tcp", "JSON RPC request produced no response");
135									future::ok(String::new())
136								}
137								Ok(Some(response_data)) => {
138									trace!(target: "tcp", "Sent response: {}", &response_data);
139									future::ok(response_data)
140								}
141							})
142						}));
143
144					let mut peer_message_queue = {
145						let mut channels = channels.lock();
146						channels.insert(peer_addr, sender);
147
148						PeerMessageQueue::new(responses, receiver, peer_addr)
149					};
150
151					let shared_channels = channels.clone();
152					let writer = async move {
153						writer.send_all(&mut peer_message_queue).await?;
154						trace!(target: "tcp", "Peer {}: service finished", peer_addr);
155						let mut channels = shared_channels.lock();
156						channels.remove(&peer_addr);
157						Ok(())
158					};
159
160					future::Either::Right(writer)
161				});
162
163				Ok(server)
164			};
165
166			match start.await {
167				Ok(server) => {
168					tx.send(Ok(())).expect("Rx is blocking parent thread.");
169					let server = server.buffer_unordered(1024).for_each(|_| async {});
170
171					future::select(Box::pin(server), stop_rx).await;
172				}
173				Err(e) => {
174					tx.send(Err(e)).expect("Rx is blocking parent thread.");
175					let _ = stop_rx.await;
176				}
177			}
178		});
179
180		let res = rx.recv().expect("Response is always sent before tx is dropped.");
181
182		res.map(|_| Server {
183			executor: Some(executor),
184			stop: Some(stop_tx),
185		})
186	}
187
188	/// Returns dispatcher
189	pub fn dispatcher(&self) -> Dispatcher {
190		Dispatcher::new(self.channels.clone())
191	}
192}
193
194/// TCP Server handle
195pub struct Server {
196	executor: Option<reactor::Executor>,
197	stop: Option<futures::channel::oneshot::Sender<()>>,
198}
199
200impl Server {
201	/// Closes the server (waits for finish)
202	pub fn close(mut self) {
203		let _ = self.stop.take().map(|sg| sg.send(()));
204		self.executor.take().unwrap().close();
205	}
206
207	/// Wait for the server to finish
208	pub fn wait(mut self) {
209		self.executor.take().unwrap().wait();
210	}
211}
212
213impl Drop for Server {
214	fn drop(&mut self) {
215		let _ = self.stop.take().map(|sg| sg.send(()));
216		if let Some(executor) = self.executor.take() {
217			executor.close()
218		}
219	}
220}
221
222#[cfg(test)]
223mod tests {
224	use super::*;
225
226	#[test]
227	fn server_is_send_and_sync() {
228		fn is_send_and_sync<T: Send + Sync>() {}
229
230		is_send_and_sync::<Server>();
231	}
232}