jwalk/
lib.rs

1#![warn(clippy::all)]
2
3//! Filesystem walk.
4//!
5//! - Performed in parallel using rayon
6//! - Entries streamed in sorted order
7//! - Custom sort/filter/skip/state
8//!
9//! # Example
10//!
11//! Recursively iterate over the "foo" directory sorting by name:
12//!
13//! ```no_run
14//! # use std::io::Error;
15//! use jwalk::{WalkDir};
16//!
17//! # fn try_main() -> Result<(), Error> {
18//! for entry in WalkDir::new("foo").sort(true) {
19//!   println!("{}", entry?.path().display());
20//! }
21//! # Ok(())
22//! # }
23//! ```
24//! # Extended Example
25//!
26//! This example uses the
27//! [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
28//! callback for custom:
29//! 1. **Sort** Entries by name
30//! 2. **Filter** Errors and hidden files
31//! 3. **Skip** Content of directories at depth 2
32//! 4. **State** Track depth `read_dir_state`. Mark first entry in each
33//!    directory with [`client_state`](struct.DirEntry.html#field.client_state)
34//!    `= true`.
35//!
36//! ```no_run
37//! # use std::io::Error;
38//! use std::cmp::Ordering;
39//! use jwalk::{ WalkDirGeneric };
40//!
41//! # fn try_main() -> Result<(), Error> {
42//! let walk_dir = WalkDirGeneric::<((usize),(bool))>::new("foo")
43//!     .process_read_dir(|depth, path, read_dir_state, children| {
44//!         // 1. Custom sort
45//!         children.sort_by(|a, b| match (a, b) {
46//!             (Ok(a), Ok(b)) => a.file_name.cmp(&b.file_name),
47//!             (Ok(_), Err(_)) => Ordering::Less,
48//!             (Err(_), Ok(_)) => Ordering::Greater,
49//!             (Err(_), Err(_)) => Ordering::Equal,
50//!         });
51//!         // 2. Custom filter
52//!         children.retain(|dir_entry_result| {
53//!             dir_entry_result.as_ref().map(|dir_entry| {
54//!                 dir_entry.file_name
55//!                     .to_str()
56//!                     .map(|s| s.starts_with('.'))
57//!                     .unwrap_or(false)
58//!             }).unwrap_or(false)
59//!         });
60//!         // 3. Custom skip
61//!         children.iter_mut().for_each(|dir_entry_result| {
62//!             if let Ok(dir_entry) = dir_entry_result {
63//!                 if dir_entry.depth == 2 {
64//!                     dir_entry.read_children_path = None;
65//!                 }
66//!             }
67//!         });
68//!         // 4. Custom state
69//!         *read_dir_state += 1;
70//!         children.first_mut().map(|dir_entry_result| {
71//!             if let Ok(dir_entry) = dir_entry_result {
72//!                 dir_entry.client_state = true;
73//!             }
74//!         });
75//!     });
76//!
77//! for entry in walk_dir {
78//!   println!("{}", entry?.path().display());
79//! }
80//! # Ok(())
81//! # }
82//! ```
83//! # Inspiration
84//!
85//! This crate is inspired by both [`walkdir`](https://crates.io/crates/walkdir)
86//! and [`ignore`](https://crates.io/crates/ignore). It attempts to combine the
87//! parallelism of `ignore` with `walkdir`'s streaming iterator API. Some code,
//! comments, and tests are copied directly from `walkdir`.
89//!
90//! # Implementation
91//!
92//! The following structures are central to the implementation:
93//!
94//! ## `ReadDirSpec`
95//!
96//! Specification of a future `read_dir` operation. These are stored in the
97//! `read_dir_spec_queue` in depth first order. When a rayon thread is ready for
//! work it pulls the first available `ReadDirSpec` from this queue.
99//!
100//! ## `ReadDir`
101//!
102//! Result of a `read_dir` operation generated by rayon thread. These results
103//! are stored in the `read_dir_result_queue`, also depth first ordered.
104//!
105//! ## `ReadDirIter`
106//!
107//! Pulls `ReadDir` results from the `read_dir_result_queue`. This iterator is
108//! driven by calling thread. Results are returned in strict depth first order.
109//!
110//! ## `DirEntryIter`
111//!
112//! Wraps a `ReadDirIter` and yields individual `DirEntry` results in strict
113//! depth first order.
114
115mod core;
116
117use rayon::{ThreadPool, ThreadPoolBuilder};
118use std::cmp::Ordering;
119use std::default::Default;
120use std::ffi::OsStr;
121use std::fmt::Debug;
122use std::fs;
123use std::path::{Path, PathBuf};
124use std::sync::Arc;
125
126use crate::core::{ReadDir, ReadDirSpec};
127
128pub use crate::core::{DirEntry, DirEntryIter, Error};
129pub use rayon;
130
/// Builder for walking a directory.
///
/// This is [`WalkDirGeneric`] with `((), ())` as its client state, i.e. both
/// the read-dir state and the per-entry state are the unit type.
pub type WalkDir = WalkDirGeneric<((), ())>;
133
/// A specialized Result type for WalkDir.
///
/// The error type is fixed to this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
136
/// Client state maintained while performing walk.
///
/// The trait bundles two associated types:
///
/// - `ReadDirState`: the state handed mutably to the
///   [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
///   callback.
/// - `DirEntryState`: the type for state stored in DirEntry's
///   [`client_state`](struct.DirEntry.html#field.client_state) field.
///
/// Client state can be stored from within the
/// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir) callback.
/// The type of ClientState is determined by WalkDirGeneric type parameter.
pub trait ClientState: Send + Default + Debug + 'static {
    /// State passed as `&mut` to the `process_read_dir` callback.
    type ReadDirState: Clone + Send + Default + Debug + 'static;
    /// State attached to each yielded `DirEntry`.
    type DirEntryState: Send + Default + Debug + 'static;
}
149
/// Generic builder for walking a directory.
///
/// [`ClientState`](trait.ClientState.html) type parameter allows you to specify
/// state to be stored with each DirEntry from within the
/// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
/// callback.
///
/// Use [`WalkDir`](type.WalkDir.html) if you don't need to store client state
/// into yielded DirEntries.
pub struct WalkDirGeneric<C: ClientState> {
    // Path the walk starts from; yielded as the depth-0 entry.
    root: PathBuf,
    // Settings collected by the builder methods.
    options: WalkDirOptions<C>,
}
163
// Signature of the user callback installed via `process_read_dir`.
//
// Arguments, in order:
// 1. depth: `None` when called for the root entry, `Some(depth)` of the
//    directory being read otherwise.
// 2. path: the root's parent path for the root call, otherwise the path of the
//    directory that was read.
// 3. the mutable read-dir state.
// 4. the mutable list of entry results, which the callback may sort, filter or
//    edit in place.
type ProcessReadDirFunction<C> = dyn Fn(Option<usize>, &Path, &mut <C as ClientState>::ReadDirState, &mut Vec<Result<DirEntry<C>>>)
    + Send
    + Sync
    + 'static;
