use std::sync::Arc;
use crate::jsonrpc::futures::sync::{mpsc, oneshot};
use crate::jsonrpc::futures::{future, Future, Sink, Stream};
use crate::jsonrpc::{middleware, FutureResult, MetaIoHandler, Metadata, Middleware};
use tokio_service::{self, Service as TokioService};
use crate::server_utils::{
codecs, reactor, session,
tokio::{reactor::Handle, runtime::TaskExecutor},
tokio_codec::Framed,
};
use parking_lot::Mutex;
use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
use crate::select_with_weak::SelectWithWeakExt;
use parity_tokio_ipc::Endpoint;
pub use parity_tokio_ipc::SecurityAttributes;
/// Per-connection IPC service: pairs the shared request handler with the
/// metadata that belongs to one session.
pub struct Service<M = (), S = middleware::Noop>
where
    M: Metadata,
    S: Middleware<M>,
{
    /// Shared JSON-RPC handler every request is dispatched to.
    handler: Arc<MetaIoHandler<M, S>>,
    /// Session metadata; a clone of it accompanies each request.
    meta: M,
}
impl<M, S> Service<M, S>
where
    M: Metadata,
    S: Middleware<M>,
{
    /// Creates a service from a shared request handler and the metadata of
    /// the session it will serve.
    pub fn new(handler: Arc<MetaIoHandler<M, S>>, meta: M) -> Self {
        Self { handler, meta }
    }
}
impl<M, S> tokio_service::Service for Service<M, S>
where
    M: Metadata,
    S: Middleware<M>,
{
    /// Raw request text as received from the peer.
    type Request = String;
    /// Response text, or `None` when the handler produces no response.
    type Response = Option<String>;
    type Error = ();
    type Future = FutureResult<S::Future, S::CallFuture>;

    /// Logs the incoming request and forwards it, together with a clone of
    /// the session metadata, to the request handler.
    fn call(&self, req: Self::Request) -> Self::Future {
        trace!(target: "ipc", "Received request: {}", req);
        let session_meta = self.meta.clone();
        self.handler.handle_request(&req, session_meta)
    }
}
/// Builder collecting everything needed to start an IPC [`Server`].
pub struct ServerBuilder<M = (), S = middleware::Noop>
where
    M: Metadata,
    S: Middleware<M>,
{
    /// Request handler shared by all sessions.
    handler: Arc<MetaIoHandler<M, S>>,
    /// Produces the metadata for each accepted connection.
    meta_extractor: Arc<dyn MetaExtractor<M>>,
    /// Optional observer notified when sessions are opened and closed.
    session_stats: Option<Arc<dyn session::SessionStats>>,
    /// Executor the server is spawned on; initialized when the server starts.
    executor: reactor::UninitializedExecutor,
    /// Optional reactor handle used when listening on the endpoint.
    reactor: Option<Handle>,
    /// Separator configuration for incoming requests.
    incoming_separator: codecs::Separator,
    /// Separator configuration for outgoing responses.
    outgoing_separator: codecs::Separator,
    /// Security attributes applied to the IPC endpoint.
    security_attributes: SecurityAttributes,
    /// Limit handed to `buffer_unordered` for the requests of one connection.
    client_buffer_size: usize,
}
impl<M, S> ServerBuilder<M, S>
where
    M: Metadata + Default,
    S: Middleware<M>,
{
    /// Creates a builder for the given handler that uses [`NoopExtractor`]
    /// to produce session metadata.
    pub fn new<T>(io_handler: T) -> ServerBuilder<M, S>
    where
        T: Into<MetaIoHandler<M, S>>,
    {
        let extractor = NoopExtractor;
        Self::with_meta_extractor(io_handler, extractor)
    }
}
impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
    /// Creates a builder for the given handler and session metadata extractor.
    ///
    /// Initial settings: no session statistics, an unspawned executor, no
    /// explicit reactor, `Separator::Empty` for incoming data,
    /// `Separator::default()` for outgoing data, empty security attributes
    /// and a client buffer size of 5.
    pub fn with_meta_extractor<T, E>(io_handler: T, extractor: E) -> ServerBuilder<M, S>
    where
        T: Into<MetaIoHandler<M, S>>,
        E: MetaExtractor<M>,
    {
        ServerBuilder {
            handler: Arc::new(io_handler.into()),
            meta_extractor: Arc::new(extractor),
            session_stats: None,
            executor: reactor::UninitializedExecutor::Unspawned,
            reactor: None,
            incoming_separator: codecs::Separator::Empty,
            outgoing_separator: codecs::Separator::default(),
            security_attributes: SecurityAttributes::empty(),
            client_buffer_size: 5,
        }
    }

    /// Runs the server on the supplied, shared executor.
    pub fn event_loop_executor(mut self, executor: TaskExecutor) -> Self {
        self.executor = reactor::UninitializedExecutor::Shared(executor);
        self
    }

    /// Sets the reactor handle used to listen on the endpoint.
    ///
    /// When none is set, `start` falls back to `Handle::current` on Windows
    /// and `Handle::default` elsewhere.
    pub fn event_loop_reactor(mut self, reactor: Handle) -> Self {
        self.reactor = Some(reactor);
        self
    }

    /// Replaces the extractor that produces metadata for each new session.
    pub fn session_meta_extractor<X>(mut self, meta_extractor: X) -> Self
    where
        X: MetaExtractor<M>,
    {
        self.meta_extractor = Arc::new(meta_extractor);
        self
    }

    /// Registers an observer that is told when sessions are opened and closed.
    pub fn session_stats<T: session::SessionStats>(mut self, stats: T) -> Self {
        self.session_stats = Some(Arc::new(stats));
        self
    }

    /// Sets the separators the stream codec uses for incoming requests and
    /// outgoing responses.
    pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
        self.incoming_separator = incoming;
        self.outgoing_separator = outgoing;
        self
    }

    /// Sets the security attributes applied to the IPC endpoint.
    pub fn set_security_attributes(mut self, attr: SecurityAttributes) -> Self {
        self.security_attributes = attr;
        self
    }

    /// Sets how many requests of a single connection may be processed
    /// concurrently.
    ///
    /// A value of `0` is treated as `1` when the server is started, because a
    /// zero-sized buffer would never read any request from the connection.
    pub fn set_client_buffer_size(mut self, buffer_size: usize) -> Self {
        self.client_buffer_size = buffer_size;
        self
    }

    /// Starts the server listening on the IPC endpoint at `path`.
    ///
    /// The accept loop is spawned on the configured executor and this call
    /// blocks until the endpoint has either started listening or failed to.
    /// On Unix an already existing file at `path` is removed first.
    ///
    /// Every accepted connection gets a session id (a wrapping counter), its
    /// own metadata from the meta extractor and a framed stream codec.
    /// Responses are written back in completion order, merged with the
    /// messages arriving on the session's notification channel.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from initializing the executor or from
    /// listening on the endpoint.
    ///
    /// # Panics
    ///
    /// Panics if the spawned task is dropped before it reports the start
    /// result.
    pub fn start(self, path: &str) -> std::io::Result<Server> {
        let executor = self.executor.initialize()?;
        let reactor = self.reactor;
        let rpc_handler = self.handler;
        let endpoint_addr = path.to_owned();
        let meta_extractor = self.meta_extractor;
        let session_stats = self.session_stats;
        let incoming_separator = self.incoming_separator;
        let outgoing_separator = self.outgoing_separator;
        // Asks the accept loop to terminate.
        let (stop_signal, stop_receiver) = oneshot::channel();
        // Reports the outcome of binding the endpoint back to this caller.
        let (start_signal, start_receiver) = oneshot::channel();
        // Fired when the accept loop has finished; consumed by `Server::wait`.
        let (wait_signal, wait_receiver) = oneshot::channel();
        let security_attributes = self.security_attributes;
        // `buffer_unordered(0)` never polls the underlying stream, which would
        // leave every request of a connection unread; always allow one.
        let client_buffer_size = self.client_buffer_size.max(1);

        executor.spawn(future::lazy(move || {
            let mut endpoint = Endpoint::new(endpoint_addr);
            endpoint.set_security_attributes(security_attributes);

            // Remove a file already present at the endpoint path and warn about it.
            if cfg!(unix) && ::std::fs::remove_file(endpoint.path()).is_ok() {
                warn!("Removed existing file '{}'.", endpoint.path());
            }

            let reactor = if cfg!(windows) {
                #[allow(deprecated)]
                reactor.unwrap_or_else(Handle::current)
            } else {
                reactor.unwrap_or_else(Handle::default)
            };

            let connections = match endpoint.incoming(&reactor) {
                Ok(connections) => connections,
                Err(e) => {
                    // Hand the failure to the blocked caller and finish the task.
                    start_signal
                        .send(Err(e))
                        .expect("Cannot fail since receiver never dropped before receiving");
                    return future::Either::A(future::ok(()));
                }
            };

            let mut id = 0u64;

            let server = connections.map(move |(io_stream, remote_id)| {
                id = id.wrapping_add(1);
                let session_id = id;
                let session_stats = session_stats.clone();
                trace!(target: "ipc", "Accepted incoming IPC connection: {}", session_id);
                if let Some(stats) = session_stats.as_ref() {
                    stats.open_session(session_id)
                }

                // The sender goes to the meta extractor through the request
                // context; whatever is sent on it is written to this
                // connection alongside the responses.
                let (sender, receiver) = mpsc::channel(16);
                let meta = meta_extractor.extract(&RequestContext {
                    endpoint_addr: &remote_id,
                    session_id,
                    sender,
                });
                let service = Service::new(rpc_handler.clone(), meta);
                let (writer, reader) = Framed::new(
                    io_stream,
                    codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone()),
                )
                .split();

                let responses = reader
                    .map(move |req| {
                        service
                            .call(req)
                            // A failed call yields no response instead of ending the session.
                            .then(|result| match result {
                                Err(_) => future::ok(None),
                                Ok(some_result) => future::ok(some_result),
                            })
                            .map_err(|_: ()| std::io::ErrorKind::Other.into())
                    })
                    .buffer_unordered(client_buffer_size)
                    // Drop the `None`s of requests that produce no response.
                    .filter_map(|x| x)
                    .select_with_weak(receiver.map_err(|e| {
                        warn!(target: "ipc", "Notification error: {:?}", e);
                        std::io::ErrorKind::Other.into()
                    }));

                // Whatever way the session ends, report it closed and resolve
                // successfully so one failing connection cannot stop the server.
                writer.send_all(responses).then(move |_| {
                    trace!(target: "ipc", "Peer: service finished");
                    if let Some(stats) = session_stats.as_ref() {
                        stats.close_session(session_id)
                    }
                    Ok(())
                })
            });

            start_signal
                .send(Ok(()))
                .expect("Cannot fail since receiver never dropped before receiving");

            let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted.into());
            future::Either::B(
                server
                    // Drive up to 1024 sessions concurrently.
                    .buffer_unordered(1024)
                    .for_each(|_| Ok(()))
                    .select(stop)
                    .map(|(_, server)| {
                        // Drop the unfinished half of the select before
                        // telling waiters that the server is done.
                        drop(server);
                        let _ = wait_signal.send(());
                    })
                    .map_err(|_| ()),
            )
        }));

        let handle = InnerHandles {
            executor: Some(executor),
            stop: Some(stop_signal),
            path: path.to_owned(),
        };

        // Block until the spawned task reports whether listening succeeded.
        match start_receiver.wait().expect("Message should always be sent") {
            Ok(()) => Ok(Server {
                handles: Arc::new(Mutex::new(handle)),
                wait_handle: Some(wait_receiver),
            }),
            Err(e) => Err(e),
        }
    }
}
/// Handle to a running IPC server.
#[derive(Debug)]
pub struct Server {
    /// Shutdown state shared with every [`CloseHandle`] created from this server.
    handles: Arc<Mutex<InnerHandles>>,
    /// Receiver that `wait` blocks on; taken out when `wait` is called.
    wait_handle: Option<oneshot::Receiver<()>>,
}
impl Server {
    /// Closes the server, consuming this handle.
    pub fn close(self) {
        let mut inner = self.handles.lock();
        inner.close();
    }

