jsonrpc_ipc_server/
server.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use crate::jsonrpc::futures::channel::mpsc;
7use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
8use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
9use crate::select_with_weak::SelectWithWeakExt;
10use futures::channel::oneshot;
11use futures::StreamExt;
12use parity_tokio_ipc::Endpoint;
13use parking_lot::Mutex;
14use tower_service::Service as _;
15
16use crate::server_utils::{codecs, reactor, reactor::TaskExecutor, session, tokio_util};
17
18pub use parity_tokio_ipc::SecurityAttributes;
19
/// IPC server session.
///
/// Holds the shared request handler together with the metadata value that is
/// cloned and handed to the handler for every request served by this session.
pub struct Service<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
	// Request handler, shared through an `Arc`.
	handler: Arc<MetaIoHandler<M, S>>,
	// Session metadata; cloned once per handled request.
	meta: M,
}
25
26impl<M: Metadata, S: Middleware<M>> Service<M, S> {
27	/// Create new IPC server session with given handler and metadata.
28	pub fn new(handler: Arc<MetaIoHandler<M, S>>, meta: M) -> Self {
29		Service { handler, meta }
30	}
31}
32
33impl<M: Metadata, S: Middleware<M>> tower_service::Service<String> for Service<M, S>
34where
35	S::Future: Unpin,
36	S::CallFuture: Unpin,
37{
38	type Response = Option<String>;
39	type Error = ();
40
41	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
42
43	fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
44		Poll::Ready(Ok(()))
45	}
46
47	fn call(&mut self, req: String) -> Self::Future {
48		use futures::FutureExt;
49		trace!(target: "ipc", "Received request: {}", req);
50		Box::pin(self.handler.handle_request(&req, self.meta.clone()).map(Ok))
51	}
52}
53
/// IPC server builder.
///
/// Collects the configuration that `start` consumes when it spawns the server.
pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
	// Request handler; a clone of this `Arc` is given to every session.
	handler: Arc<MetaIoHandler<M, S>>,
	// Produces the metadata value for each accepted connection.
	meta_extractor: Arc<dyn MetaExtractor<M>>,
	// Optional hooks notified with the session id when a session opens and closes.
	session_stats: Option<Arc<dyn session::SessionStats>>,
	// Executor that is initialized when the server is started.
	executor: reactor::UninitializedExecutor,
	// Separator passed to the stream codec for incoming data.
	incoming_separator: codecs::Separator,
	// Separator passed to the stream codec for outgoing data.
	outgoing_separator: codecs::Separator,
	// Security attributes applied to the IPC endpoint.
	security_attributes: SecurityAttributes,
	// Limit passed to `try_buffer_unordered` for each client's request stream.
	client_buffer_size: usize,
}
65
66impl<M: Metadata + Default, S: Middleware<M>> ServerBuilder<M, S>
67where
68	S::Future: Unpin,
69	S::CallFuture: Unpin,
70{
71	/// Creates new IPC server build given the `IoHandler`.
72	pub fn new<T>(io_handler: T) -> ServerBuilder<M, S>
73	where
74		T: Into<MetaIoHandler<M, S>>,
75	{
76		Self::with_meta_extractor(io_handler, NoopExtractor)
77	}
78}
79
impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S>
where
	S::Future: Unpin,
	S::CallFuture: Unpin,
{
	/// Creates a new IPC server builder from the given `IoHandler` and metadata extractor.
	///
	/// The builder starts with no session stats, an unspawned executor,
	/// `Separator::Empty` for incoming data, `Separator::default()` for outgoing data,
	/// empty security attributes and a client buffer size of 5.
	pub fn with_meta_extractor<T, E>(io_handler: T, extractor: E) -> ServerBuilder<M, S>
	where
		T: Into<MetaIoHandler<M, S>>,
		E: MetaExtractor<M>,
	{
		ServerBuilder {
			handler: Arc::new(io_handler.into()),
			meta_extractor: Arc::new(extractor),
			session_stats: None,
			executor: reactor::UninitializedExecutor::Unspawned,
			incoming_separator: codecs::Separator::Empty,
			outgoing_separator: codecs::Separator::default(),
			security_attributes: SecurityAttributes::empty(),
			client_buffer_size: 5,
		}
	}

	/// Makes the server run on the given shared event loop executor.
	pub fn event_loop_executor(mut self, executor: TaskExecutor) -> Self {
		self.executor = reactor::UninitializedExecutor::Shared(executor);
		self
	}

	/// Sets the session metadata extractor, replacing the current one.
	pub fn session_meta_extractor<X>(mut self, meta_extractor: X) -> Self
	where
		X: MetaExtractor<M>,
	{
		self.meta_extractor = Arc::new(meta_extractor);
		self
	}

	/// Sets the session stats hooks, called when a session is opened and closed.
	pub fn session_stats<T: session::SessionStats>(mut self, stats: T) -> Self {
		self.session_stats = Some(Arc::new(stats));
		self
	}

	/// Sets the incoming and outgoing request separators.
	pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
		self.incoming_separator = incoming;
		self.outgoing_separator = outgoing;
		self
	}

	/// Sets the security attributes for the underlying IPC socket/pipe.
	pub fn set_security_attributes(mut self, attr: SecurityAttributes) -> Self {
		self.security_attributes = attr;
		self
	}

	/// Sets how many concurrent requests per client can be processed at any one time. Set to 5 by default.
	///
	/// The value is stored unvalidated and later passed to `try_buffer_unordered`.
	/// NOTE(review): the effect of a buffer size of 0 is not handled here; confirm
	/// against the `futures` documentation before allowing it.
	pub fn set_client_buffer_size(mut self, buffer_size: usize) -> Self {
		self.client_buffer_size = buffer_size;
		self
	}

	/// Starts a new server listening on the endpoint at `path`.
	///
	/// The accept loop is spawned on the executor. The calling thread then blocks
	/// until that task reports whether listening on the endpoint succeeded.
	///
	/// # Errors
	///
	/// Returns the error from initializing the executor, or the error reported by
	/// `Endpoint::incoming` when the endpoint cannot be listened on.
	///
	/// # Panics
	///
	/// Panics if the spawned task never sends the start result.
	pub fn start(self, path: &str) -> std::io::Result<Server> {
		let executor = self.executor.initialize()?;
		let rpc_handler = self.handler;
		let endpoint_addr = path.to_owned();
		let meta_extractor = self.meta_extractor;
		let session_stats = self.session_stats;
		let incoming_separator = self.incoming_separator;
		let outgoing_separator = self.outgoing_separator;
		// Fired by `InnerHandles::close` to make the spawned task stop.
		let (stop_signal, stop_receiver) = oneshot::channel();
		// NOTE: These channels are only waited upon in synchronous fashion
		// `start_signal` carries the start-up result back to this function;
		// `wait_signal` is sent once the server task has finished.
		let (start_signal, start_receiver) = std::sync::mpsc::channel();
		let (wait_signal, wait_receiver) = std::sync::mpsc::channel();
		let security_attributes = self.security_attributes;
		let client_buffer_size = self.client_buffer_size;

		let fut = async move {
			let mut endpoint = Endpoint::new(endpoint_addr);
			endpoint.set_security_attributes(security_attributes);

			if cfg!(unix) {
				// Remove any file already present at the endpoint path and warn
				// only when something was actually removed.
				if ::std::fs::remove_file(endpoint.path()).is_ok() {
					warn!("Removed existing file '{}'.", endpoint.path());
				}
			}

			let endpoint_addr = endpoint.path().to_owned();
			let connections = match endpoint.incoming() {
				Ok(connections) => connections,
				Err(e) => {
					// Report the failure to the caller blocked in `start` and bail out.
					start_signal
						.send(Err(e))
						.expect("Cannot fail since receiver never dropped before receiving");
					return;
				}
			};

			// Session id counter; incremented (wrapping) for every accepted connection.
			let mut id = 0u64;

			use futures::TryStreamExt;
			let server = connections.map_ok(move |io_stream| {
				id = id.wrapping_add(1);
				let session_id = id;
				let session_stats = session_stats.clone();
				trace!(target: "ipc", "Accepted incoming IPC connection: {}", session_id);
				if let Some(stats) = session_stats.as_ref() {
					stats.open_session(session_id)
				}

				// `sender` is handed to the metadata extractor; whatever is sent through
				// it is merged into this connection's outgoing stream below.
				let (sender, receiver) = mpsc::unbounded();
				let meta = meta_extractor.extract(&RequestContext {
					endpoint_addr: endpoint_addr.as_ref(),
					session_id,
					sender,
				});
				let mut service = Service::new(rpc_handler.clone(), meta);
				let codec = codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone());
				let framed = tokio_util::codec::Decoder::framed(codec, io_stream);
				let (writer, reader) = futures::StreamExt::split(framed);

				let responses = reader
					.map_ok(move |req| {
						service
							.call(req)
							// Ignore service errors
							.map(|x| Ok(x.ok().flatten()))
					})
					.try_buffer_unordered(client_buffer_size)
					// Filter out previously ignored service errors as `None`s
					.try_filter_map(futures::future::ok)
					// we use `select_with_weak` here, instead of `select`, to close the stream
					// as soon as the ipc pipe is closed
					.select_with_weak(receiver.map(Ok));

				// Write every response back to the peer; once forwarding ends, for
				// whatever reason, record the end of the session.
				responses.forward(writer).then(move |_| {
					trace!(target: "ipc", "Peer: service finished");
					if let Some(stats) = session_stats.as_ref() {
						stats.close_session(session_id)
					}

					async { Ok(()) }
				})
			});
			// Listening succeeded: unblock the caller of `start`.
			start_signal
				.send(Ok(()))
				.expect("Cannot fail since receiver never dropped before receiving");
			let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted);
			let stop = Box::pin(stop);

			// Drive the per-connection futures; their individual results are discarded.
			let server = server.try_buffer_unordered(1024).for_each(|_| async {});

			// Run until either the connection stream ends or the stop signal resolves.
			let result = futures::future::select(Box::pin(server), stop).await;
			// We drop the server first to prevent a situation where main thread terminates
			// before the server is properly dropped (see #504 for more details)
			drop(result);
			let _ = wait_signal.send(());
		};

		use futures::FutureExt;
		let fut = Box::pin(fut.map(drop));
		executor.executor().spawn(fut);

		// Built before the start result is known: if start-up failed, dropping
		// `handle` runs `InnerHandles::close` through its `Drop` impl.
		let handle = InnerHandles {
			executor: Some(executor),
			stop: Some(stop_signal),
			path: path.to_owned(),
		};

		use futures::TryFutureExt;
		// Block until the spawned task reports the outcome of `endpoint.incoming()`.
		match start_receiver.recv().expect("Message should always be sent") {
			Ok(()) => Ok(Server {
				handles: Arc::new(Mutex::new(handle)),
				wait_handle: Some(wait_receiver),
			}),
			Err(e) => Err(e),
		}
	}
}
262
/// IPC Server handle.
///
/// Returned by `ServerBuilder::start`; used to close the server, to hand out
/// `CloseHandle`s, or to wait for the server task to finish.
#[derive(Debug)]
pub struct Server {
	// Shutdown state, shared with every `CloseHandle` created from this server.
	handles: Arc<Mutex<InnerHandles>>,
	// Receives a message once the server task has finished; taken by `wait`.
	wait_handle: Option<std::sync::mpsc::Receiver<()>>,
}
269
270impl Server {
271	/// Closes the server (waits for finish)
272	pub fn close(self) {
273		self.handles.lock().close();
274	}
275
276	/// Creates a close handle that can be used to stop the server remotely
277	pub fn close_handle(&self) -> CloseHandle {
278		CloseHandle {
279			inner: self.handles.clone(),
280		}
281	}
282
283	/// Wait for the server to finish
284	pub fn wait(mut self) {
285		if let Some(wait_receiver) = self.wait_handle.take() {
286			let _ = wait_receiver.recv();
287		}
288	}
289}
290
/// Shutdown state shared between `Server` and its `CloseHandle`s.
#[derive(Debug)]
struct InnerHandles {
	// Executor the server runs on; taken and closed on the first `close`.
	executor: Option<reactor::Executor>,
	// Sender of the stop signal; taken and fired on the first `close`.
	stop: Option<oneshot::Sender<()>>,
	// Endpoint path; the file at this path is removed on `close`.
	path: String,
}
297
298impl InnerHandles {
299	pub fn close(&mut self) {
300		let _ = self.stop.take().map(|stop| stop.send(()));
301		if let Some(executor) = self.executor.take() {
302			executor.close()
303		}
304		let _ = ::std::fs::remove_file(&self.path); // ignore error, file could have been gone somewhere
305	}
306}
307
308impl Drop for InnerHandles {
309	fn drop(&mut self) {
310		self.close();
311	}
312}
/// `CloseHandle` allows one to stop a `Server` remotely.
///
/// It shares the server's shutdown state, so it can be cloned and moved to
/// another thread independently of the `Server` value itself.
#[derive(Clone)]
pub struct CloseHandle {
	// Same shared shutdown state that the originating `Server` holds.
	inner: Arc<Mutex<InnerHandles>>,
}
318
319impl CloseHandle {
320	/// `close` closes the corresponding `IpcServer` instance.
321	pub fn close(self) {
322		self.inner.lock().close();
323	}
324}
325
// Integration-style tests that talk to a real server over Unix domain sockets.
// Each test uses its own socket path under `/tmp`.
#[cfg(test)]
#[cfg(not(windows))]
mod tests {
	use super::*;

