tokio_io_pool/lib.rs
1//! This crate provides a thread pool for executing short, I/O-heavy futures efficiently.
2//!
3//! The standard `Runtime` provided by `tokio` uses a thread-pool to allow concurrent execution of
4//! compute-heavy futures. However, its work-stealing makes it so that futures may be executed on
//! different threads to where their reactors are running, which results in unnecessary
6//! synchronization, and thus lowers the achievable throughput. While this trade-off works well for
7//! many asynchronous applications, since it spreads load more evenly, it is not a great fit for
8//! high-performance I/O bound applications where the cost of synchronizing threads is high. This
9//! can happen, for example, if your application performs frequent but small I/O operations.
10//!
11//! This crate provides an alternative implementation of a futures-based thread pool. It spawns a
12//! pool of threads that each runs a `tokio::runtime::current_thread::Runtime` (and thus each have
13//! an I/O reactor of their own), and spawns futures onto the pool by assigning the future to
14//! threads round-robin. Once a future has been spawned onto a thread, it, and any child futures it
15//! may produce through `tokio::spawn`, remain under the control of that same thread.
16//!
17//! In general, you should use `tokio-io-pool` only if you perform a lot of very short I/O
18//! operations on many futures, and find that you are bottlenecked by work-stealing or reactor
19//! notifications with the regular `tokio` runtime. If you are unsure what to use, start with the
20//! `tokio` runtime.
21//!
22//! Be aware that this pool does *not* support the
23//! [`blocking`](https://docs.rs/tokio-threadpool/0.1.5/tokio_threadpool/fn.blocking.html) function
24//! since it is [not supported](https://github.com/tokio-rs/tokio/issues/432) by the underlying
25//! `current_thread::Runtime`. Hopefully this will be rectified down the line.
26//!
27//! There is some discussion around trying to merge this pool into `tokio` proper; that effort is
28//! tracked in [tokio-rs/tokio#486](https://github.com/tokio-rs/tokio/issues/486).
29//!
30//! # Examples
31//!
32//! ```no_run
33//! use tokio::prelude::*;
34//! use tokio::io::copy;
35//! use tokio::net::TcpListener;
36//!
37//! fn main() {
38//! // Bind the server's socket.
39//! let addr = "127.0.0.1:12345".parse().unwrap();
40//! let listener = TcpListener::bind(&addr)
41//! .expect("unable to bind TCP listener");
42//!
43//! // Pull out a stream of sockets for incoming connections
44//! let server = listener.incoming()
45//! .map_err(|e| eprintln!("accept failed = {:?}", e))
46//! .for_each(|sock| {
47//! // Split up the reading and writing parts of the
48//! // socket.
49//! let (reader, writer) = sock.split();
50//!
51//! // A future that echos the data and returns how
52//! // many bytes were copied...
53//! let bytes_copied = copy(reader, writer);
54//!
55//! // ... after which we'll print what happened.
56//! let handle_conn = bytes_copied.map(|amt| {
57//! println!("wrote {:?} bytes", amt)
58//! }).map_err(|err| {
59//! eprintln!("IO error {:?}", err)
60//! });
61//!
62//! // Spawn the future as a concurrent task.
63//! tokio::spawn(handle_conn)
64//! });
65//!
66//! // Start the Tokio runtime
67//! tokio_io_pool::run(server);
68//! }
69//! ```
70
71#![deny(missing_docs)]
72#![deny(missing_debug_implementations)]
73#![deny(missing_copy_implementations)]
74#![deny(unused_extern_crates)]
75
76use futures::sync::oneshot;
77use futures::try_ready;
78use std::sync::{atomic, mpsc, Arc};
79use std::{fmt, io, thread};
80use tokio::executor::SpawnError;
81use tokio::prelude::*;
82use tokio::runtime::current_thread;
83
/// Configuration for an I/O-oriented thread pool ([`Runtime`]).
///
/// Start from [`Builder::default`], adjust the settings by chaining the setter methods, and then
/// call [`Builder::build`] to construct the pool.
///
/// Each setting is described in the documentation of the method that changes it.
pub struct Builder {
    // Number of worker threads to start.
    nworkers: usize,
    // Prefix for worker thread names; with `None` no thread name is set.
    name_prefix: Option<String>,
    // Hook that every worker thread runs first thing after it has been started.
    after_start: Option<Hook>,
    // Hook that every worker thread runs last, right before it exits.
    before_stop: Option<Hook>,
}