    /// Returns a handle that can close the server independently of `self`.
    pub fn close_handle(&self) -> CloseHandle {
        let inner = self.handles.clone();
        CloseHandle { inner }
    }

    /// Blocks the calling thread until the wait receiver resolves, either
    /// with the completion signal or because its sender was dropped.
    pub fn wait(mut self) {
        if let Some(wait_receiver) = self.wait_handle.take() {
            // Both outcomes mean there is nothing left to wait for.
            let _ = wait_receiver.wait();
        }
    }
}
/// Resources needed to shut a running server down.
#[derive(Debug)]
struct InnerHandles {
    /// Executor the server runs on; taken and closed on shutdown.
    executor: Option<reactor::Executor>,
    /// One-shot sender used to ask the server to stop; taken on shutdown.
    stop: Option<oneshot::Sender<()>>,
    /// Path of the endpoint file that is removed on shutdown.
    path: String,
}
impl InnerHandles {
    /// Stops the server, closes its executor and removes the endpoint file.
    ///
    /// Only the first call has any effect. `Drop` calls this again after an
    /// explicit close, and by then the path may already belong to a newer
    /// server bound to the same location, whose file must not be removed.
    pub fn close(&mut self) {
        let stop = self.stop.take();
        let executor = self.executor.take();

        // Both handles are gone only after a previous `close`.
        if stop.is_none() && executor.is_none() {
            return;
        }

        if let Some(stop) = stop {
            // The receiving side may already be gone; nothing to do then.
            let _ = stop.send(());
        }
        if let Some(executor) = executor {
            executor.close();
        }
        // Best effort: the file may already have been removed.
        let _ = ::std::fs::remove_file(&self.path);
    }
}
impl Drop for InnerHandles {
fn drop(&mut self) {
self.close();
}
}
/// Cloneable handle that can close the server it was created from.
#[derive(Clone)]
pub struct CloseHandle {
    /// Shutdown state shared with the originating [`Server`].
    inner: Arc<Mutex<InnerHandles>>,
}
impl CloseHandle {
    /// Closes the server this handle belongs to, consuming the handle.
    pub fn close(self) {
        let mut handles = self.inner.lock();
        handles.close();
    }
}
#[cfg(test)]
#[cfg(not(windows))]
mod tests {
    use tokio_uds;