	use jsonrpc_core::Value;
	use std::os::unix::net::UnixStream;
	use std::thread;
	use std::time::{self, Duration};

	/// Builder for a server exposing a single `say_hello` method that returns `"hello"`.
	fn server_builder() -> ServerBuilder {
		let mut io = MetaIoHandler::<()>::default();
		io.add_sync_method("say_hello", |_params| Ok(Value::String("hello".to_string())));
		ServerBuilder::new(io)
	}

	/// Starts the `say_hello` server on `path`, panicking if it fails to start.
	fn run(path: &str) -> Server {
		let builder = server_builder();
		let server = builder.start(path).expect("Server must run with no issues");
		server
	}

	/// Connects to the socket at `path`, sends `data` as one framed message and
	/// returns the first frame that comes back.
	///
	/// Runs on a fresh Tokio runtime and blocks the calling thread until done.
	/// Panics if connecting or sending fails, or if no reply frame is received.
	fn dummy_request_str(path: &str, data: &str) -> String {
		use futures::SinkExt;

		let reply = async move {
			use tokio::net::UnixStream;

			let stream: UnixStream = UnixStream::connect(path).await?;
			let codec = codecs::StreamCodec::stream_incoming();
			let mut stream = tokio_util::codec::Decoder::framed(codec, stream);
			stream.send(data.to_owned()).await?;
			// Only the first frame is of interest; the rest of the stream is dropped.
			let (reply, _) = stream.into_future().await;

			reply.expect("there should be one reply")
		};

		let rt = tokio::runtime::Runtime::new().unwrap();
		rt.block_on(reply).expect("wait for reply")
	}