// A callback that can be shared with, and invoked from, every worker thread.
type Hook = Arc<dyn Fn() + Send + Sync>;
97
98impl fmt::Debug for Builder {
99 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
100 fmt.debug_struct("Builder")
101 .field("nworkers", &self.nworkers)
102 .field("name_prefix", &self.name_prefix)
103 .finish()
104 }
105}
106
107impl Default for Builder {
108 fn default() -> Self {
109 Builder {
110 nworkers: num_cpus::get(),
111 name_prefix: None,
112 after_start: None,
113 before_stop: None,
114 }
115 }
116}
117
118impl Builder {
119 /// Set the number of worker threads for the thread pool instance.
120 ///
121 /// This must be a number between 1 and 32,768 though it is advised to keep
122 /// this value on the smaller side.
123 ///
124 /// The default value is the number of cores available to the system.
125 pub fn pool_size(&mut self, val: usize) -> &mut Self {
126 self.nworkers = val;
127 self
128 }
129
130 /// Set name prefix of threads spawned by the scheduler
131 ///
132 /// Thread name prefix is used for generating thread names. For example, if prefix is
133 /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc.
134 ///
135 /// If this configuration is not set, then the thread will use the system default naming
136 /// scheme.
137 pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
138 self.name_prefix = Some(val.into());
139 self
140 }
141
142 /// Execute function `f` after each thread is started but before it starts doing work.
143 ///
144 /// This is intended for bookkeeping and monitoring use cases.
145 pub fn after_start<F>(&mut self, f: F) -> &mut Self
146 where
147 F: Fn() + Send + Sync + 'static,
148 {
149 self.after_start = Some(Arc::new(f));
150 self
151 }
152
153 /// Execute function `f` before each thread stops.
154 ///
155 /// This is intended for bookkeeping and monitoring use cases.
156 pub fn before_stop<F>(&mut self, f: F) -> &mut Self
157 where
158 F: Fn() + Send + Sync + 'static,
159 {
160 self.before_stop = Some(Arc::new(f));
161 self
162 }
163
164 /// Create the configured [`Runtime`].
165 ///
166 /// The returned [`Runtime`] instance is ready to spawn tasks.
167 pub fn build(&self) -> io::Result<Runtime> {
168 assert!(self.nworkers > 0);
169
170 let mut handles = Vec::with_capacity(self.nworkers);
171 let mut threads = Vec::with_capacity(self.nworkers);
172 for i in 0..self.nworkers {
173 let (trigger, exit) = oneshot::channel();
174 let (handle_tx, handle_rx) = mpsc::sync_channel(1);
175
176 let mut th = thread::Builder::new();
177
178 if let Some(ref prefix) = self.name_prefix {
179 th = th.name(format!("{}{}", prefix, i + 1));
180 }
181
182 let before = self.after_start.clone();
183 let after = self.before_stop.clone();
184
185 let jh = th.spawn(move || {
186 if let Some(ref f) = before {
187 f();
188 }
189
190 let mut rt = current_thread::Runtime::new().unwrap();
191 handle_tx.send(rt.handle()).unwrap();
192 let force_exit: bool = rt.block_on(exit).unwrap();
193 if !force_exit {
194 rt.run().unwrap();
195 }
196
197 if let Some(ref f) = after {
198 f();
199 }
200 })?;
201
202 threads.push((trigger, jh));
203 handles.push(handle_rx.recv().unwrap());
204 }
205
206 let handle = Handle {
207 workers: handles,
208 rri: Arc::new(atomic::AtomicUsize::new(0)),
209 };
210
211 Ok(Runtime {
212 threads,
213 force_exit: true,
214 handle,
215 })
216 }
217}
218
219/// Execute the given future and spawn any child futures onto a newly created I/O thread pool.
220///
221/// This function is used to bootstrap the execution of a Tokio application. It does the following:
222///
223/// - Start the Tokio I/O pool using a default configuration.
224/// - Configure Tokio to make any future spawned with `tokio::spawn` spawn on the pool.
225/// - Run the given future to completion on the current thread.
226/// - Block the current thread until the pool becomes idle.
227///
228/// Note that the function will not return immediately once future has completed. Instead it waits
229/// for the entire pool to become idle. Note also that the top-level future will be executed with
230/// [`Runtime::block_on`], which calls `Future::wait`, and thus does not provide access to timers,
231/// clocks, or other tokio runtime goodies.
232///
233/// # Examples
234///
235/// ```no_run
236/// # extern crate tokio_io_pool;
237/// # extern crate tokio;
238/// # extern crate futures;
239/// # use futures::{Future, Stream};
240/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> {
241/// # unimplemented!();
242/// # }
243/// # let addr = "127.0.0.1:8080".parse().unwrap();
244/// use tokio::net::TcpListener;
245///
246/// let listener = TcpListener::bind(&addr).unwrap();
247///
248/// let server = listener.incoming()
249/// .map_err(|e| println!("error = {:?}", e))
250/// .for_each(|socket| {
251/// tokio::spawn(process(socket))
252/// });
253///
254/// tokio_io_pool::run(server);
255/// ```
256pub fn run<F>(future: F)
257where
258 F: Future<Item = (), Error = ()> + Send + 'static,
259{
260 let mut rt = Runtime::new();
261 let _ = rt.block_on(future);
262 rt.shutdown_on_idle();
263}
264
265/// A handle to a [`Runtime`] that allows spawning additional futures from other threads.
266#[derive(Clone)]
267pub struct Handle {
268 workers: Vec<current_thread::Handle>,
269 rri: Arc<atomic::AtomicUsize>,
270}
271
272impl fmt::Debug for Handle {
273 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
274 fmt.debug_struct("Handle")
275 .field("nworkers", &self.workers.len())
276 .field("next", &self.rri.load(atomic::Ordering::Relaxed))
277 .finish()
278 }
279}
280
281impl tokio::executor::Executor for Handle {
282 fn spawn(
283 &mut self,
284 future: Box<dyn Future<Item = (), Error = ()> + 'static + Send>,
285 ) -> Result<(), SpawnError> {
286 Handle::spawn(self, future).map(|_| ())
287 }
288}
289
290impl Handle {
291 /// Spawn a future onto a runtime in the pool.
292 ///
293 /// This spawns the given future onto a single thread runtime's executor. That thread is then
294 /// responsible for polling the future until it completes.
295 pub fn spawn<F>(&self, future: F) -> Result<&Self, SpawnError>
296 where
297 F: Future<Item = (), Error = ()> + Send + 'static,
298 {
299 let worker = self.rri.fetch_add(1, atomic::Ordering::Relaxed) % self.workers.len();
300 self.workers[worker].spawn(future)?;
301 Ok(self)
302 }
303
304 /// Spawn all futures yielded by a stream onto the pool.
305 ///
306 /// This produces a future that accepts futures from a `Stream` and spawns them all onto the
307 /// pool round-robin.
308 pub fn spawn_all<S>(
309 &self,
310 stream: S,
311 ) -> impl Future<Item = (), Error = StreamSpawnError<<S as Stream>::Error>>
312 where
313 S: Stream,
314 <S as Stream>::Item: Future<Item = (), Error = ()> + Send + 'static,
315 {
316 Spawner {
317 handle: self.clone(),
318 stream,
319 }
320 }
321}
322
323/// An I/O-oriented thread pool for executing futures.
324///
325/// Each thread in the pool has its own I/O reactor, and futures are spawned onto threads
326/// round-robin. Futures do not (currently) move between threads in the pool once spawned, and any
327/// new futures spawned (using `tokio::spawn`) inside futures are scheduled on the same worker as
328/// the original future.
329pub struct Runtime {
330 handle: Handle,
331 threads: Vec<(oneshot::Sender<bool>, thread::JoinHandle<()>)>,
332 force_exit: bool,
333}
334
335impl fmt::Debug for Runtime {
336 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
337 fmt.debug_struct("Runtime")
338 .field("nworkers", &self.threads.len())
339 .finish()
340 }
341}
342
343impl Default for Runtime {
344 fn default() -> Self {
345 Self::new()
346 }
347}
348
349impl Runtime {
350 /// Create a new thread pool with parameters from a default [`Builder`] and return a handle to
351 /// it.
352 ///
353 /// # Panics
354 ///
355 /// Panics if enough threads could not be spawned (see [`Builder::build`]).
356 pub fn new() -> Self {
357 Builder::default().build().unwrap()
358 }
359
360 /// Return a reference to the pool.
361 ///
362 /// The returned handle reference can be cloned in order to get an owned value of the handle.
363 /// This handle can be used to spawn additional futures onto the pool from other threads.
364 pub fn handle(&self) -> &Handle {
365 &self.handle
366 }
367
368 /// Spawn a future onto a runtime in the pool.
369 ///
370 /// This spawns the given future onto a single thread runtime's executor. That thread is then
371 /// responsible for polling the future until it completes.
372 pub fn spawn<F>(&self, future: F) -> Result<&Self, SpawnError>
373 where
374 F: Future<Item = (), Error = ()> + Send + 'static,
375 {
376 self.handle.spawn(future)?;
377 Ok(self)
378 }
379
380 /// Spawn all futures yielded by a stream onto the pool.
381 ///
382 /// This produces a future that accepts futures from a `Stream` and spawns them all onto the
383 /// pool round-robin.
384 #[must_use]
385 pub fn spawn_all<S>(
386 &self,
387 stream: S,
388 ) -> impl Future<Item = (), Error = StreamSpawnError<<S as Stream>::Error>>
389 where
390 S: Stream,
391 <S as Stream>::Item: Future<Item = (), Error = ()> + Send + 'static,
392 {
393 self.handle.spawn_all(stream)
394 }
395
396 /// Run the given future on the current thread, and dispatch any child futures spawned with
397 /// `tokio::spawn` onto the I/O pool.
398 ///
399 /// Note that child futures of futures that are already running on the pool will be executed on
400 /// the same pool thread as their parent. Only the "top-level" calls to `tokio::spawn` are
401 /// scheduled to the thread pool as a whole.
402 ///
403 /// Note that the top-level future is executed using `Future::wait`, and thus does not provide
404 /// access to timers, clocks, or other tokio runtime goodies.
405 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
406 where
407 F: Send + 'static + Future<Item = R, Error = E>,
408 R: Send + 'static,
409 E: Send + 'static,
410 {
411 let mut enter = tokio_executor::enter().expect("already running in executor context");
412 tokio_executor::with_default(&mut self.handle, &mut enter, |_| future.wait())
413 }
414
415 /// Shut down the pool as soon as possible.
416 ///
417 /// Note that once this method has been called, attempts to spawn additional futures onto the
418 /// pool through an outstanding `Handle` may fail. Futures that have not yet resolved will be
419 /// dropped.
420 ///
421 /// The pool will only terminate once any currently-running futures return `NotReady`.
422 pub fn shutdown(self) {}
423
424 /// Shut down the pool once all spawned futures have completed.
425 ///
426 /// Note that once this method has been called, attempts to spawn additional futures onto the
427 /// pool through an outstanding `Handle` may fail.
428 pub fn shutdown_on_idle(mut self) {
429 self.force_exit = false;
430 }
431}
432
433impl Drop for Runtime {
434 fn drop(&mut self) {
435 let mut handles = Vec::with_capacity(self.threads.len());
436 for (exit, jh) in self.threads.drain(..) {
437 if let Err(e) = exit.send(self.force_exit) {
438 if !thread::panicking() {
439 panic!("oneshot::Sender::Send: {:?}", e);
440 }
441 }
442 handles.push(jh);
443 }
444 for jh in handles {
445 if let Err(e) = jh.join() {
446 if !thread::panicking() {
447 panic!("JoinHandle::join: {:?}", e);
448 }
449 }
450 }
451 }
452}
453
454/// An error that occurred as a result of spawning futures from a stream given to
455/// [`Runtime::spawn_all`].
456#[derive(Debug)]
457pub enum StreamSpawnError<SE> {
458 /// An error occurred while spawning a future yielded by the stream onto the pool.
459 Spawn(SpawnError),
460 /// An error occurred while polling the stream for another future.
461 Stream(SE),
462}
463
464impl<SE> From<SE> for StreamSpawnError<SE> {
465 fn from(e: SE) -> Self {
466 StreamSpawnError::Stream(e)
467 }
468}
469
470struct Spawner<S> {
471 handle: Handle,
472 stream: S,
473}
474
475impl<S> fmt::Debug for Spawner<S>
476where
477 S: fmt::Debug,
478{
479 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
480 fmt.debug_struct("Spawner")
481 .field("stream", &self.stream)
482 .finish()
483 }
484}
485
486impl<S> Future for Spawner<S>
487where
488 S: Stream,
489 <S as Stream>::Item: Future<Item = (), Error = ()> + Send + 'static,
490{
491 type Item = ();
492 type Error = StreamSpawnError<<S as Stream>::Error>;
493
494 fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
495 while let Some(fut) = try_ready!(self.stream.poll()) {
496 self.handle.spawn(fut).map_err(StreamSpawnError::Spawn)?;
497 }
498 Ok(Async::Ready(()))
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::lazy;
    use futures::sync::oneshot;
    use std::net::{Shutdown, SocketAddr, TcpStream};
    use std::rc::Rc;

    // Connects to the echo server at `addr`, sends `payload`, half-closes the connection, and
    // checks that exactly `payload` comes back.
    fn assert_echoes(addr: &SocketAddr, payload: &[u8]) {
        let mut client = TcpStream::connect(addr).unwrap();
        client.write_all(payload).unwrap();
        client.shutdown(Shutdown::Write).unwrap();
        let mut bytes = Vec::new();
        client.read_to_end(&mut bytes).unwrap();
        assert_eq!(&bytes[..], payload);
    }

    // A future spawned onto the pool runs, and its effect can be observed from the outside.
    #[test]
    fn it_works() {
        let (tx, rx) = oneshot::channel();

        let rt = Runtime::new();
        let notify = lazy(move || {
            tx.send(()).unwrap();
            Ok(())
        });
        rt.spawn(notify).unwrap();
        assert_eq!(rx.wait().unwrap(), ());
        rt.shutdown_on_idle();
    }

    // An echo server whose connections are all spawned onto the pool through `spawn_all`.
    #[test]
    fn spawn_all() {
        let addr = "127.0.0.1:0".parse().unwrap();
        let listener = tokio::net::TcpListener::bind(&addr).expect("unable to bind TCP listener");
        let addr = listener.local_addr().unwrap();

        let rt = Builder::default().pool_size(1).build().unwrap();
        let connections = listener
            .incoming()
            .map_err(|e| unreachable!("{:?}", e))
            .map(|sock| {
                let (reader, writer) = sock.split();
                tokio::io::copy(reader, writer)
                    .map(|_| ())
                    .map_err(|err| unreachable!("{:?}", err))
            });

        // spawn all connections onto the pool
        let spawner = rt.spawn_all(connections);

        // spawn the spawner onto the pool too
        // (a "real" server might wait for it instead)
        rt.spawn(spawner.map_err(|e| unreachable!("{:?}", e)))
            .unwrap();

        assert_echoes(&addr, b"hello world");
        assert_echoes(&addr, b"bye world");
    }

    // An echo server driven by the top-level `run` function on a thread of its own.
    #[test]
    fn run() {
        let addr = "127.0.0.1:0".parse().unwrap();
        let listener = tokio::net::TcpListener::bind(&addr).expect("unable to bind TCP listener");
        let addr = listener.local_addr().unwrap();

        thread::spawn(move || {
            let server = listener
                .incoming()
                .map_err(|e| unreachable!("{:?}", e))
                .for_each(|sock| {
                    let (reader, writer) = sock.split();
                    let echo = tokio::io::copy(reader, writer)
                        .map(|_| ())
                        .map_err(|err| unreachable!("{:?}", err));
                    tokio::spawn(echo)
                });

            super::run(server);
        });

        assert_echoes(&addr, b"hello world");
        assert_echoes(&addr, b"bye world");
    }

    // futures channels can exchange information between different threads
    #[test]
    fn interthread_communication() {
        let (tx0, rx0) = oneshot::channel::<u32>();
        let (tx1, rx1) = oneshot::channel::<u32>();

        let rt = Builder::default().pool_size(2).build().unwrap();

        // The first value is sent from this thread; the spawned future is already complete.
        tx0.send(42).unwrap();
        rt.spawn(future::ok::<(), ()>(())).unwrap();

        let relay = rx0.map(|v| tx1.send(v + 1).unwrap()).map_err(|_| ());
        rt.spawn(relay).unwrap();

        assert_eq!(rx1.wait().unwrap(), 43);
        rt.shutdown_on_idle();
    }

    // A Future that isn't Send can't be spawned into the Runtime, but it _can_ be spawned from a
    // thread onto that same thread
    #[test]
    fn spawn_nonsend_futures() {
        let rt = Runtime::new();
        let parent = lazy(|| {
            let (tx, rx) = oneshot::channel::<u32>();
            let x = Rc::new(42u32); // Note: Rc is not Send
            let child = lazy(move || {
                tx.send(*x).unwrap();
                Ok(())
            });
            tokio_current_thread::spawn(child);
            rx.map(|value| assert_eq!(42, value)).map_err(|_| ())
        });
        rt.spawn(parent).unwrap();
        rt.shutdown_on_idle();
    }

    // A future spawned from within the top-level future of `run` must not keep `run` from
    // returning.
    #[test]
    fn really_lazy() {
        let top_level = future::lazy(|| {
            tokio::spawn(future::lazy(|| Ok(())));
            Ok(())
        });
        super::run(top_level);
    }
}