    use self::tokio_uds::UnixStream;
    use super::SecurityAttributes;
    use super::{Server, ServerBuilder};
    use crate::jsonrpc::futures::sync::{mpsc, oneshot};
    use crate::jsonrpc::futures::{future, Future, Sink, Stream};
    use crate::jsonrpc::{MetaIoHandler, Value};
    use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
    use crate::server_utils::codecs;
    use crate::server_utils::{
        tokio::{self, timer::Delay},
        tokio_codec::Decoder,
    };
    use parking_lot::Mutex;
    use std::sync::Arc;
    use std::thread;
    use std::time;
    use std::time::{Duration, Instant};

    /// Builder for a server exposing a single `say_hello` method.
    fn server_builder() -> ServerBuilder {
        let mut handler = MetaIoHandler::<()>::default();
        handler.add_method("say_hello", |_params| Ok(Value::String("hello".to_string())));
        ServerBuilder::new(handler)
    }

    /// Starts the `say_hello` server on `path`, panicking if it cannot start.
    fn run(path: &str) -> Server {
        server_builder().start(path).expect("Server must run with no issues")
    }

    /// Connects to `path`, sends `data` and returns the first reply received.
    fn dummy_request_str(path: &str, data: &str) -> String {
        UnixStream::connect(path)
            .and_then(|socket| {
                codecs::StreamCodec::stream_incoming()
                    .framed(socket)
                    .send(data.to_owned())
                    .and_then(move |framed| framed.into_future().map_err(|(err, _)| err))
                    .and_then(|(reply, _)| future::ok(reply.expect("there should be one reply")))
            })
            .wait()
            .expect("wait for reply")
    }