	// The server starts successfully on a fresh path.
	#[test]
	fn start() {
		crate::logger::init_log();

		let mut io = MetaIoHandler::<()>::default();
		io.add_sync_method("say_hello", |_params| Ok(Value::String("hello".to_string())));
		let server = ServerBuilder::new(io);

		let _server = server
			.start("/tmp/test-ipc-20000")
			.expect("Server must run with no issues");
	}

	// A plain Unix socket client can connect to a running server.
	#[test]
	fn connect() {
		crate::logger::init_log();
		let path = "/tmp/test-ipc-30000";
		let _server = run(path);

		UnixStream::connect(path).expect("Socket should connect");
	}

	// A single request gets the exact expected JSON-RPC response.
	#[test]
	fn request() {
		crate::logger::init_log();
		let path = "/tmp/test-ipc-40000";
		let server = run(path);
		let (stop_signal, stop_receiver) = std::sync::mpsc::channel();

		let t = thread::spawn(move || {
			let result = dummy_request_str(
				path,
				"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
			);
			stop_signal.send(result).unwrap();
		});
		t.join().unwrap();

		let result = stop_receiver.recv().unwrap();

		assert_eq!(
			result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
			"Response does not exactly match the expected response",
		);
		server.close();
	}

	// Four threads each issue 100 requests; all 400 responses must match.
	#[test]
	fn req_parallel() {
		crate::logger::init_log();
		let path = "/tmp/test-ipc-45000";
		let server = run(path);
		// Capacity matches the total number of responses, so `try_send` is not
		// expected to fail for lack of space.
		let (stop_signal, stop_receiver) = futures::channel::mpsc::channel(400);

		let mut handles = Vec::new();
		for _ in 0..4 {
			let path = path.clone();
			let mut stop_signal = stop_signal.clone();
			handles.push(thread::spawn(move || {
				for _ in 0..100 {
					let result = dummy_request_str(
						&path,
						"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
					);
					stop_signal.try_send(result).unwrap();
				}
			}));
		}

		for handle in handles.drain(..) {
			handle.join().unwrap();
		}

		// Check the collected responses on a separate thread and propagate any
		// assertion failure through `join`.
		thread::spawn(move || {
			let fut = stop_receiver
				.map(|result| {
					assert_eq!(
						result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
						"Response does not exactly match the expected response",
					);
				})
				.take(400)
				.for_each(|_| async {});
			futures::executor::block_on(fut);
		})
		.join()
		.unwrap();
		server.close();
	}

