1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use crate::jsonrpc::futures::channel::mpsc;
7use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
8use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
9use crate::select_with_weak::SelectWithWeakExt;
10use futures::channel::oneshot;
11use futures::StreamExt;
12use parity_tokio_ipc::Endpoint;
13use parking_lot::Mutex;
14use tower_service::Service as _;
15
16use crate::server_utils::{codecs, reactor, reactor::TaskExecutor, session, tokio_util};
17
18pub use parity_tokio_ipc::SecurityAttributes;
19
20pub struct Service<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
22 handler: Arc<MetaIoHandler<M, S>>,
23 meta: M,
24}
25
26impl<M: Metadata, S: Middleware<M>> Service<M, S> {
27 pub fn new(handler: Arc<MetaIoHandler<M, S>>, meta: M) -> Self {
29 Service { handler, meta }
30 }
31}
32
33impl<M: Metadata, S: Middleware<M>> tower_service::Service<String> for Service<M, S>
34where
35 S::Future: Unpin,
36 S::CallFuture: Unpin,
37{
38 type Response = Option<String>;
39 type Error = ();
40
41 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
42
43 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
44 Poll::Ready(Ok(()))
45 }
46
47 fn call(&mut self, req: String) -> Self::Future {
48 use futures::FutureExt;
49 trace!(target: "ipc", "Received request: {}", req);
50 Box::pin(self.handler.handle_request(&req, self.meta.clone()).map(Ok))
51 }
52}
53
54pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
56 handler: Arc<MetaIoHandler<M, S>>,
57 meta_extractor: Arc<dyn MetaExtractor<M>>,
58 session_stats: Option<Arc<dyn session::SessionStats>>,
59 executor: reactor::UninitializedExecutor,
60 incoming_separator: codecs::Separator,
61 outgoing_separator: codecs::Separator,
62 security_attributes: SecurityAttributes,
63 client_buffer_size: usize,
64}
65
66impl<M: Metadata + Default, S: Middleware<M>> ServerBuilder<M, S>
67where
68 S::Future: Unpin,
69 S::CallFuture: Unpin,
70{
71 pub fn new<T>(io_handler: T) -> ServerBuilder<M, S>
73 where
74 T: Into<MetaIoHandler<M, S>>,
75 {
76 Self::with_meta_extractor(io_handler, NoopExtractor)
77 }
78}
79
80impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S>
81where
82 S::Future: Unpin,
83 S::CallFuture: Unpin,
84{
85 pub fn with_meta_extractor<T, E>(io_handler: T, extractor: E) -> ServerBuilder<M, S>
87 where
88 T: Into<MetaIoHandler<M, S>>,
89 E: MetaExtractor<M>,
90 {
91 ServerBuilder {
92 handler: Arc::new(io_handler.into()),
93 meta_extractor: Arc::new(extractor),
94 session_stats: None,
95 executor: reactor::UninitializedExecutor::Unspawned,
96 incoming_separator: codecs::Separator::Empty,
97 outgoing_separator: codecs::Separator::default(),
98 security_attributes: SecurityAttributes::empty(),
99 client_buffer_size: 5,
100 }
101 }
102
103 pub fn event_loop_executor(mut self, executor: TaskExecutor) -> Self {
105 self.executor = reactor::UninitializedExecutor::Shared(executor);
106 self
107 }
108
109 pub fn session_meta_extractor<X>(mut self, meta_extractor: X) -> Self
111 where
112 X: MetaExtractor<M>,
113 {
114 self.meta_extractor = Arc::new(meta_extractor);
115 self
116 }
117
118 pub fn session_stats<T: session::SessionStats>(mut self, stats: T) -> Self {
120 self.session_stats = Some(Arc::new(stats));
121 self
122 }
123
124 pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
126 self.incoming_separator = incoming;
127 self.outgoing_separator = outgoing;
128 self
129 }
130
131 pub fn set_security_attributes(mut self, attr: SecurityAttributes) -> Self {
133 self.security_attributes = attr;
134 self
135 }
136
137 pub fn set_client_buffer_size(mut self, buffer_size: usize) -> Self {
139 self.client_buffer_size = buffer_size;
140 self
141 }
142
143 pub fn start(self, path: &str) -> std::io::Result<Server> {
145 let executor = self.executor.initialize()?;
146 let rpc_handler = self.handler;
147 let endpoint_addr = path.to_owned();
148 let meta_extractor = self.meta_extractor;
149 let session_stats = self.session_stats;
150 let incoming_separator = self.incoming_separator;
151 let outgoing_separator = self.outgoing_separator;
152 let (stop_signal, stop_receiver) = oneshot::channel();
153 let (start_signal, start_receiver) = std::sync::mpsc::channel();
155 let (wait_signal, wait_receiver) = std::sync::mpsc::channel();
156 let security_attributes = self.security_attributes;
157 let client_buffer_size = self.client_buffer_size;
158
159 let fut = async move {
160 let mut endpoint = Endpoint::new(endpoint_addr);
161 endpoint.set_security_attributes(security_attributes);
162
163 if cfg!(unix) {
164 if ::std::fs::remove_file(endpoint.path()).is_ok() {
166 warn!("Removed existing file '{}'.", endpoint.path());
167 }
168 }
169
170 let endpoint_addr = endpoint.path().to_owned();
171 let connections = match endpoint.incoming() {
172 Ok(connections) => connections,
173 Err(e) => {
174 start_signal
175 .send(Err(e))
176 .expect("Cannot fail since receiver never dropped before receiving");
177 return;
178 }
179 };
180
181 let mut id = 0u64;
182
183 use futures::TryStreamExt;
184 let server = connections.map_ok(move |io_stream| {
185 id = id.wrapping_add(1);
186 let session_id = id;
187 let session_stats = session_stats.clone();
188 trace!(target: "ipc", "Accepted incoming IPC connection: {}", session_id);
189 if let Some(stats) = session_stats.as_ref() {
190 stats.open_session(session_id)
191 }
192
193 let (sender, receiver) = mpsc::unbounded();
194 let meta = meta_extractor.extract(&RequestContext {
195 endpoint_addr: endpoint_addr.as_ref(),
196 session_id,
197 sender,
198 });
199 let mut service = Service::new(rpc_handler.clone(), meta);
200 let codec = codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone());
201 let framed = tokio_util::codec::Decoder::framed(codec, io_stream);
202 let (writer, reader) = futures::StreamExt::split(framed);
203
204 let responses = reader
205 .map_ok(move |req| {
206 service
207 .call(req)
208 .map(|x| Ok(x.ok().flatten()))
210 })
211 .try_buffer_unordered(client_buffer_size)
212 .try_filter_map(futures::future::ok)
214 .select_with_weak(receiver.map(Ok));
217
218 responses.forward(writer).then(move |_| {
219 trace!(target: "ipc", "Peer: service finished");
220 if let Some(stats) = session_stats.as_ref() {
221 stats.close_session(session_id)
222 }
223
224 async { Ok(()) }
225 })
226 });
227 start_signal
228 .send(Ok(()))
229 .expect("Cannot fail since receiver never dropped before receiving");
230 let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted);
231 let stop = Box::pin(stop);
232
233 let server = server.try_buffer_unordered(1024).for_each(|_| async {});
234
235 let result = futures::future::select(Box::pin(server), stop).await;
236 drop(result);
239 let _ = wait_signal.send(());
240 };
241
242 use futures::FutureExt;
243 let fut = Box::pin(fut.map(drop));
244 executor.executor().spawn(fut);
245
246 let handle = InnerHandles {
247 executor: Some(executor),
248 stop: Some(stop_signal),
249 path: path.to_owned(),
250 };
251
252 use futures::TryFutureExt;
253 match start_receiver.recv().expect("Message should always be sent") {
254 Ok(()) => Ok(Server {
255 handles: Arc::new(Mutex::new(handle)),
256 wait_handle: Some(wait_receiver),
257 }),
258 Err(e) => Err(e),
259 }
260 }
261}
262
263#[derive(Debug)]
265pub struct Server {
266 handles: Arc<Mutex<InnerHandles>>,
267 wait_handle: Option<std::sync::mpsc::Receiver<()>>,
268}
269
270impl Server {
271 pub fn close(self) {
273 self.handles.lock().close();
274 }
275
276 pub fn close_handle(&self) -> CloseHandle {
278 CloseHandle {
279 inner: self.handles.clone(),
280 }
281 }
282
283 pub fn wait(mut self) {
285 if let Some(wait_receiver) = self.wait_handle.take() {
286 let _ = wait_receiver.recv();
287 }
288 }
289}
290
291#[derive(Debug)]
292struct InnerHandles {
293 executor: Option<reactor::Executor>,
294 stop: Option<oneshot::Sender<()>>,
295 path: String,
296}
297
298impl InnerHandles {
299 pub fn close(&mut self) {
300 let _ = self.stop.take().map(|stop| stop.send(()));
301 if let Some(executor) = self.executor.take() {
302 executor.close()
303 }
304 let _ = ::std::fs::remove_file(&self.path); }
306}
307
308impl Drop for InnerHandles {
309 fn drop(&mut self) {
310 self.close();
311 }
312}
313#[derive(Clone)]
315pub struct CloseHandle {
316 inner: Arc<Mutex<InnerHandles>>,
317}
318
319impl CloseHandle {
320 pub fn close(self) {
322 self.inner.lock().close();
323 }
324}
325
326#[cfg(test)]
327#[cfg(not(windows))]
328mod tests {
329 use super::*;
330
331 use jsonrpc_core::Value;
332 use std::os::unix::net::UnixStream;
333 use std::thread;
334 use std::time::{self, Duration};
335
336 fn server_builder() -> ServerBuilder {
337 let mut io = MetaIoHandler::<()>::default();
338 io.add_sync_method("say_hello", |_params| Ok(Value::String("hello".to_string())));
339 ServerBuilder::new(io)
340 }
341
342 fn run(path: &str) -> Server {
343 let builder = server_builder();
344 let server = builder.start(path).expect("Server must run with no issues");
345 server
346 }
347
348 fn dummy_request_str(path: &str, data: &str) -> String {
349 use futures::SinkExt;
350
351 let reply = async move {
352 use tokio::net::UnixStream;
353
354 let stream: UnixStream = UnixStream::connect(path).await?;
355 let codec = codecs::StreamCodec::stream_incoming();
356 let mut stream = tokio_util::codec::Decoder::framed(codec, stream);
357 stream.send(data.to_owned()).await?;
358 let (reply, _) = stream.into_future().await;
359
360 reply.expect("there should be one reply")
361 };
362
363 let rt = tokio::runtime::Runtime::new().unwrap();
364 rt.block_on(reply).expect("wait for reply")
365 }
366
/// A server with the default test handler starts without error.
#[test]
fn start() {
    crate::logger::init_log();

    // `run` builds the same `say_hello` handler and performs the same
    // start-and-expect that this test would otherwise spell out inline.
    let _server = run("/tmp/test-ipc-20000");
}
379
/// A plain unix socket client can connect to a running server.
#[test]
fn connect() {
    const PATH: &str = "/tmp/test-ipc-30000";

    crate::logger::init_log();
    let _server = run(PATH);

    UnixStream::connect(PATH).expect("Socket should connect");
}
388
/// A single `say_hello` request gets the expected JSON-RPC response.
#[test]
fn request() {
    crate::logger::init_log();
    let path = "/tmp/test-ipc-40000";
    let server = run(path);

    // Perform the request on its own thread and hand the reply back through
    // the join handle.
    let client = thread::spawn(move || {
        dummy_request_str(
            path,
            "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
        )
    });
    let result = client.join().unwrap();

    assert_eq!(
        result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
        "Response does not exactly match the expected response",
    );
    server.close();
}
413
/// Four client threads each issue one hundred requests; every one of the 400
/// replies must be the expected response.
#[test]
fn req_parallel() {
    crate::logger::init_log();
    let path = "/tmp/test-ipc-45000";
    let server = run(path);
    let (stop_signal, stop_receiver) = futures::channel::mpsc::channel(400);

    // Spawn all workers first, then wait for each of them.
    let workers: Vec<_> = (0..4)
        .map(|_| {
            let mut stop_signal = stop_signal.clone();
            thread::spawn(move || {
                for _ in 0..100 {
                    let result = dummy_request_str(
                        path,
                        "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
                    );
                    stop_signal.try_send(result).unwrap();
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }

    thread::spawn(move || {
        // The original sender is still alive, so the stream never ends by
        // itself; stop after exactly the 400 replies that were sent.
        let check_all = stop_receiver.take(400).for_each(|result| {
            assert_eq!(
                result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
                "Response does not exactly match the expected response",
            );
            futures::future::ready(())
        });
        futures::executor::block_on(check_all);
    })
    .join()
    .unwrap();
    server.close();
}
456
/// Closing the server removes the socket file and refuses new connections.
#[test]
fn close() {
    crate::logger::init_log();
    let path = "/tmp/test-ipc-50000";
    run(path).close();

    let socket_file_gone = ::std::fs::metadata(path).is_err();
    assert!(socket_file_gone, "There should be no socket file left");

    let connection_refused = UnixStream::connect(path).is_err();
    assert!(connection_refused, "Connection to the closed socket should fail");
}
473
/// Builds the large payload used by the huge-response test: the prefix
/// (`"begin_hello"` written twice, exactly as the test has always produced
/// it), 16384 spaces, then `"end_hello"`.
///
/// The buffer is sized once up front, so building the payload does not
/// reallocate while the padding is appended.
fn huge_response_test_str() -> String {
    const PREFIX: &str = "begin_hellobegin_hello";
    const PADDING: usize = 16384;
    const SUFFIX: &str = "end_hello";

    let mut result = String::with_capacity(PREFIX.len() + PADDING + SUFFIX.len());
    result.push_str(PREFIX);
    result.extend(std::iter::repeat(' ').take(PADDING));
    result.push_str(SUFFIX);
    result
}
483
484 fn huge_response_test_json() -> String {
485 let mut result = String::from("{\"jsonrpc\":\"2.0\",\"result\":\"");
486 result.push_str(&huge_response_test_str());
487 result.push_str("\",\"id\":1}");
488
489 result
490 }
491
/// A response far larger than a typical message arrives intact.
#[test]
fn test_huge_response() {
    crate::logger::init_log();
    let path = "/tmp/test-ipc-60000";

    let mut handler = MetaIoHandler::<()>::default();
    handler.add_sync_method("say_huge_hello", |_params| Ok(Value::String(huge_response_test_str())));

    let server = ServerBuilder::new(handler)
        .start(path)
        .expect("Server must run with no issues");
    let (stop_signal, stop_receiver) = oneshot::channel();

    // Client thread: issue the request and pass the reply on.
    thread::spawn(move || {
        let result = dummy_request_str(
            path,
            "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
        );

        stop_signal.send(result).unwrap();
    })
    .join()
    .unwrap();

    // Checker thread: compare the reply, then shut the server down.
    thread::spawn(move || {
        let check = async move {
            let result = stop_receiver.await.unwrap();
            assert_eq!(
                result,
                huge_response_test_json(),
                "Response does not exactly match the expected response",
            );
            server.close();
        };
        futures::executor::block_on(check);
    })
    .join()
    .unwrap();
}
528
/// The metadata created for a session is dropped once its client disconnects.
#[test]
fn test_session_end() {
    /// Session metadata that announces its own destruction.
    struct SessionEndMeta {
        drop_signal: Option<oneshot::Sender<()>>,
    }

    impl Drop for SessionEndMeta {
        fn drop(&mut self) {
            trace!(target: "ipc", "Dropping session meta");
            let signal = self.drop_signal.take().unwrap();
            signal.send(()).unwrap()
        }
    }

    /// Extractor that publishes, for every session, the receiver that fires
    /// when that session's metadata is dropped.
    struct SessionEndExtractor {
        drop_receivers: Arc<Mutex<futures::channel::mpsc::Sender<oneshot::Receiver<()>>>>,
    }

    impl MetaExtractor<Arc<SessionEndMeta>> for SessionEndExtractor {
        fn extract(&self, _context: &RequestContext) -> Arc<SessionEndMeta> {
            let (signal, receiver) = oneshot::channel();
            self.drop_receivers.lock().try_send(receiver).unwrap();
            Arc::new(SessionEndMeta {
                drop_signal: Some(signal),
            })
        }
    }

    crate::logger::init_log();
    let path = "/tmp/test-ipc-30009";
    let (signal, receiver) = futures::channel::mpsc::channel(16);
    let extractor = SessionEndExtractor {
        drop_receivers: Arc::new(Mutex::new(signal)),
    };

    let handler = MetaIoHandler::<Arc<SessionEndMeta>>::default();
    let server = ServerBuilder::with_meta_extractor(handler, extractor)
        .start(path)
        .expect("Server must run with no issues");

    // Connect and disconnect right away.
    drop(UnixStream::connect(path).expect("Socket should connect"));

    thread::spawn(move || {
        let wait_for_drop = async move {
            let (drop_receiver, ..) = receiver.into_future().await;
            drop_receiver.unwrap().await.unwrap();
        };
        futures::executor::block_on(wait_for_drop);
    })
    .join()
    .unwrap();
    server.close();
}
581
/// Closing through a `CloseHandle` stops the server from accepting connections.
#[test]
fn close_handle() {
    crate::logger::init_log();
    let path = "/tmp/test-ipc-90000";
    let server = run(path);

    server.close_handle().close();

    let connection_refused = UnixStream::connect(path).is_err();
    assert!(connection_refused, "Connection to the closed socket should fail");
}
594
/// `Server::wait` returns once the server is closed from another thread.
#[test]
fn close_when_waiting() {
    crate::logger::init_log();
    let path = "/tmp/test-ipc-70000";
    let server = run(path);
    let close_handle = server.close_handle();
    let (tx, rx) = oneshot::channel();

    // Closer: shut the server down after a short delay.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        close_handle.close();
    });
    // Waiter: report as soon as `wait` returns.
    thread::spawn(move || {
        server.wait();
        tx.send(true).expect("failed to report that the server has stopped");
    });

    let rt = tokio::runtime::Runtime::new().unwrap();
    let outcome = rt.block_on(async move {
        use futures::future::Either;

        let timeout = tokio::time::sleep(Duration::from_millis(500));
        futures::pin_mut!(timeout);

        match futures::future::select(rx, timeout).await {
            Either::Left((result, _)) => {
                assert!(result.is_ok(), "Rx failed");
                assert_eq!(result, Ok(true), "Wait timeout exceeded");
                assert!(
                    UnixStream::connect(path).is_err(),
                    "Connection to the closed socket should fail"
                );
                Ok(())
            }
            Either::Right(_) => Err("timed out"),
        }
    });
    outcome.unwrap();
}
632
/// A server configured with explicit (empty) security attributes still starts.
#[test]
fn runs_with_security_attributes() {
    let path = "/tmp/test-ipc-9001";
    let handler = MetaIoHandler::<Arc<()>>::default();
    let builder =
        ServerBuilder::with_meta_extractor(handler, NoopExtractor).set_security_attributes(SecurityAttributes::empty());

    builder.start(path).expect("Server must run with no issues");
}
642}