    /// The server starts on a fresh path.
    #[test]
    fn start() {
        crate::logger::init_log();

        let _server = server_builder()
            .start("/tmp/test-ipc-20000")
            .expect("Server must run with no issues");
    }

    /// A client can connect to a started server.
    #[test]
    fn connect() {
        crate::logger::init_log();
        let socket_path = "/tmp/test-ipc-30000";
        let _server = run(socket_path);

        UnixStream::connect(socket_path).wait().expect("Socket should connect");
    }

    /// A single request gets the expected response.
    #[test]
    fn request() {
        crate::logger::init_log();
        let socket_path = "/tmp/test-ipc-40000";
        let server = run(socket_path);
        let (reply_tx, reply_rx) = oneshot::channel();

        thread::spawn(move || {
            let reply = dummy_request_str(
                socket_path,
                "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
            );
            reply_tx.send(reply).unwrap();
        })
        .join()
        .unwrap();

        let _ = reply_rx
            .map(move |reply: String| {
                assert_eq!(
                    reply, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
                    "Response does not exactly match the expected response",
                );
                server.close();
            })
            .wait();
    }

    /// Four client threads issue 100 requests each; every reply must match.
    #[test]
    fn req_parallel() {
        crate::logger::init_log();
        let socket_path = "/tmp/test-ipc-45000";
        let server = run(socket_path);
        let (reply_tx, reply_rx) = mpsc::channel(400);

        // Spawn all workers first, then join them.
        let workers: Vec<_> = (0..4)
            .map(|_| {
                let mut reply_tx = reply_tx.clone();
                thread::spawn(move || {
                    for _ in 0..100 {
                        let reply = dummy_request_str(
                            socket_path,
                            "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
                        );
                        reply_tx.try_send(reply).unwrap();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        let _ = reply_rx
            .map(|reply| {
                assert_eq!(
                    reply, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
                    "Response does not exactly match the expected response",
                );
            })
            .take(400)
            .collect()
            .wait();
        server.close();
    }

    /// Closing removes the socket file and refuses further connections.
    #[test]
    fn close() {
        crate::logger::init_log();
        let socket_path = "/tmp/test-ipc-50000";
        run(socket_path).close();

        assert!(
            ::std::fs::metadata(socket_path).is_err(),
            "There should be no socket file left"
        );
        assert!(
            UnixStream::connect(socket_path).wait().is_err(),
            "Connection to the closed socket should fail"
        );
    }

    /// Large payload: two `begin_hello` markers, 16384 spaces, `end_hello`.
    fn huge_response_test_str() -> String {
        let mut payload = String::from("begin_hello");
        payload.push_str("begin_hello");
        payload.push_str(&" ".repeat(16384));
        payload.push_str("end_hello");
        payload
    }

    /// JSON-RPC response expected for the large payload.
    fn huge_response_test_json() -> String {
        [
            "{\"jsonrpc\":\"2.0\",\"result\":\"",
            huge_response_test_str().as_str(),
            "\",\"id\":1}",
        ]
        .concat()
    }

    /// A large response arrives complete and unmodified.
    #[test]
    fn test_huge_response() {
        crate::logger::init_log();
        let socket_path = "/tmp/test-ipc-60000";

        let mut handler = MetaIoHandler::<()>::default();
        handler.add_method("say_huge_hello", |_params| Ok(Value::String(huge_response_test_str())));
        let server = ServerBuilder::new(handler)
            .start(socket_path)
            .expect("Server must run with no issues");
        let (reply_tx, reply_rx) = oneshot::channel();

        thread::spawn(move || {
            let reply = dummy_request_str(
                socket_path,
                "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
            );
            reply_tx.send(reply).unwrap();
        })
        .join()
        .unwrap();

        let _ = reply_rx
            .map(move |reply: String| {
                assert_eq!(
                    reply,
                    huge_response_test_json(),
                    "Response does not exactly match the expected response",
                );
                server.close();
            })
            .wait();
    }

    /// The session metadata is dropped once the client disconnects.
    #[test]
    fn test_session_end() {
        /// Metadata that signals through a one-shot channel when dropped.
        struct SessionEndMeta {
            drop_signal: Option<oneshot::Sender<()>>,
        }

        impl Drop for SessionEndMeta {
            fn drop(&mut self) {
                trace!(target: "ipc", "Dropping session meta");
                self.drop_signal.take().unwrap().send(()).unwrap()
            }
        }

        /// Extractor that publishes the drop receiver of each session it creates.
        struct SessionEndExtractor {
            drop_receivers: Arc<Mutex<mpsc::Sender<oneshot::Receiver<()>>>>,
        }

        impl MetaExtractor<Arc<SessionEndMeta>> for SessionEndExtractor {
            fn extract(&self, _context: &RequestContext) -> Arc<SessionEndMeta> {
                let (drop_tx, drop_rx) = oneshot::channel();
                self.drop_receivers.lock().try_send(drop_rx).unwrap();
                Arc::new(SessionEndMeta {
                    drop_signal: Some(drop_tx),
                })
            }
        }

        crate::logger::init_log();
        let socket_path = "/tmp/test-ipc-30009";
        let (sessions_tx, sessions_rx) = mpsc::channel(16);
        let extractor = SessionEndExtractor {
            drop_receivers: Arc::new(Mutex::new(sessions_tx)),
        };

        let handler = MetaIoHandler::<Arc<SessionEndMeta>>::default();
        let server = ServerBuilder::with_meta_extractor(handler, extractor)
            .start(socket_path)
            .expect("Server must run with no issues");

        // Connect and disconnect right away.
        drop(UnixStream::connect(socket_path).wait().expect("Socket should connect"));

        // Wait for the first session to appear, then for its metadata to drop.
        sessions_rx
            .into_future()
            .map_err(|_| ())
            .and_then(|(first_session, _remaining)| first_session.unwrap().map_err(|_| ()))
            .wait()
            .unwrap();
        server.close();
    }

    /// A `CloseHandle` shuts the server down.
    #[test]
    fn close_handle() {
        crate::logger::init_log();
        let socket_path = "/tmp/test-ipc-90000";
        let server = run(socket_path);
        server.close_handle().close();

        assert!(
            UnixStream::connect(socket_path).wait().is_err(),
            "Connection to the closed socket should fail"
        );
    }

    /// `wait` returns once the server is closed from another thread.
    #[test]
    fn close_when_waiting() {
        crate::logger::init_log();
        let socket_path = "/tmp/test-ipc-70000";
        let server = run(socket_path);
        let closer = server.close_handle();
        let (stopped_tx, stopped_rx) = oneshot::channel();

        thread::spawn(move || {
            thread::sleep(time::Duration::from_millis(100));
            closer.close();
        });
        thread::spawn(move || {
            server.wait();
            stopped_tx
                .send(true)
                .expect("failed to report that the server has stopped");
        });

        // Resolves to `false` if the server has not stopped within 500 ms.
        let timeout = Delay::new(Instant::now() + Duration::from_millis(500))
            .map(|_| false)
            .map_err(|err| panic!("{:?}", err));

        let outcome = stopped_rx
            .map_err(|_| ())
            .select(timeout)
            .then(move |result| match result {
                Ok((result, _)) => {
                    assert_eq!(result, true, "Wait timeout exceeded");
                    assert!(
                        UnixStream::connect(socket_path).wait().is_err(),
                        "Connection to the closed socket should fail"
                    );
                    Ok(())
                }
                Err(_) => Err(()),
            });

        tokio::run(outcome);
    }

    /// The server starts when security attributes are set explicitly.
    #[test]
    fn runs_with_security_attributes() {
        let socket_path = "/tmp/test-ipc-9001";
        let handler = MetaIoHandler::<Arc<()>>::default();
        ServerBuilder::with_meta_extractor(handler, NoopExtractor)
            .set_security_attributes(SecurityAttributes::empty())
            .start(socket_path)
            .expect("Server must run with no issues");
    }
}