	// Closing the server removes the socket file and refuses new connections.
	#[test]
	fn close() {
		crate::logger::init_log();
		let path = "/tmp/test-ipc-50000";
		let server = run(path);
		server.close();

		assert!(
			::std::fs::metadata(path).is_err(),
			"There should be no socket file left"
		);
		assert!(
			UnixStream::connect(path).is_err(),
			"Connection to the closed socket should fail"
		);
	}

	/// Large payload: `begin_hello` twice, 16384 spaces, then `end_hello`.
	fn huge_response_test_str() -> String {
		let mut result = String::from("begin_hello");
		result.push_str("begin_hello");
		for _ in 0..16384 {
			result.push(' ');
		}
		result.push_str("end_hello");
		result
	}

	/// The JSON-RPC response expected when the method returns `huge_response_test_str()`.
	fn huge_response_test_json() -> String {
		let mut result = String::from("{\"jsonrpc\":\"2.0\",\"result\":\"");
		result.push_str(&huge_response_test_str());
		result.push_str("\",\"id\":1}");

		result
	}

	// A response of more than 16 KiB arrives intact.
	#[test]
	fn test_huge_response() {
		crate::logger::init_log();
		let path = "/tmp/test-ipc-60000";

		let mut io = MetaIoHandler::<()>::default();
		io.add_sync_method("say_huge_hello", |_params| Ok(Value::String(huge_response_test_str())));
		let builder = ServerBuilder::new(io);

		let server = builder.start(path).expect("Server must run with no issues");
		let (stop_signal, stop_receiver) = oneshot::channel();

		let t = thread::spawn(move || {
			let result = dummy_request_str(
				&path,
				"{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
			);

			stop_signal.send(result).unwrap();
		});
		t.join().unwrap();

		thread::spawn(move || {
			futures::executor::block_on(async move {
				let result = stop_receiver.await.unwrap();
				assert_eq!(
					result,
					huge_response_test_json(),
					"Response does not exactly match the expected response",
				);
				server.close();
			});
		})
		.join()
		.unwrap();
	}

