jsonrpc_tcp_server/
server.rs1use std::io;
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use tower_service::Service as _;
7
8use crate::futures::{self, future};
9use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
10use crate::server_utils::tokio_stream::wrappers::TcpListenerStream;
11use crate::server_utils::{codecs, reactor, tokio, tokio_util::codec::Framed, SuspendableStream};
12
13use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels};
14use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
15use crate::service::Service;
16
17pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
19 executor: reactor::UninitializedExecutor,
20 handler: Arc<MetaIoHandler<M, S>>,
21 meta_extractor: Arc<dyn MetaExtractor<M>>,
22 channels: Arc<SenderChannels>,
23 incoming_separator: codecs::Separator,
24 outgoing_separator: codecs::Separator,
25}
26
27impl<M: Metadata + Default, S: Middleware<M> + 'static> ServerBuilder<M, S>
28where
29 S::Future: Unpin,
30 S::CallFuture: Unpin,
31{
32 pub fn new<T>(handler: T) -> Self
34 where
35 T: Into<MetaIoHandler<M, S>>,
36 {
37 Self::with_meta_extractor(handler, NoopExtractor)
38 }
39}
40
41impl<M: Metadata, S: Middleware<M> + 'static> ServerBuilder<M, S>
42where
43 S::Future: Unpin,
44 S::CallFuture: Unpin,
45{
46 pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self
48 where
49 T: Into<MetaIoHandler<M, S>>,
50 E: MetaExtractor<M> + 'static,
51 {
52 ServerBuilder {
53 executor: reactor::UninitializedExecutor::Unspawned,
54 handler: Arc::new(handler.into()),
55 meta_extractor: Arc::new(extractor),
56 channels: Default::default(),
57 incoming_separator: Default::default(),
58 outgoing_separator: Default::default(),
59 }
60 }
61
62 pub fn event_loop_executor(mut self, handle: reactor::TaskExecutor) -> Self {
64 self.executor = reactor::UninitializedExecutor::Shared(handle);
65 self
66 }
67
68 pub fn session_meta_extractor<T: MetaExtractor<M> + 'static>(mut self, meta_extractor: T) -> Self {
70 self.meta_extractor = Arc::new(meta_extractor);
71 self
72 }
73
74 pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
76 self.incoming_separator = incoming;
77 self.outgoing_separator = outgoing;
78 self
79 }
80
81 pub fn start(self, addr: &SocketAddr) -> io::Result<Server> {
83 let meta_extractor = self.meta_extractor.clone();
84 let rpc_handler = self.handler.clone();
85 let channels = self.channels.clone();
86 let incoming_separator = self.incoming_separator;
87 let outgoing_separator = self.outgoing_separator;
88 let address = addr.to_owned();
89 let (tx, rx) = std::sync::mpsc::channel();
90 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
91
92 let executor = self.executor.initialize()?;
93
94 use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt};
95 executor.executor().spawn(async move {
96 let start = async {
97 let listener = tokio::net::TcpListener::bind(&address).await?;
98 let listener = TcpListenerStream::new(listener);
99 let connections = SuspendableStream::new(listener);
100
101 let server = connections.map(|socket| {
102 let peer_addr = match socket.peer_addr() {
103 Ok(addr) => addr,
104 Err(e) => {
105 warn!(target: "tcp", "Unable to determine socket peer address, ignoring connection {}", e);
106 return future::Either::Left(async { io::Result::Ok(()) });
107 }
108 };
109 trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr);
110 let (sender, receiver) = futures::channel::mpsc::unbounded();
111
112 let context = RequestContext {
113 peer_addr,
114 sender: sender.clone(),
115 };
116
117 let meta = meta_extractor.extract(&context);
118 let mut service = Service::new(peer_addr, rpc_handler.clone(), meta);
119 let (mut writer, reader) = Framed::new(
120 socket,
121 codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone()),
122 )
123 .split();
124
125 let responses: Pin<Box<dyn futures::Stream<Item = io::Result<String>> + Send>> =
127 Box::pin(reader.and_then(move |req| {
128 service.call(req).then(|response| match response {
129 Err(e) => {
130 warn!(target: "tcp", "Error while processing request: {:?}", e);
131 future::ok(String::new())
132 }
133 Ok(None) => {
134 trace!(target: "tcp", "JSON RPC request produced no response");
135 future::ok(String::new())
136 }
137 Ok(Some(response_data)) => {
138 trace!(target: "tcp", "Sent response: {}", &response_data);
139 future::ok(response_data)
140 }
141 })
142 }));
143
144 let mut peer_message_queue = {
145 let mut channels = channels.lock();
146 channels.insert(peer_addr, sender);
147
148 PeerMessageQueue::new(responses, receiver, peer_addr)
149 };
150
151 let shared_channels = channels.clone();
152 let writer = async move {
153 writer.send_all(&mut peer_message_queue).await?;
154 trace!(target: "tcp", "Peer {}: service finished", peer_addr);
155 let mut channels = shared_channels.lock();
156 channels.remove(&peer_addr);
157 Ok(())
158 };
159
160 future::Either::Right(writer)
161 });
162
163 Ok(server)
164 };
165
166 match start.await {
167 Ok(server) => {
168 tx.send(Ok(())).expect("Rx is blocking parent thread.");
169 let server = server.buffer_unordered(1024).for_each(|_| async {});
170
171 future::select(Box::pin(server), stop_rx).await;
172 }
173 Err(e) => {
174 tx.send(Err(e)).expect("Rx is blocking parent thread.");
175 let _ = stop_rx.await;
176 }
177 }
178 });
179
180 let res = rx.recv().expect("Response is always sent before tx is dropped.");
181
182 res.map(|_| Server {
183 executor: Some(executor),
184 stop: Some(stop_tx),
185 })
186 }
187
188 pub fn dispatcher(&self) -> Dispatcher {
190 Dispatcher::new(self.channels.clone())
191 }
192}
193
194pub struct Server {
196 executor: Option<reactor::Executor>,
197 stop: Option<futures::channel::oneshot::Sender<()>>,
198}
199
200impl Server {
201 pub fn close(mut self) {
203 let _ = self.stop.take().map(|sg| sg.send(()));
204 self.executor.take().unwrap().close();
205 }
206
207 pub fn wait(mut self) {
209 self.executor.take().unwrap().wait();
210 }
211}
212
213impl Drop for Server {
214 fn drop(&mut self) {
215 let _ = self.stop.take().map(|sg| sg.send(()));
216 if let Some(executor) = self.executor.take() {
217 executor.close()
218 }
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225
226 #[test]
227 fn server_is_send_and_sync() {
228 fn is_send_and_sync<T: Send + Sync>() {}
229
230 is_send_and_sync::<Server>();
231 }
232}