jwalk/lib.rs
1#![warn(clippy::all)]
2
3//! Filesystem walk.
4//!
5//! - Performed in parallel using rayon
6//! - Entries streamed in sorted order
7//! - Custom sort/filter/skip/state
8//!
9//! # Example
10//!
11//! Recursively iterate over the "foo" directory sorting by name:
12//!
13//! ```no_run
14//! # use std::io::Error;
15//! use jwalk::{WalkDir};
16//!
17//! # fn try_main() -> Result<(), Error> {
18//! for entry in WalkDir::new("foo").sort(true) {
19//! println!("{}", entry?.path().display());
20//! }
21//! # Ok(())
22//! # }
23//! ```
24//! # Extended Example
25//!
26//! This example uses the
27//! [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
28//! callback for custom:
//! 1. **Sort** Entries by name
//! 2. **Filter** Errors and hidden files
//! 3. **Skip** Content of directories at depth 2
//! 4. **State** Track depth in `read_dir_state`. Mark first entry in each
//!    directory with [`client_state`](struct.DirEntry.html#field.client_state)
//!    `= true`.
35//!
36//! ```no_run
37//! # use std::io::Error;
38//! use std::cmp::Ordering;
39//! use jwalk::{ WalkDirGeneric };
40//!
41//! # fn try_main() -> Result<(), Error> {
42//! let walk_dir = WalkDirGeneric::<((usize),(bool))>::new("foo")
43//! .process_read_dir(|depth, path, read_dir_state, children| {
44//! // 1. Custom sort
45//! children.sort_by(|a, b| match (a, b) {
46//! (Ok(a), Ok(b)) => a.file_name.cmp(&b.file_name),
47//! (Ok(_), Err(_)) => Ordering::Less,
48//! (Err(_), Ok(_)) => Ordering::Greater,
49//! (Err(_), Err(_)) => Ordering::Equal,
50//! });
51//! // 2. Custom filter
52//! children.retain(|dir_entry_result| {
53//! dir_entry_result.as_ref().map(|dir_entry| {
54//! dir_entry.file_name
55//! .to_str()
//!                     .map(|s| !s.starts_with('.'))
//!                     .unwrap_or(true)
58//! }).unwrap_or(false)
59//! });
60//! // 3. Custom skip
61//! children.iter_mut().for_each(|dir_entry_result| {
62//! if let Ok(dir_entry) = dir_entry_result {
63//! if dir_entry.depth == 2 {
64//! dir_entry.read_children_path = None;
65//! }
66//! }
67//! });
68//! // 4. Custom state
69//! *read_dir_state += 1;
70//! children.first_mut().map(|dir_entry_result| {
71//! if let Ok(dir_entry) = dir_entry_result {
72//! dir_entry.client_state = true;
73//! }
74//! });
75//! });
76//!
77//! for entry in walk_dir {
78//! println!("{}", entry?.path().display());
79//! }
80//! # Ok(())
81//! # }
82//! ```
83//! # Inspiration
84//!
//! This crate is inspired by both [`walkdir`](https://crates.io/crates/walkdir)
//! and [`ignore`](https://crates.io/crates/ignore). It attempts to combine the
//! parallelism of `ignore` with `walkdir`'s streaming iterator API. Some code,
//! comments, and tests are copied directly from `walkdir`.
89//!
90//! # Implementation
91//!
92//! The following structures are central to the implementation:
93//!
94//! ## `ReadDirSpec`
95//!
//! Specification of a future `read_dir` operation. These are stored in the
//! `read_dir_spec_queue` in depth first order. When a rayon thread is ready for
//! work it pulls the first available `ReadDirSpec` from this queue.
99//!
100//! ## `ReadDir`
101//!
102//! Result of a `read_dir` operation generated by rayon thread. These results
103//! are stored in the `read_dir_result_queue`, also depth first ordered.
104//!
105//! ## `ReadDirIter`
106//!
107//! Pulls `ReadDir` results from the `read_dir_result_queue`. This iterator is
108//! driven by calling thread. Results are returned in strict depth first order.
109//!
110//! ## `DirEntryIter`
111//!
112//! Wraps a `ReadDirIter` and yields individual `DirEntry` results in strict
113//! depth first order.
114
115mod core;
116
117use rayon::{ThreadPool, ThreadPoolBuilder};
118use std::cmp::Ordering;
119use std::default::Default;
120use std::ffi::OsStr;
121use std::fmt::Debug;
122use std::fs;
123use std::path::{Path, PathBuf};
124use std::sync::Arc;
125
126use crate::core::{ReadDir, ReadDirSpec};
127
128pub use crate::core::{DirEntry, DirEntryIter, Error};
129pub use rayon;
130
131/// Builder for walking a directory.
132pub type WalkDir = WalkDirGeneric<((), ())>;
133
134/// A specialized Result type for WalkDir.
135pub type Result<T> = std::result::Result<T, Error>;
136
/// Client state maintained while performing a walk.
///
/// Two kinds of state are described by the associated types:
///
/// - `ReadDirState` is the state handed to the
///   [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
///   callback for each directory that is read.
/// - `DirEntryState` is the state stored in a `DirEntry`'s
///   [`client_state`](struct.DirEntry.html#field.client_state) field.
///
/// Both can be written from within the `process_read_dir` callback. The
/// concrete `ClientState` is determined by the `WalkDirGeneric` type parameter.
pub trait ClientState: 'static + Debug + Default + Send {
    /// State passed to `process_read_dir` alongside each directory's entries.
    type ReadDirState: 'static + Clone + Debug + Default + Send;
    /// State stored with each individual entry.
    type DirEntryState: 'static + Debug + Default + Send;
}
149
150/// Generic builder for walking a directory.
151///
152/// [`ClientState`](trait.ClientState.html) type parameter allows you to specify
153/// state to be stored with each DirEntry from within the
154/// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
155/// callback.
156///
157/// Use [`WalkDir`](type.WalkDir.html) if you don't need to store client state
158/// into yeilded DirEntries.
159pub struct WalkDirGeneric<C: ClientState> {
160 root: PathBuf,
161 options: WalkDirOptions<C>,
162}
163
164type ProcessReadDirFunction<C> = dyn Fn(Option<usize>, &Path, &mut <C as ClientState>::ReadDirState, &mut Vec<Result<DirEntry<C>>>)
165 + Send
166 + Sync
167 + 'static;
168
169/// Degree of parallelism to use when performing walk.
170///
171/// Parallelism happens at the directory level. It will help when walking deep
172/// filesystems with many directories. It wont help when reading a single
173/// directory with many files.
174///
175/// If you plan to perform lots of per file processing you might want to use Rayon to
176#[derive(Clone)]
177pub enum Parallelism {
178 /// Run on calling thread, similar to what happens in the `walkdir` crate.
179 Serial,
180 /// Run in default rayon thread pool.
181 RayonDefaultPool {
182 /// Define when we consider the rayon default pool too busy to serve our iteration and abort the iteration, defaulting to 1s.
183 ///
184 /// This can happen if `jwalk` is launched from within a par-iter on a pool that only has a single thread,
185 /// or if there are many parallel `jwalk` invocations that all use the same threadpool, rendering it too busy
186 /// to respond within this duration.
187 busy_timeout: std::time::Duration,
188 },
189 /// Run in existing rayon thread pool
190 RayonExistingPool {
191 /// The pool to spawn our work onto.
192 pool: Arc<ThreadPool>,
193 /// Similar to [`Parallelism::RayonDefaultPool::busy_timeout`] if `Some`, but can be `None` to skip the deadlock check
194 /// in case you know that there is at least one free thread available on the pool.
195 busy_timeout: Option<std::time::Duration>,
196 },
197 /// Run in new rayon thread pool with # threads
198 RayonNewPool(usize),
199}
200
201struct WalkDirOptions<C: ClientState> {
202 sort: bool,
203 min_depth: usize,
204 max_depth: usize,
205 skip_hidden: bool,
206 follow_links: bool,
207 parallelism: Parallelism,
208 root_read_dir_state: C::ReadDirState,
209 process_read_dir: Option<Arc<ProcessReadDirFunction<C>>>,
210}
211
212impl<C: ClientState> WalkDirGeneric<C> {
213 /// Create a builder for a recursive directory iterator starting at the file
214 /// path root. If root is a directory, then it is the first item yielded by
215 /// the iterator. If root is a file, then it is the first and only item
216 /// yielded by the iterator.
217 ///
218 /// Note that his iterator can fail on the first element if `into_iter()` is used as it
219 /// has to be infallible. Use [`try_into_iter()`][WalkDirGeneric::try_into_iter()]
220 /// instead for error handling.
221 pub fn new<P: AsRef<Path>>(root: P) -> Self {
222 WalkDirGeneric {
223 root: root.as_ref().to_path_buf(),
224 options: WalkDirOptions {
225 sort: false,
226 min_depth: 0,
227 max_depth: ::std::usize::MAX,
228 skip_hidden: true,
229 follow_links: false,
230 parallelism: Parallelism::RayonDefaultPool {
231 busy_timeout: std::time::Duration::from_secs(1),
232 },
233 root_read_dir_state: C::ReadDirState::default(),
234 process_read_dir: None,
235 },
236 }
237 }
238
239 /// Try to create an iterator or fail if the rayon threadpool (in any configuration) is busy.
240 pub fn try_into_iter(self) -> Result<DirEntryIter<C>> {
241 let iter = self.into_iter();
242 if iter.read_dir_iter.is_none() {
243 Err(Error::busy())
244 } else {
245 Ok(iter)
246 }
247 }
248
249 /// Root path of the walk.
250 pub fn root(&self) -> &Path {
251 &self.root
252 }
253
254 /// Sort entries by `file_name` per directory. Defaults to `false`. Use
255 /// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir) for custom
256 /// sorting or filtering.
257 pub fn sort(mut self, sort: bool) -> Self {
258 self.options.sort = sort;
259 self
260 }
261
262 /// Skip hidden entries. Enabled by default.
263 pub fn skip_hidden(mut self, skip_hidden: bool) -> Self {
264 self.options.skip_hidden = skip_hidden;
265 self
266 }
267
268 /// Follow symbolic links. By default, this is disabled.
269 ///
270 /// When `yes` is `true`, symbolic links are followed as if they were normal
271 /// directories and files. If a symbolic link is broken or is involved in a
272 /// loop, an error is yielded.
273 ///
274 /// When enabled, the yielded [`DirEntry`] values represent the target of
275 /// the link while the path corresponds to the link. See the [`DirEntry`]
276 /// type for more details.
277 ///
278 /// [`DirEntry`]: struct.DirEntry.html
279 pub fn follow_links(mut self, follow_links: bool) -> Self {
280 self.options.follow_links = follow_links;
281 self
282 }
283
284 /// Set the minimum depth of entries yielded by the iterator.
285 ///
286 /// The smallest depth is `0` and always corresponds to the path given
287 /// to the `new` function on this type. Its direct descendents have depth
288 /// `1`, and their descendents have depth `2`, and so on.
289 pub fn min_depth(mut self, depth: usize) -> Self {
290 self.options.min_depth = depth;
291 if self.options.min_depth > self.options.max_depth {
292 self.options.min_depth = self.options.max_depth;
293 }
294 self
295 }
296
297 /// Set the maximum depth of entries yield by the iterator.
298 ///
299 /// The smallest depth is `0` and always corresponds to the path given
300 /// to the `new` function on this type. Its direct descendents have depth
301 /// `1`, and their descendents have depth `2`, and so on.
302 ///
303 /// A depth < 2 will automatically change `parallelism` to
304 /// `Parallelism::Serial`. Parrallelism happens at the `fs::read_dir` level.
305 /// It only makes sense to use multiple threads when reading more then one
306 /// directory.
307 ///
308 /// Note that this will not simply filter the entries of the iterator, but
309 /// it will actually avoid descending into directories when the depth is
310 /// exceeded.
311 pub fn max_depth(mut self, depth: usize) -> Self {
312 self.options.max_depth = depth;
313 if self.options.max_depth < self.options.min_depth {
314 self.options.max_depth = self.options.min_depth;
315 }
316 if self.options.max_depth < 2 {
317 self.options.parallelism = Parallelism::Serial;
318 }
319 self
320 }
321
322 /// Degree of parallelism to use when performing walk. Defaults to
323 /// [`Parallelism::RayonDefaultPool`](enum.Parallelism.html#variant.RayonDefaultPool).
324 pub fn parallelism(mut self, parallelism: Parallelism) -> Self {
325 self.options.parallelism = parallelism;
326 self
327 }
328
329 /// Initial ClientState::ReadDirState that is passed to
330 /// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
331 /// when processing root. Defaults to ClientState::ReadDirState::default().
332 pub fn root_read_dir_state(mut self, read_dir_state: C::ReadDirState) -> Self {
333 self.options.root_read_dir_state = read_dir_state;
334 self
335 }
336
337 /// A callback function to process (sort/filter/skip/state) each directory
338 /// of entries before they are yielded. Modify the given array to
339 /// sort/filter entries. Use [`entry.read_children_path =
340 /// None`](struct.DirEntry.html#field.read_children_path) to yield a
341 /// directory entry but skip reading its contents. Use
342 /// [`entry.client_state`](struct.DirEntry.html#field.client_state)
343 /// to store custom state with an entry.
344 pub fn process_read_dir<F>(mut self, process_by: F) -> Self
345 where
346 F: Fn(Option<usize>, &Path, &mut C::ReadDirState, &mut Vec<Result<DirEntry<C>>>)
347 + Send
348 + Sync
349 + 'static,
350 {
351 self.options.process_read_dir = Some(Arc::new(process_by));
352 self
353 }
354}
355
356fn process_dir_entry_result<C: ClientState>(
357 dir_entry_result: Result<DirEntry<C>>,
358 follow_links: bool,
359) -> Result<DirEntry<C>> {
360 match dir_entry_result {
361 Ok(mut dir_entry) => {
362 if follow_links && dir_entry.file_type.is_symlink() {
363 dir_entry = dir_entry.follow_symlink()?;
364 }
365
366 if dir_entry.depth == 0 && dir_entry.file_type.is_symlink() {
367 // As a special case, if we are processing a root entry, then we
368 // always follow it even if it's a symlink and follow_links is
369 // false. We are careful to not let this change the semantics of
370 // the DirEntry however. Namely, the DirEntry should still
371 // respect the follow_links setting. When it's disabled, it
372 // should report itself as a symlink. When it's enabled, it
373 // should always report itself as the target.
374 let metadata = fs::metadata(dir_entry.path())
375 .map_err(|err| Error::from_path(0, dir_entry.path(), err))?;
376 if metadata.file_type().is_dir() {
377 dir_entry.read_children_path = Some(Arc::from(dir_entry.path()));
378 }
379 }
380
381 Ok(dir_entry)
382 }
383 Err(err) => Err(err),
384 }
385}
386
387impl<C: ClientState> IntoIterator for WalkDirGeneric<C> {
388 type Item = Result<DirEntry<C>>;
389 type IntoIter = DirEntryIter<C>;
390
391 fn into_iter(self) -> DirEntryIter<C> {
392 let sort = self.options.sort;
393 let max_depth = self.options.max_depth;
394 let min_depth = self.options.min_depth;
395 let parallelism = self.options.parallelism;
396 let skip_hidden = self.options.skip_hidden;
397 let follow_links = self.options.follow_links;
398 let process_read_dir = self.options.process_read_dir.clone();
399 let mut root_read_dir_state = self.options.root_read_dir_state;
400 let follow_link_ancestors = if follow_links {
401 Arc::new(vec![Arc::from(self.root.clone()) as Arc<Path>])
402 } else {
403 Arc::new(vec![])
404 };
405
406 let root_entry = DirEntry::from_path(0, &self.root, false, follow_link_ancestors);
407 let root_parent_path = root_entry
408 .as_ref()
409 .map(|root| root.parent_path().to_owned())
410 .unwrap_or_default();
411 let mut root_entry_results = vec![process_dir_entry_result(root_entry, follow_links)];
412 if let Some(process_read_dir) = process_read_dir.as_ref() {
413 process_read_dir(
414 None,
415 &root_parent_path,
416 &mut root_read_dir_state,
417 &mut root_entry_results,
418 );
419 }
420
421 DirEntryIter::new(
422 root_entry_results,
423 parallelism,
424 min_depth,
425 root_read_dir_state,
426 Arc::new(move |read_dir_spec| {
427 let ReadDirSpec {
428 path,
429 depth,
430 mut client_read_state,
431 mut follow_link_ancestors,
432 } = read_dir_spec;
433
434 let read_dir_depth = depth;
435 let read_dir_contents_depth = depth + 1;
436
437 if read_dir_contents_depth > max_depth {
438 return Ok(ReadDir::new(client_read_state, Vec::new()));
439 }
440
441 follow_link_ancestors = if follow_links {
442 let mut ancestors = Vec::with_capacity(follow_link_ancestors.len() + 1);
443 ancestors.extend(follow_link_ancestors.iter().cloned());
444 ancestors.push(path.clone());
445 Arc::new(ancestors)
446 } else {
447 follow_link_ancestors
448 };
449
450 let mut dir_entry_results: Vec<_> = fs::read_dir(path.as_ref())
451 .map_err(|err| Error::from_path(0, path.to_path_buf(), err))?
452 .filter_map(|dir_entry_result| {
453 let fs_dir_entry = match dir_entry_result {
454 Ok(fs_dir_entry) => fs_dir_entry,
455 Err(err) => {
456 return Some(Err(Error::from_io(read_dir_contents_depth, err)))
457 }
458 };
459
460 let dir_entry = match DirEntry::from_entry(
461 read_dir_contents_depth,
462 path.clone(),
463 &fs_dir_entry,
464 follow_link_ancestors.clone(),
465 ) {
466 Ok(dir_entry) => dir_entry,
467 Err(err) => return Some(Err(err)),
468 };
469
470 if skip_hidden && is_hidden(&dir_entry.file_name) {
471 return None;
472 }
473
474 Some(process_dir_entry_result(Ok(dir_entry), follow_links))
475 })
476 .collect();
477
478 if sort {
479 dir_entry_results.sort_by(|a, b| match (a, b) {
480 (Ok(a), Ok(b)) => a.file_name.cmp(&b.file_name),
481 (Ok(_), Err(_)) => Ordering::Less,
482 (Err(_), Ok(_)) => Ordering::Greater,
483 (Err(_), Err(_)) => Ordering::Equal,
484 });
485 }
486
487 if let Some(process_read_dir) = process_read_dir.as_ref() {
488 process_read_dir(
489 Some(read_dir_depth),
490 path.as_ref(),
491 &mut client_read_state,
492 &mut dir_entry_results,
493 );
494 }
495
496 Ok(ReadDir::new(client_read_state, dir_entry_results))
497 }),
498 )
499 }
500}
501
502impl<C: ClientState> Clone for WalkDirOptions<C> {
503 fn clone(&self) -> WalkDirOptions<C> {
504 WalkDirOptions {
505 sort: false,
506 min_depth: self.min_depth,
507 max_depth: self.max_depth,
508 skip_hidden: self.skip_hidden,
509 follow_links: self.follow_links,
510 parallelism: self.parallelism.clone(),
511 root_read_dir_state: self.root_read_dir_state.clone(),
512 process_read_dir: self.process_read_dir.clone(),
513 }
514 }
515}
516
517impl Parallelism {
518 pub(crate) fn spawn<OP>(&self, op: OP)
519 where
520 OP: FnOnce() + Send + 'static,
521 {
522 match self {
523 Parallelism::Serial => op(),
524 Parallelism::RayonDefaultPool { .. } => rayon::spawn(op),
525 Parallelism::RayonNewPool(num_threads) => {
526 let mut thread_pool = ThreadPoolBuilder::new();
527 if *num_threads > 0 {
528 thread_pool = thread_pool.num_threads(*num_threads);
529 }
530 if let Ok(thread_pool) = thread_pool.build() {
531 thread_pool.spawn(op);
532 } else {
533 rayon::spawn(op);
534 }
535 }
536 Parallelism::RayonExistingPool { pool, .. } => pool.spawn(op),
537 }
538 }
539
540 pub(crate) fn timeout(&self) -> Option<std::time::Duration> {
541 match self {
542 Parallelism::Serial | Parallelism::RayonNewPool(_) => None,
543 Parallelism::RayonDefaultPool { busy_timeout } => Some(*busy_timeout),
544 Parallelism::RayonExistingPool { busy_timeout, .. } => *busy_timeout,
545 }
546 }
547}
548
/// Report whether `file_name` starts with a `.`.
///
/// Names that are not valid UTF-8 are never considered hidden.
fn is_hidden(file_name: &OsStr) -> bool {
    matches!(file_name.to_str(), Some(name) if name.starts_with('.'))
}
555
556impl<B, E> ClientState for (B, E)
557where
558 B: Clone + Send + Default + Debug + 'static,
559 E: Send + Default + Debug + 'static,
560{
561 type ReadDirState = B;
562 type DirEntryState = E;
563}