	// Session metadata is dropped once the client connection goes away.
	#[test]
	fn test_session_end() {
		// Metadata that fires `drop_signal` when it is dropped.
		struct SessionEndMeta {
			drop_signal: Option<oneshot::Sender<()>>,
		}

		impl Drop for SessionEndMeta {
			fn drop(&mut self) {
				trace!(target: "ipc", "Dropping session meta");
				self.drop_signal.take().unwrap().send(()).unwrap()
			}
		}

		// Extractor that publishes, for every session, the receiver paired with
		// that session's drop signal.
		struct SessionEndExtractor {
			drop_receivers: Arc<Mutex<futures::channel::mpsc::Sender<oneshot::Receiver<()>>>>,
		}

		impl MetaExtractor<Arc<SessionEndMeta>> for SessionEndExtractor {
			fn extract(&self, _context: &RequestContext) -> Arc<SessionEndMeta> {
				let (signal, receiver) = oneshot::channel();
				self.drop_receivers.lock().try_send(receiver).unwrap();
				let meta = SessionEndMeta {
					drop_signal: Some(signal),
				};
				Arc::new(meta)
			}
		}

		crate::logger::init_log();
		let path = "/tmp/test-ipc-30009";
		let (signal, receiver) = futures::channel::mpsc::channel(16);
		let session_metadata_extractor = SessionEndExtractor {
			drop_receivers: Arc::new(Mutex::new(signal)),
		};

		let io = MetaIoHandler::<Arc<SessionEndMeta>>::default();
		let builder = ServerBuilder::with_meta_extractor(io, session_metadata_extractor);
		let server = builder.start(path).expect("Server must run with no issues");
		{
			// Connect and immediately drop the stream to end the session.
			let _ = UnixStream::connect(path).expect("Socket should connect");
		}

		thread::spawn(move || {
			futures::executor::block_on(async move {
				// First wait for the session's receiver, then for its drop signal.
				let (drop_receiver, ..) = receiver.into_future().await;
				drop_receiver.unwrap().await.unwrap();
			});
		})
		.join()
		.unwrap();
		server.close();
	}

