tokio_io_pool/
lib.rs

1//! This crate provides a thread pool for executing short, I/O-heavy futures efficiently.
2//!
3//! The standard `Runtime` provided by `tokio` uses a thread-pool to allow concurrent execution of
4//! compute-heavy futures. However, its work-stealing makes it so that futures may be executed on
5//! different threads to where their reactor are running, which results in unnecessary
6//! synchronization, and thus lowers the achievable throughput. While this trade-off works well for
7//! many asynchronous applications, since it spreads load more evenly, it is not a great fit for
8//! high-performance I/O bound applications where the cost of synchronizing threads is high. This
9//! can happen, for example, if your application performs frequent but small I/O operations.
10//!
11//! This crate provides an alternative implementation of a futures-based thread pool. It spawns a
12//! pool of threads that each runs a `tokio::runtime::current_thread::Runtime` (and thus each have
13//! an I/O reactor of their own), and spawns futures onto the pool by assigning the future to
14//! threads round-robin. Once a future has been spawned onto a thread, it, and any child futures it
15//! may produce through `tokio::spawn`, remain under the control of that same thread.
16//!
17//! In general, you should use `tokio-io-pool` only if you perform a lot of very short I/O
18//! operations on many futures, and find that you are bottlenecked by work-stealing or reactor
19//! notifications with the regular `tokio` runtime. If you are unsure what to use, start with the
20//! `tokio` runtime.
21//!
22//! Be aware that this pool does *not* support the
23//! [`blocking`](https://docs.rs/tokio-threadpool/0.1.5/tokio_threadpool/fn.blocking.html) function
24//! since it is [not supported](https://github.com/tokio-rs/tokio/issues/432) by the underlying
25//! `current_thread::Runtime`. Hopefully this will be rectified down the line.
26//!
27//! There is some discussion around trying to merge this pool into `tokio` proper; that effort is
28//! tracked in [tokio-rs/tokio#486](https://github.com/tokio-rs/tokio/issues/486).
29//!
30//! # Examples
31//!
32//! ```no_run
33//! use tokio::prelude::*;
34//! use tokio::io::copy;
35//! use tokio::net::TcpListener;
36//!
37//! fn main() {
38//!     // Bind the server's socket.
39//!     let addr = "127.0.0.1:12345".parse().unwrap();
40//!     let listener = TcpListener::bind(&addr)
41//!         .expect("unable to bind TCP listener");
42//!
43//!     // Pull out a stream of sockets for incoming connections
44//!     let server = listener.incoming()
45//!         .map_err(|e| eprintln!("accept failed = {:?}", e))
46//!         .for_each(|sock| {
47//!             // Split up the reading and writing parts of the
48//!             // socket.
49//!             let (reader, writer) = sock.split();
50//!
51//!             // A future that echos the data and returns how
52//!             // many bytes were copied...
53//!             let bytes_copied = copy(reader, writer);
54//!
55//!             // ... after which we'll print what happened.
56//!             let handle_conn = bytes_copied.map(|amt| {
57//!                 println!("wrote {:?} bytes", amt)
58//!             }).map_err(|err| {
59//!                 eprintln!("IO error {:?}", err)
60//!             });
61//!
62//!             // Spawn the future as a concurrent task.
63//!             tokio::spawn(handle_conn)
64//!         });
65//!
66//!     // Start the Tokio runtime
67//!     tokio_io_pool::run(server);
68//! }
69//! ```
70
71#![deny(missing_docs)]
72#![deny(missing_debug_implementations)]
73#![deny(missing_copy_implementations)]
74#![deny(unused_extern_crates)]
75
76use futures::sync::oneshot;
77use futures::try_ready;
78use std::sync::{atomic, mpsc, Arc};
79use std::{fmt, io, thread};
80use tokio::executor::SpawnError;
81use tokio::prelude::*;
82use tokio::runtime::current_thread;
83
84/// Builds an I/O-oriented thread pool ([`Runtime`]) with custom configuration values.
85///
86/// Methods can be chained in order to set the configuration values. The thread pool is constructed
87/// by calling [`Builder::build`]. New instances of `Builder` are obtained via
88/// [`Builder::default`].
89///
90/// See function level documentation for details on the various configuration settings.
91pub struct Builder {
92    nworkers: usize,
93    name_prefix: Option<String>,
94    after_start: Option<Arc<dyn Fn() + Send + Sync>>,
95    before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
96}
97
98impl fmt::Debug for Builder {
99    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
100        fmt.debug_struct("Builder")
101            .field("nworkers", &self.nworkers)
102            .field("name_prefix", &self.name_prefix)
103            .finish()
104    }
105}
106
107impl Default for Builder {
108    fn default() -> Self {
109        Builder {
110            nworkers: num_cpus::get(),
111            name_prefix: None,
112            after_start: None,
113            before_stop: None,
114        }
115    }
116}
117
118impl Builder {
119    /// Set the number of worker threads for the thread pool instance.
120    ///
121    /// This must be a number between 1 and 32,768 though it is advised to keep
122    /// this value on the smaller side.
123    ///
124    /// The default value is the number of cores available to the system.
125    pub fn pool_size(&mut self, val: usize) -> &mut Self {
126        self.nworkers = val;
127        self
128    }
129
130    /// Set name prefix of threads spawned by the scheduler
131    ///
132    /// Thread name prefix is used for generating thread names. For example, if prefix is
133    /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc.
134    ///
135    /// If this configuration is not set, then the thread will use the system default naming
136    /// scheme.
137    pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
138        self.name_prefix = Some(val.into());
139        self
140    }
141
142    /// Execute function `f` after each thread is started but before it starts doing work.
143    ///
144    /// This is intended for bookkeeping and monitoring use cases.
145    pub fn after_start<F>(&mut self, f: F) -> &mut Self
146    where
147        F: Fn() + Send + Sync + 'static,
148    {
149        self.after_start = Some(Arc::new(f));
150        self
151    }
152
153    /// Execute function `f` before each thread stops.
154    ///
155    /// This is intended for bookkeeping and monitoring use cases.
156    pub fn before_stop<F>(&mut self, f: F) -> &mut Self
157    where
158        F: Fn() + Send + Sync + 'static,
159    {
160        self.before_stop = Some(Arc::new(f));
161        self
162    }
163
164    /// Create the configured [`Runtime`].
165    ///
166    /// The returned [`Runtime`] instance is ready to spawn tasks.
167    pub fn build(&self) -> io::Result<Runtime> {
168        assert!(self.nworkers > 0);
169
170        let mut handles = Vec::with_capacity(self.nworkers);
171        let mut threads = Vec::with_capacity(self.nworkers);
172        for i in 0..self.nworkers {
173            let (trigger, exit) = oneshot::channel();
174            let (handle_tx, handle_rx) = mpsc::sync_channel(1);
175
176            let mut th = thread::Builder::new();
177
178            if let Some(ref prefix) = self.name_prefix {
179                th = th.name(format!("{}{}", prefix, i + 1));
180            }
181
182            let before = self.after_start.clone();
183            let after = self.before_stop.clone();
184
185            let jh = th.spawn(move || {
186                if let Some(ref f) = before {
187                    f();
188                }
189
190                let mut rt = current_thread::Runtime::new().unwrap();
191                handle_tx.send(rt.handle()).unwrap();
192                let force_exit: bool = rt.block_on(exit).unwrap();
193                if !force_exit {
194                    rt.run().unwrap();
195                }
196
197                if let Some(ref f) = after {
198                    f();
199                }
200            })?;
201
202            threads.push((trigger, jh));
203            handles.push(handle_rx.recv().unwrap());
204        }
205
206        let handle = Handle {
207            workers: handles,
208            rri: Arc::new(atomic::AtomicUsize::new(0)),
209        };
210
211        Ok(Runtime {
212            threads,
213            force_exit: true,
214            handle,
215        })
216    }
217}
218
219/// Execute the given future and spawn any child futures onto a newly created I/O thread pool.
220///
221/// This function is used to bootstrap the execution of a Tokio application. It does the following:
222///
223///  - Start the Tokio I/O pool using a default configuration.
224///  - Configure Tokio to make any future spawned with `tokio::spawn` spawn on the pool.
225///  - Run the given future to completion on the current thread.
226///  - Block the current thread until the pool becomes idle.
227///
228/// Note that the function will not return immediately once future has completed. Instead it waits
229/// for the entire pool to become idle. Note also that the top-level future will be executed with
230/// [`Runtime::block_on`], which calls `Future::wait`, and thus does not provide access to timers,
231/// clocks, or other tokio runtime goodies.
232///
233/// # Examples
234///
235/// ```no_run
236/// # extern crate tokio_io_pool;
237/// # extern crate tokio;
238/// # extern crate futures;
239/// # use futures::{Future, Stream};
240/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> {
241/// # unimplemented!();
242/// # }
243/// # let addr = "127.0.0.1:8080".parse().unwrap();
244/// use tokio::net::TcpListener;
245///
246/// let listener = TcpListener::bind(&addr).unwrap();
247///
248/// let server = listener.incoming()
249///     .map_err(|e| println!("error = {:?}", e))
250///     .for_each(|socket| {
251///         tokio::spawn(process(socket))
252///     });
253///
254/// tokio_io_pool::run(server);
255/// ```
256pub fn run<F>(future: F)
257where
258    F: Future<Item = (), Error = ()> + Send + 'static,
259{
260    let mut rt = Runtime::new();
261    let _ = rt.block_on(future);
262    rt.shutdown_on_idle();
263}
264
265/// A handle to a [`Runtime`] that allows spawning additional futures from other threads.
266#[derive(Clone)]
267pub struct Handle {
268    workers: Vec<current_thread::Handle>,
269    rri: Arc<atomic::AtomicUsize>,
270}
271
272impl fmt::Debug for Handle {
273    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
274        fmt.debug_struct("Handle")
275            .field("nworkers", &self.workers.len())
276            .field("next", &self.rri.load(atomic::Ordering::Relaxed))
277            .finish()
278    }
279}
280
281impl tokio::executor::Executor for Handle {
282    fn spawn(
283        &mut self,
284        future: Box<dyn Future<Item = (), Error = ()> + 'static + Send>,
285    ) -> Result<(), SpawnError> {
286        Handle::spawn(self, future).map(|_| ())
287    }
288}
289
290impl Handle {
291    /// Spawn a future onto a runtime in the pool.
292    ///
293    /// This spawns the given future onto a single thread runtime's executor. That thread is then
294    /// responsible for polling the future until it completes.
295    pub fn spawn<F>(&self, future: F) -> Result<&Self, SpawnError>
296    where
297        F: Future<Item = (), Error = ()> + Send + 'static,
298    {
299        let worker = self.rri.fetch_add(1, atomic::Ordering::Relaxed) % self.workers.len();
300        self.workers[worker].spawn(future)?;
301        Ok(self)
302    }
303
304    /// Spawn all futures yielded by a stream onto the pool.
305    ///
306    /// This produces a future that accepts futures from a `Stream` and spawns them all onto the
307    /// pool round-robin.
308    pub fn spawn_all<S>(
309        &self,
310        stream: S,
311    ) -> impl Future<Item = (), Error = StreamSpawnError<<S as Stream>::Error>>
312    where
313        S: Stream,
314        <S as Stream>::Item: Future<Item = (), Error = ()> + Send + 'static,
315    {
316        Spawner {
317            handle: self.clone(),
318            stream,
319        }
320    }
321}
322
323/// An I/O-oriented thread pool for executing futures.
324///
325/// Each thread in the pool has its own I/O reactor, and futures are spawned onto threads
326/// round-robin. Futures do not (currently) move between threads in the pool once spawned, and any
327/// new futures spawned (using `tokio::spawn`) inside futures are scheduled on the same worker as
328/// the original future.
329pub struct Runtime {
330    handle: Handle,
331    threads: Vec<(oneshot::Sender<bool>, thread::JoinHandle<()>)>,
332    force_exit: bool,
333}
334
335impl fmt::Debug for Runtime {
336    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
337        fmt.debug_struct("Runtime")
338            .field("nworkers", &self.threads.len())
339            .finish()
340    }
341}
342
343impl Default for Runtime {
344    fn default() -> Self {
345        Self::new()
346    }
347}
348
349impl Runtime {
350    /// Create a new thread pool with parameters from a default [`Builder`] and return a handle to
351    /// it.
352    ///
353    /// # Panics
354    ///
355    /// Panics if enough threads could not be spawned (see [`Builder::build`]).
356    pub fn new() -> Self {
357        Builder::default().build().unwrap()
358    }
359
360    /// Return a reference to the pool.
361    ///
362    /// The returned handle reference can be cloned in order to get an owned value of the handle.
363    /// This handle can be used to spawn additional futures onto the pool from other threads.
364    pub fn handle(&self) -> &Handle {
365        &self.handle
366    }
367
368    /// Spawn a future onto a runtime in the pool.
369    ///
370    /// This spawns the given future onto a single thread runtime's executor. That thread is then
371    /// responsible for polling the future until it completes.
372    pub fn spawn<F>(&self, future: F) -> Result<&Self, SpawnError>
373    where
374        F: Future<Item = (), Error = ()> + Send + 'static,
375    {
376        self.handle.spawn(future)?;
377        Ok(self)
378    }
379
380    /// Spawn all futures yielded by a stream onto the pool.
381    ///
382    /// This produces a future that accepts futures from a `Stream` and spawns them all onto the
383    /// pool round-robin.
384    #[must_use]
385    pub fn spawn_all<S>(
386        &self,
387        stream: S,
388    ) -> impl Future<Item = (), Error = StreamSpawnError<<S as Stream>::Error>>
389    where
390        S: Stream,
391        <S as Stream>::Item: Future<Item = (), Error = ()> + Send + 'static,
392    {
393        self.handle.spawn_all(stream)
394    }
395
396    /// Run the given future on the current thread, and dispatch any child futures spawned with
397    /// `tokio::spawn` onto the I/O pool.
398    ///
399    /// Note that child futures of futures that are already running on the pool will be executed on
400    /// the same pool thread as their parent. Only the "top-level" calls to `tokio::spawn` are
401    /// scheduled to the thread pool as a whole.
402    ///
403    /// Note that the top-level future is executed using `Future::wait`, and thus does not provide
404    /// access to timers, clocks, or other tokio runtime goodies.
405    pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
406    where
407        F: Send + 'static + Future<Item = R, Error = E>,
408        R: Send + 'static,
409        E: Send + 'static,
410    {
411        let mut enter = tokio_executor::enter().expect("already running in executor context");
412        tokio_executor::with_default(&mut self.handle, &mut enter, |_| future.wait())
413    }
414
415    /// Shut down the pool as soon as possible.
416    ///
417    /// Note that once this method has been called, attempts to spawn additional futures onto the
418    /// pool through an outstanding `Handle` may fail. Futures that have not yet resolved will be
419    /// dropped.
420    ///
421    /// The pool will only terminate once any currently-running futures return `NotReady`.
422    pub fn shutdown(self) {}
423
424    /// Shut down the pool once all spawned futures have completed.
425    ///
426    /// Note that once this method has been called, attempts to spawn additional futures onto the
427    /// pool through an outstanding `Handle` may fail.
428    pub fn shutdown_on_idle(mut self) {
429        self.force_exit = false;
430    }
431}
432
433impl Drop for Runtime {
434    fn drop(&mut self) {
435        let mut handles = Vec::with_capacity(self.threads.len());
436        for (exit, jh) in self.threads.drain(..) {
437            if let Err(e) = exit.send(self.force_exit) {
438                if !thread::panicking() {
439                    panic!("oneshot::Sender::Send: {:?}", e);
440                }
441            }
442            handles.push(jh);
443        }
444        for jh in handles {
445            if let Err(e) = jh.join() {
446                if !thread::panicking() {
447                    panic!("JoinHandle::join: {:?}", e);
448                }
449            }
450        }
451    }
452}
453
454/// An error that occurred as a result of spawning futures from a stream given to
455/// [`Runtime::spawn_all`].
456#[derive(Debug)]
457pub enum StreamSpawnError<SE> {
458    /// An error occurred while spawning a future yielded by the stream onto the pool.
459    Spawn(SpawnError),
460    /// An error occurred while polling the stream for another future.
461    Stream(SE),
462}
463
464impl<SE> From<SE> for StreamSpawnError<SE> {
465    fn from(e: SE) -> Self {
466        StreamSpawnError::Stream(e)
467    }
468}
469
470struct Spawner<S> {
471    handle: Handle,
472    stream: S,
473}
474
475impl<S> fmt::Debug for Spawner<S>
476where
477    S: fmt::Debug,
478{
479    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
480        fmt.debug_struct("Spawner")
481            .field("stream", &self.stream)
482            .finish()
483    }
484}
485
486impl<S> Future for Spawner<S>
487where
488    S: Stream,
489    <S as Stream>::Item: Future<Item = (), Error = ()> + Send + 'static,
490{
491    type Item = ();
492    type Error = StreamSpawnError<<S as Stream>::Error>;
493
494    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
495        while let Some(fut) = try_ready!(self.stream.poll()) {
496            self.handle.spawn(fut).map_err(StreamSpawnError::Spawn)?;
497        }
498        Ok(Async::Ready(()))
499    }
500}
501
502#[cfg(test)]
503mod tests {
504    use super::*;
505
506    #[test]
507    fn it_works() {
508        use futures::future::lazy;
509        use futures::sync::oneshot;
510
511        let (tx, rx) = oneshot::channel();
512
513        let rt = Runtime::new();
514        rt.spawn(lazy(move || {
515            tx.send(()).unwrap();
516            Ok(())
517        }))
518        .unwrap();
519        assert_eq!(rx.wait().unwrap(), ());
520        rt.shutdown_on_idle();
521    }
522
523    #[test]
524    fn spawn_all() {
525        let addr = "127.0.0.1:0".parse().unwrap();
526        let listener = tokio::net::TcpListener::bind(&addr).expect("unable to bind TCP listener");
527        let addr = listener.local_addr().unwrap();
528
529        let rt = Builder::default().pool_size(1).build().unwrap();
530        let server = listener
531            .incoming()
532            .map_err(|e| unreachable!("{:?}", e))
533            .map(|sock| {
534                let (reader, writer) = sock.split();
535                let bytes_copied = tokio::io::copy(reader, writer);
536                bytes_copied
537                    .map(|_| ())
538                    .map_err(|err| unreachable!("{:?}", err))
539            });
540
541        // spawn all connections onto the pool
542        let spawner = rt.spawn_all(server);
543
544        // spawn the spawner onto the pool too
545        // (a "real" server might wait for it instead)
546        rt.spawn(spawner.map_err(|e| unreachable!("{:?}", e)))
547            .unwrap();
548
549        let mut client = ::std::net::TcpStream::connect(&addr).unwrap();
550        client.write_all(b"hello world").unwrap();
551        client.shutdown(::std::net::Shutdown::Write).unwrap();
552        let mut bytes = Vec::new();
553        client.read_to_end(&mut bytes).unwrap();
554        assert_eq!(&bytes, b"hello world");
555
556        let mut client = ::std::net::TcpStream::connect(&addr).unwrap();
557        client.write_all(b"bye world").unwrap();
558        client.shutdown(::std::net::Shutdown::Write).unwrap();
559        let mut bytes = Vec::new();
560        client.read_to_end(&mut bytes).unwrap();
561        assert_eq!(&bytes, b"bye world");
562    }
563
564    #[test]
565    fn run() {
566        let addr = "127.0.0.1:0".parse().unwrap();
567        let listener = tokio::net::TcpListener::bind(&addr).expect("unable to bind TCP listener");
568        let addr = listener.local_addr().unwrap();
569
570        thread::spawn(move || {
571            let server = listener
572                .incoming()
573                .map_err(|e| unreachable!("{:?}", e))
574                .for_each(|sock| {
575                    let (reader, writer) = sock.split();
576                    let bytes_copied = tokio::io::copy(reader, writer);
577                    tokio::spawn(
578                        bytes_copied
579                            .map(|_| ())
580                            .map_err(|err| unreachable!("{:?}", err)),
581                    )
582                });
583
584            super::run(server);
585        });
586
587        let mut client = ::std::net::TcpStream::connect(&addr).unwrap();
588        client.write_all(b"hello world").unwrap();
589        client.shutdown(::std::net::Shutdown::Write).unwrap();
590        let mut bytes = Vec::new();
591        client.read_to_end(&mut bytes).unwrap();
592        assert_eq!(&bytes, b"hello world");
593
594        let mut client = ::std::net::TcpStream::connect(&addr).unwrap();
595        client.write_all(b"bye world").unwrap();
596        client.shutdown(::std::net::Shutdown::Write).unwrap();
597        let mut bytes = Vec::new();
598        client.read_to_end(&mut bytes).unwrap();
599        assert_eq!(&bytes, b"bye world");
600    }
601
602    // futures channels can exchange information between different threads
603    #[test]
604    fn interthread_communication() {
605        use futures::sync::oneshot;
606
607        let (tx0, rx0) = oneshot::channel::<u32>();
608        let (tx1, rx1) = oneshot::channel::<u32>();
609
610        let rt = Builder::default().pool_size(2).build().unwrap();
611        rt.spawn(future::ok::<(), ()>(tx0.send(42).unwrap()))
612            .unwrap();
613        rt.spawn(rx0.map(|v| tx1.send(v + 1).unwrap()).map_err(|_| ()))
614            .unwrap();
615        assert_eq!(rx1.wait().unwrap(), 43);
616        rt.shutdown_on_idle();
617    }
618
619    // A Future that isn't Send can't be spawned into the Runtime, but it _can_ be spawned from a
620    // thread onto that same thread
621    #[test]
622    fn spawn_nonsend_futures() {
623        use futures::future::lazy;
624        use futures::sync::oneshot;
625        use std::rc::Rc;
626
627        let rt = Runtime::new();
628        rt.spawn(lazy(|| {
629            let (tx, rx) = oneshot::channel::<u32>();
630            let x = Rc::new(42u32); // Note: Rc is not Send
631            tokio_current_thread::spawn(lazy(move || {
632                tx.send(*x).unwrap();
633                Ok(())
634            }));
635            rx.map(|value| assert_eq!(42, value)).map_err(|_| ())
636        }))
637        .unwrap();
638        rt.shutdown_on_idle();
639    }
640
641    #[test]
642    fn really_lazy() {
643        super::run(future::lazy(|| {
644            tokio::spawn(future::lazy(|| Ok(())));
645            Ok(())
646        }));
647    }
648}