168
/// Degree of parallelism to use when performing walk.
///
/// Parallelism happens at the directory level. It will help when walking deep
/// filesystems with many directories. It won't help when reading a single
/// directory with many files.
///
/// If you plan to perform lots of per file processing you might want to use
/// Rayon to parallelize that processing yourself, since the walk only
/// parallelizes at the directory level.
#[derive(Clone)]
pub enum Parallelism {
    /// Run on calling thread, similar to what happens in the `walkdir` crate.
    ///
    /// No busy timeout applies in this mode.
    Serial,
    /// Run in default rayon thread pool.
    RayonDefaultPool {
        /// Define when we consider the rayon default pool too busy to serve our iteration and abort the iteration, defaulting to 1s.
        ///
        /// This can happen if `jwalk` is launched from within a par-iter on a pool that only has a single thread,
        /// or if there are many parallel `jwalk` invocations that all use the same threadpool, rendering it too busy
        /// to respond within this duration.
        busy_timeout: std::time::Duration,
    },
    /// Run in existing rayon thread pool
    RayonExistingPool {
        /// The pool to spawn our work onto.
        pool: Arc<ThreadPool>,
        /// Similar to [`Parallelism::RayonDefaultPool::busy_timeout`] if `Some`, but can be `None` to skip the deadlock check
        /// in case you know that there is at least one free thread available on the pool.
        busy_timeout: Option<std::time::Duration>,
    },
    /// Run in new rayon thread pool with # threads
    ///
    /// A value of `0` leaves the thread count at the pool builder's default.
    /// If the pool cannot be built, the work is spawned on the default rayon
    /// pool instead. No busy timeout applies in this mode.
    RayonNewPool(usize),
}
200
// Settings gathered by the `WalkDirGeneric` builder methods and consumed when
// the builder is turned into an iterator.
struct WalkDirOptions<C: ClientState> {
    // Sort each directory's entries by `file_name` before yielding them.
    sort: bool,
    // Minimum depth of yielded entries (root is depth 0).
    min_depth: usize,
    // Directories whose contents would exceed this depth are not read.
    max_depth: usize,
    // Drop entries whose name starts with `.`.
    skip_hidden: bool,
    // Follow symbolic links while walking.
    follow_links: bool,
    // Where the directory reads are executed.
    parallelism: Parallelism,
    // State passed to `process_read_dir` when the root entry is processed.
    root_read_dir_state: C::ReadDirState,
    // Optional user callback applied to every batch of entries.
    process_read_dir: Option<Arc<ProcessReadDirFunction<C>>>,
}
211
212impl<C: ClientState> WalkDirGeneric<C> {
213    /// Create a builder for a recursive directory iterator starting at the file
214    /// path root. If root is a directory, then it is the first item yielded by
215    /// the iterator. If root is a file, then it is the first and only item
216    /// yielded by the iterator.
217    ///
218    /// Note that his iterator can fail on the first element if `into_iter()` is used as it
219    /// has to be infallible. Use [`try_into_iter()`][WalkDirGeneric::try_into_iter()]
220    /// instead for error handling.
221    pub fn new<P: AsRef<Path>>(root: P) -> Self {
222        WalkDirGeneric {
223            root: root.as_ref().to_path_buf(),
224            options: WalkDirOptions {
225                sort: false,
226                min_depth: 0,
227                max_depth: ::std::usize::MAX,
228                skip_hidden: true,
229                follow_links: false,
230                parallelism: Parallelism::RayonDefaultPool {
231                    busy_timeout: std::time::Duration::from_secs(1),
232                },
233                root_read_dir_state: C::ReadDirState::default(),
234                process_read_dir: None,
235            },
236        }
237    }
238
239    /// Try to create an iterator or fail if the rayon threadpool (in any configuration) is busy.
240    pub fn try_into_iter(self) -> Result<DirEntryIter<C>> {
241        let iter = self.into_iter();
242        if iter.read_dir_iter.is_none() {
243            Err(Error::busy())
244        } else {
245            Ok(iter)
246        }
247    }
248
249    /// Root path of the walk.
250    pub fn root(&self) -> &Path {
251        &self.root
252    }
253
254    /// Sort entries by `file_name` per directory. Defaults to `false`. Use
255    /// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir) for custom
256    /// sorting or filtering.
257    pub fn sort(mut self, sort: bool) -> Self {
258        self.options.sort = sort;
259        self
260    }
261
262    /// Skip hidden entries. Enabled by default.
263    pub fn skip_hidden(mut self, skip_hidden: bool) -> Self {
264        self.options.skip_hidden = skip_hidden;
265        self
266    }
267
268    /// Follow symbolic links. By default, this is disabled.
269    ///
270    /// When `yes` is `true`, symbolic links are followed as if they were normal
271    /// directories and files. If a symbolic link is broken or is involved in a
272    /// loop, an error is yielded.
273    ///
274    /// When enabled, the yielded [`DirEntry`] values represent the target of
275    /// the link while the path corresponds to the link. See the [`DirEntry`]
276    /// type for more details.
277    ///
278    /// [`DirEntry`]: struct.DirEntry.html
279    pub fn follow_links(mut self, follow_links: bool) -> Self {
280        self.options.follow_links = follow_links;
281        self
282    }
283
284    /// Set the minimum depth of entries yielded by the iterator.
285    ///
286    /// The smallest depth is `0` and always corresponds to the path given
287    /// to the `new` function on this type. Its direct descendents have depth
288    /// `1`, and their descendents have depth `2`, and so on.
289    pub fn min_depth(mut self, depth: usize) -> Self {
290        self.options.min_depth = depth;
291        if self.options.min_depth > self.options.max_depth {
292            self.options.min_depth = self.options.max_depth;
293        }
294        self
295    }
296
297    /// Set the maximum depth of entries yield by the iterator.
298    ///
299    /// The smallest depth is `0` and always corresponds to the path given
300    /// to the `new` function on this type. Its direct descendents have depth
301    /// `1`, and their descendents have depth `2`, and so on.
302    ///
303    /// A depth < 2 will automatically change `parallelism` to
304    /// `Parallelism::Serial`. Parrallelism happens at the `fs::read_dir` level.
305    /// It only makes sense to use multiple threads when reading more then one
306    /// directory.
307    ///
308    /// Note that this will not simply filter the entries of the iterator, but
309    /// it will actually avoid descending into directories when the depth is
310    /// exceeded.
311    pub fn max_depth(mut self, depth: usize) -> Self {
312        self.options.max_depth = depth;
313        if self.options.max_depth < self.options.min_depth {
314            self.options.max_depth = self.options.min_depth;
315        }
316        if self.options.max_depth < 2 {
317            self.options.parallelism = Parallelism::Serial;
318        }
319        self
320    }
321
322    /// Degree of parallelism to use when performing walk. Defaults to
323    /// [`Parallelism::RayonDefaultPool`](enum.Parallelism.html#variant.RayonDefaultPool).
324    pub fn parallelism(mut self, parallelism: Parallelism) -> Self {
325        self.options.parallelism = parallelism;
326        self
327    }
328
329    /// Initial ClientState::ReadDirState that is passed to
330    /// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
331    /// when processing root. Defaults to ClientState::ReadDirState::default().
332    pub fn root_read_dir_state(mut self, read_dir_state: C::ReadDirState) -> Self {
333        self.options.root_read_dir_state = read_dir_state;
334        self
335    }
336
337    /// A callback function to process (sort/filter/skip/state) each directory
338    /// of entries before they are yielded. Modify the given array to
339    /// sort/filter entries. Use [`entry.read_children_path =
340    /// None`](struct.DirEntry.html#field.read_children_path) to yield a
341    /// directory entry but skip reading its contents. Use
342    /// [`entry.client_state`](struct.DirEntry.html#field.client_state)
343    /// to store custom state with an entry.
344    pub fn process_read_dir<F>(mut self, process_by: F) -> Self
345    where
346        F: Fn(Option<usize>, &Path, &mut C::ReadDirState, &mut Vec<Result<DirEntry<C>>>)
347            + Send
348            + Sync
349            + 'static,
350    {
351        self.options.process_read_dir = Some(Arc::new(process_by));
352        self
353    }
354}
355
356fn process_dir_entry_result<C: ClientState>(
357    dir_entry_result: Result<DirEntry<C>>,
358    follow_links: bool,
359) -> Result<DirEntry<C>> {
360    match dir_entry_result {
361        Ok(mut dir_entry) => {
362            if follow_links && dir_entry.file_type.is_symlink() {
363                dir_entry = dir_entry.follow_symlink()?;
364            }
365
366            if dir_entry.depth == 0 && dir_entry.file_type.is_symlink() {
367                // As a special case, if we are processing a root entry, then we
368                // always follow it even if it's a symlink and follow_links is
369                // false. We are careful to not let this change the semantics of
370                // the DirEntry however. Namely, the DirEntry should still
371                // respect the follow_links setting. When it's disabled, it
372                // should report itself as a symlink. When it's enabled, it
373                // should always report itself as the target.
374                let metadata = fs::metadata(dir_entry.path())
375                    .map_err(|err| Error::from_path(0, dir_entry.path(), err))?;
376                if metadata.file_type().is_dir() {
377                    dir_entry.read_children_path = Some(Arc::from(dir_entry.path()));
378                }
379            }
380
381            Ok(dir_entry)
382        }
383        Err(err) => Err(err),
384    }
385}
386
387impl<C: ClientState> IntoIterator for WalkDirGeneric<C> {
388    type Item = Result<DirEntry<C>>;
389    type IntoIter = DirEntryIter<C>;
390
391    fn into_iter(self) -> DirEntryIter<C> {
392        let sort = self.options.sort;
393        let max_depth = self.options.max_depth;
394        let min_depth = self.options.min_depth;
395        let parallelism = self.options.parallelism;
396        let skip_hidden = self.options.skip_hidden;
397        let follow_links = self.options.follow_links;
398        let process_read_dir = self.options.process_read_dir.clone();
399        let mut root_read_dir_state = self.options.root_read_dir_state;
400        let follow_link_ancestors = if follow_links {
401            Arc::new(vec![Arc::from(self.root.clone()) as Arc<Path>])
402        } else {
403            Arc::new(vec![])
404        };
405
406        let root_entry = DirEntry::from_path(0, &self.root, false, follow_link_ancestors);
407        let root_parent_path = root_entry
408            .as_ref()
409            .map(|root| root.parent_path().to_owned())
410            .unwrap_or_default();
411        let mut root_entry_results = vec![process_dir_entry_result(root_entry, follow_links)];
412        if let Some(process_read_dir) = process_read_dir.as_ref() {
413            process_read_dir(
414                None,
415                &root_parent_path,
416                &mut root_read_dir_state,
417                &mut root_entry_results,
418            );
419        }
420
421        DirEntryIter::new(
422            root_entry_results,
423            parallelism,
424            min_depth,
425            root_read_dir_state,
426            Arc::new(move |read_dir_spec| {
427                let ReadDirSpec {
428                    path,
429                    depth,
430                    mut client_read_state,
431                    mut follow_link_ancestors,
432                } = read_dir_spec;
433
434                let read_dir_depth = depth;
435                let read_dir_contents_depth = depth + 1;
436
437                if read_dir_contents_depth > max_depth {
438                    return Ok(ReadDir::new(client_read_state, Vec::new()));
439                }
440
441                follow_link_ancestors = if follow_links {
442                    let mut ancestors = Vec::with_capacity(follow_link_ancestors.len() + 1);
443                    ancestors.extend(follow_link_ancestors.iter().cloned());
444                    ancestors.push(path.clone());
445                    Arc::new(ancestors)
446                } else {
447                    follow_link_ancestors
448                };
449
450                let mut dir_entry_results: Vec<_> = fs::read_dir(path.as_ref())
451                    .map_err(|err| Error::from_path(0, path.to_path_buf(), err))?
452                    .filter_map(|dir_entry_result| {
453                        let fs_dir_entry = match dir_entry_result {
454                            Ok(fs_dir_entry) => fs_dir_entry,
455                            Err(err) => {
456                                return Some(Err(Error::from_io(read_dir_contents_depth, err)))
457                            }
458                        };
459
460                        let dir_entry = match DirEntry::from_entry(
461                            read_dir_contents_depth,
462                            path.clone(),
463                            &fs_dir_entry,
464                            follow_link_ancestors.clone(),
465                        ) {
466                            Ok(dir_entry) => dir_entry,
467                            Err(err) => return Some(Err(err)),
468                        };
469
470                        if skip_hidden && is_hidden(&dir_entry.file_name) {
471                            return None;
472                        }
473
474                        Some(process_dir_entry_result(Ok(dir_entry), follow_links))
475                    })
476                    .collect();
477
478                if sort {
479                    dir_entry_results.sort_by(|a, b| match (a, b) {
480                        (Ok(a), Ok(b)) => a.file_name.cmp(&b.file_name),
481                        (Ok(_), Err(_)) => Ordering::Less,
482                        (Err(_), Ok(_)) => Ordering::Greater,
483                        (Err(_), Err(_)) => Ordering::Equal,
484                    });
485                }
486
487                if let Some(process_read_dir) = process_read_dir.as_ref() {
488                    process_read_dir(
489                        Some(read_dir_depth),
490                        path.as_ref(),
491                        &mut client_read_state,
492                        &mut dir_entry_results,
493                    );
494                }
495
496                Ok(ReadDir::new(client_read_state, dir_entry_results))
497            }),
498        )
499    }
500}
501
502impl<C: ClientState> Clone for WalkDirOptions<C> {
503    fn clone(&self) -> WalkDirOptions<C> {
504        WalkDirOptions {
505            sort: false,
506            min_depth: self.min_depth,
507            max_depth: self.max_depth,
508            skip_hidden: self.skip_hidden,
509            follow_links: self.follow_links,
510            parallelism: self.parallelism.clone(),
511            root_read_dir_state: self.root_read_dir_state.clone(),
512            process_read_dir: self.process_read_dir.clone(),
513        }
514    }
515}
516
517impl Parallelism {
518    pub(crate) fn spawn<OP>(&self, op: OP)
519    where
520        OP: FnOnce() + Send + 'static,
521    {
522        match self {
523            Parallelism::Serial => op(),
524            Parallelism::RayonDefaultPool { .. } => rayon::spawn(op),
525            Parallelism::RayonNewPool(num_threads) => {
526                let mut thread_pool = ThreadPoolBuilder::new();
527                if *num_threads > 0 {
528                    thread_pool = thread_pool.num_threads(*num_threads);
529                }
530                if let Ok(thread_pool) = thread_pool.build() {
531                    thread_pool.spawn(op);
532                } else {
533                    rayon::spawn(op);
534                }
535            }
536            Parallelism::RayonExistingPool { pool, .. } => pool.spawn(op),
537        }
538    }
539
540    pub(crate) fn timeout(&self) -> Option<std::time::Duration> {
541        match self {
542            Parallelism::Serial | Parallelism::RayonNewPool(_) => None,
543            Parallelism::RayonDefaultPool { busy_timeout } => Some(*busy_timeout),
544            Parallelism::RayonExistingPool { busy_timeout, .. } => *busy_timeout,
545        }
546    }
547}
548
/// Report whether `file_name` denotes a hidden entry, i.e. starts with `.`.
///
/// Names that are not valid UTF-8 are never considered hidden.
fn is_hidden(file_name: &OsStr) -> bool {
    match file_name.to_str() {
        Some(name) => name.starts_with('.'),
        None => false,
    }
}
555
/// Any pair `(B, E)` whose members satisfy the bounds below is a
/// `ClientState`: the first member is the read-dir state, the second is the
/// per-entry state.
impl<B, E> ClientState for (B, E)
where
    B: Clone + Send + Default + Debug + 'static,
    E: Send + Default + Debug + 'static,
{
    // `B` is what the `process_read_dir` callback receives as `&mut`.
    type ReadDirState = B;
    // `E` is the state type attached to each `DirEntry`.
    type DirEntryState = E;
}