	// Closing through a `CloseHandle` makes new connections fail.
	#[test]
	fn close_handle() {
		crate::logger::init_log();
		let path = "/tmp/test-ipc-90000";
		let server = run(path);
		let handle = server.close_handle();
		handle.close();
		assert!(
			UnixStream::connect(path).is_err(),
			"Connection to the closed socket should fail"
		);
	}

	// `Server::wait` returns after another thread closes the server via a handle.
	#[test]
	fn close_when_waiting() {
		crate::logger::init_log();
		let path = "/tmp/test-ipc-70000";
		let server = run(path);
		let close_handle = server.close_handle();
		let (tx, rx) = oneshot::channel();

		// Closer: stops the server after a short delay.
		thread::spawn(move || {
			thread::sleep(time::Duration::from_millis(100));
			close_handle.close();
		});
		// Waiter: reports back once `wait` has returned.
		thread::spawn(move || {
			server.wait();
			tx.send(true).expect("failed to report that the server has stopped");
		});

		let rt = tokio::runtime::Runtime::new().unwrap();
		rt.block_on(async move {
			// Fail the test if the waiter has not reported within 500 ms.
			let timeout = tokio::time::sleep(Duration::from_millis(500));
			futures::pin_mut!(timeout);

			match futures::future::select(rx, timeout).await {
				futures::future::Either::Left((result, _)) => {
					assert!(result.is_ok(), "Rx failed");
					assert_eq!(result, Ok(true), "Wait timeout exceeded");
					assert!(
						UnixStream::connect(path).is_err(),
						"Connection to the closed socket should fail"
					);
					Ok(())
				}
				futures::future::Either::Right(_) => Err("timed out"),
			}
		})
		.unwrap();
	}

	// The server starts when security attributes are set explicitly.
	#[test]
	fn runs_with_security_attributes() {
		let path = "/tmp/test-ipc-9001";
		let io = MetaIoHandler::<Arc<()>>::default();
		ServerBuilder::with_meta_extractor(io, NoopExtractor)
			.set_security_attributes(SecurityAttributes::empty())
			.start(path)
			.expect("Server must run with no issues");
	}
}