mod cache;
mod time;
#[cfg(test)]
mod testing;
use std::{
cmp::Reverse,
collections::{BinaryHeap, HashMap, VecDeque},
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
};
use time::now;
pub use cache::{FileIdCache, FileIdMap, NoCache, RecommendedCache};
pub use file_id;
pub use notify;
pub use notify_types::debouncer_full::DebouncedEvent;
use file_id::FileId;
use notify::{
event::{ModifyKind, RemoveKind, RenameMode},
Error, ErrorKind, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, WatcherKind,
};
pub trait DebounceEventHandler: Send + 'static {
fn handle_event(&mut self, event: DebounceEventResult);
}
impl<F> DebounceEventHandler for F
where
F: FnMut(DebounceEventResult) + Send + 'static,
{
fn handle_event(&mut self, event: DebounceEventResult) {
(self)(event);
}
}
#[cfg(feature = "crossbeam-channel")]
impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
fn handle_event(&mut self, event: DebounceEventResult) {
let _ = self.send(event);
}
}
impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
fn handle_event(&mut self, event: DebounceEventResult) {
let _ = self.send(event);
}
}
pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Vec<Error>>;
type DebounceData<T> = Arc<Mutex<DebounceDataInner<T>>>;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct Queue {
events: VecDeque<DebouncedEvent>,
}
impl Queue {
fn was_created(&self) -> bool {
self.events.front().map_or(false, |event| {
matches!(
event.kind,
EventKind::Create(_) | EventKind::Modify(ModifyKind::Name(RenameMode::To))
)
})
}
fn was_removed(&self) -> bool {
self.events.front().map_or(false, |event| {
matches!(
event.kind,
EventKind::Remove(_) | EventKind::Modify(ModifyKind::Name(RenameMode::From))
)
})
}
}
#[derive(Debug)]
pub(crate) struct DebounceDataInner<T> {
queues: HashMap<PathBuf, Queue>,
roots: Vec<(PathBuf, RecursiveMode)>,
cache: T,
rename_event: Option<(DebouncedEvent, Option<FileId>)>,
rescan_event: Option<DebouncedEvent>,
errors: Vec<Error>,
timeout: Duration,
}
impl<T: FileIdCache> DebounceDataInner<T> {
pub(crate) fn new(cache: T, timeout: Duration) -> Self {
Self {
queues: HashMap::new(),
roots: Vec::new(),
cache,
rename_event: None,
rescan_event: None,
errors: Vec::new(),
timeout,
}
}
pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
let now = now();
let mut events_expired = Vec::with_capacity(self.queues.len());
let mut queues_remaining = HashMap::with_capacity(self.queues.len());
if let Some(event) = self.rescan_event.take() {
if now.saturating_duration_since(event.time) >= self.timeout {
log::trace!("debounced event: {event:?}");
events_expired.push(event);
} else {
self.rescan_event = Some(event);
}
}
for (path, mut queue) in self.queues.drain() {
let mut kind_index = HashMap::new();
while let Some(event) = queue.events.pop_front() {
if now.saturating_duration_since(event.time) >= self.timeout {
if let Some(idx) = kind_index.get(&event.kind).copied() {
events_expired.remove(idx);
kind_index.values_mut().for_each(|i| {
if *i > idx {
*i -= 1
}
})
}
kind_index.insert(event.kind, events_expired.len());
events_expired.push(event);
} else {
queue.events.push_front(event);
break;
}
}
if !queue.events.is_empty() {
queues_remaining.insert(path, queue);
}
}
self.queues = queues_remaining;
sort_events(events_expired)
}
pub fn errors(&mut self) -> Vec<Error> {
std::mem::take(&mut self.errors)
}
pub fn add_error(&mut self, error: Error) {
log::trace!("raw error: {error:?}");
self.errors.push(error);
}
pub fn add_event(&mut self, event: Event) {
log::trace!("raw event: {event:?}");
if event.need_rescan() {
self.cache.rescan(&self.roots);
self.rescan_event = Some(DebouncedEvent { event, time: now() });
return;
}
let path = &event.paths[0];
match &event.kind {
EventKind::Create(_) => {
let recursive_mode = self.recursive_mode(path);
self.cache.add_path(path, recursive_mode);
self.push_event(event, now());
}
EventKind::Modify(ModifyKind::Name(rename_mode)) => {
match rename_mode {
RenameMode::Any => {
if event.paths[0].exists() {
self.handle_rename_to(event);
} else {
self.handle_rename_from(event);
}
}
RenameMode::To => {
self.handle_rename_to(event);
}
RenameMode::From => {
self.handle_rename_from(event);
}
RenameMode::Both => {
}
RenameMode::Other => {
}
}
}
EventKind::Remove(_) => {
self.push_remove_event(event, now());
}
EventKind::Other => {
}
_ => {
if self.cache.cached_file_id(path).is_none() {
let recursive_mode = self.recursive_mode(path);
self.cache.add_path(path, recursive_mode);
}
self.push_event(event, now());
}
}
}
fn recursive_mode(&mut self, path: &Path) -> RecursiveMode {
self.roots
.iter()
.find_map(|(root, recursive_mode)| {
if path.starts_with(root) {
Some(*recursive_mode)
} else {
None
}
})
.unwrap_or(RecursiveMode::NonRecursive)
}
fn handle_rename_from(&mut self, event: Event) {
let time = now();
let path = &event.paths[0];
let file_id = self.cache.cached_file_id(path).cloned();
self.rename_event = Some((DebouncedEvent::new(event.clone(), time), file_id));
self.cache.remove_path(path);
self.push_event(event, time);
}
fn handle_rename_to(&mut self, event: Event) {
let recursive_mode = self.recursive_mode(&event.paths[0]);
self.cache.add_path(&event.paths[0], recursive_mode);
let trackers_match = self
.rename_event
.as_ref()
.and_then(|(e, _)| e.tracker())
.and_then(|from_tracker| {
event
.attrs
.tracker()
.map(|to_tracker| from_tracker == to_tracker)
})
.unwrap_or_default();
let file_ids_match = self
.rename_event
.as_ref()
.and_then(|(_, id)| id.as_ref())
.and_then(|from_file_id| {
self.cache
.cached_file_id(&event.paths[0])
.map(|to_file_id| from_file_id == to_file_id)
})
.unwrap_or_default();
if trackers_match || file_ids_match {
let (mut rename_event, _) = self.rename_event.take().unwrap(); let path = rename_event.paths.remove(0);
let time = rename_event.time;
self.push_rename_event(path, event, time);
} else {
self.push_event(event, now());
}
self.rename_event = None;
}
fn push_rename_event(&mut self, path: PathBuf, event: Event, time: Instant) {
self.cache.remove_path(&path);
let mut source_queue = self.queues.remove(&path).unwrap_or_default();
source_queue.events.pop_back();
let (remove_index, original_path, original_time) = source_queue
.events
.iter()
.enumerate()
.find_map(|(index, e)| {
if matches!(
e.kind,
EventKind::Modify(ModifyKind::Name(RenameMode::Both))
) {
Some((Some(index), e.paths[0].clone(), e.time))
} else {
None
}
})
.unwrap_or((None, path, time));
if let Some(remove_index) = remove_index {
source_queue.events.remove(remove_index);
}
if source_queue.was_removed() {
let event = source_queue.events.pop_front().unwrap();
self.queues.insert(
event.paths[0].clone(),
Queue {
events: [event].into(),
},
);
}
for e in &mut source_queue.events {
e.paths = vec![event.paths[0].clone()];
}
if !source_queue.was_created() {
source_queue.events.push_front(DebouncedEvent {
event: Event {
kind: EventKind::Modify(ModifyKind::Name(RenameMode::Both)),
paths: vec![original_path, event.paths[0].clone()],
attrs: event.attrs,
},
time: original_time,
});
}
if let Some(target_queue) = self.queues.get_mut(&event.paths[0]) {
if !target_queue.was_created() {
let mut remove_event = DebouncedEvent {
event: Event {
kind: EventKind::Remove(RemoveKind::Any),
paths: vec![event.paths[0].clone()],
attrs: Default::default(),
},
time: original_time,
};
if !target_queue.was_removed() {
remove_event.event = remove_event.event.set_info("override");
}
source_queue.events.push_front(remove_event);
}
*target_queue = source_queue;
} else {
self.queues.insert(event.paths[0].clone(), source_queue);
}
}
fn push_remove_event(&mut self, event: Event, time: Instant) {
let path = &event.paths[0];
self.queues.retain(|p, _| !p.starts_with(path) || p == path);
self.cache.remove_path(path);
match self.queues.get_mut(path) {
Some(queue) if queue.was_created() => {
self.queues.remove(path);
}
Some(queue) => {
queue.events = [DebouncedEvent::new(event, time)].into();
}
None => {
self.push_event(event, time);
}
}
}
fn push_event(&mut self, event: Event, time: Instant) {
let path = &event.paths[0];
if let Some(queue) = self.queues.get_mut(path) {
if match event.kind {
EventKind::Modify(ModifyKind::Data(_) | ModifyKind::Metadata(_))
| EventKind::Create(_) => !queue.was_created(),
_ => true,
} {
queue.events.push_back(DebouncedEvent::new(event, time));
}
} else {
self.queues.insert(
path.to_path_buf(),
Queue {
events: [DebouncedEvent::new(event, time)].into(),
},
);
}
}
}
#[derive(Debug)]
pub struct Debouncer<T: Watcher, C: FileIdCache> {
watcher: T,
debouncer_thread: Option<std::thread::JoinHandle<()>>,
data: DebounceData<C>,
stop: Arc<AtomicBool>,
}
impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
pub fn stop(mut self) {
self.set_stop();
if let Some(t) = self.debouncer_thread.take() {
let _ = t.join();
}
}
pub fn stop_nonblocking(self) {
self.set_stop();
}
fn set_stop(&self) {
self.stop.store(true, Ordering::Relaxed);
}
#[deprecated = "`Debouncer` provides all methods from `Watcher` itself now. Remove `.watcher()` and use those methods directly."]
pub fn watcher(&mut self) {}
#[deprecated = "`Debouncer` now manages root paths automatically. Remove all calls to `add_root` and `remove_root`."]
pub fn cache(&mut self) {}
fn add_root(&mut self, path: impl Into<PathBuf>, recursive_mode: RecursiveMode) {
let path = path.into();
let mut data = self.data.lock().unwrap();
if data.roots.iter().any(|(p, _)| p == &path) {
return;
}
data.roots.push((path.clone(), recursive_mode));
data.cache.add_path(&path, recursive_mode);
}
fn remove_root(&mut self, path: impl AsRef<Path>) {
let mut data = self.data.lock().unwrap();
data.roots.retain(|(root, _)| !root.starts_with(&path));
data.cache.remove_path(path.as_ref());
}
pub fn watch(
&mut self,
path: impl AsRef<Path>,
recursive_mode: RecursiveMode,
) -> notify::Result<()> {
self.watcher.watch(path.as_ref(), recursive_mode)?;
self.add_root(path.as_ref(), recursive_mode);
Ok(())
}
pub fn unwatch(&mut self, path: impl AsRef<Path>) -> notify::Result<()> {
self.watcher.unwatch(path.as_ref())?;
self.remove_root(path);
Ok(())
}
pub fn configure(&mut self, option: notify::Config) -> notify::Result<bool> {
self.watcher.configure(option)
}
pub fn kind() -> WatcherKind
where
Self: Sized,
{
T::kind()
}
}
impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
fn drop(&mut self) {
self.set_stop();
}
}
pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
timeout: Duration,
tick_rate: Option<Duration>,
mut event_handler: F,
file_id_cache: C,
config: notify::Config,
) -> Result<Debouncer<T, C>, Error> {
let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
let stop = Arc::new(AtomicBool::new(false));
let tick_div = 4;
let tick = match tick_rate {
Some(v) => {
if v > timeout {
return Err(Error::new(ErrorKind::Generic(format!(
"Invalid tick_rate, tick rate {:?} > {:?} timeout!",
v, timeout
))));
}
v
}
None => timeout.checked_div(tick_div).ok_or_else(|| {
Error::new(ErrorKind::Generic(format!(
"Failed to calculate tick as {:?}/{}!",
timeout, tick_div
)))
})?,
};
let data_c = data.clone();
let stop_c = stop.clone();
let thread = std::thread::Builder::new()
.name("notify-rs debouncer loop".to_string())
.spawn(move || loop {
if stop_c.load(Ordering::Acquire) {
break;
}
std::thread::sleep(tick);
let send_data;
let errors;
{
let mut lock = data_c.lock().unwrap();
send_data = lock.debounced_events();
errors = lock.errors();
}
if !send_data.is_empty() {
event_handler.handle_event(Ok(send_data));
}
if !errors.is_empty() {
event_handler.handle_event(Err(errors));
}
})?;
let data_c = data.clone();
let watcher = T::new(
move |e: Result<Event, Error>| {
let mut lock = data_c.lock().unwrap();
match e {
Ok(e) => lock.add_event(e),
Err(e) => lock.add_error(e),
}
},
config,
)?;
let guard = Debouncer {
watcher,
debouncer_thread: Some(thread),
data,
stop,
};
Ok(guard)
}
pub fn new_debouncer<F: DebounceEventHandler>(
timeout: Duration,
tick_rate: Option<Duration>,
event_handler: F,
) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, Error> {
new_debouncer_opt::<F, RecommendedWatcher, RecommendedCache>(
timeout,
tick_rate,
event_handler,
RecommendedCache::new(),
notify::Config::default(),
)
}
fn sort_events(events: Vec<DebouncedEvent>) -> Vec<DebouncedEvent> {
let mut sorted = Vec::with_capacity(events.len());
let mut events_by_path: HashMap<_, VecDeque<_>> =
events.into_iter().fold(HashMap::new(), |mut acc, event| {
acc.entry(event.paths.last().cloned().unwrap_or_default())
.or_default()
.push_back(event);
acc
});
let mut min_time_heap = events_by_path
.iter()
.map(|(path, events)| Reverse((events[0].time, path.clone())))
.collect::<BinaryHeap<_>>();
while let Some(Reverse((min_time, path))) = min_time_heap.pop() {
let events = events_by_path.get_mut(&path).unwrap();
let mut push_next = false;
while events.front().is_some_and(|event| event.time <= min_time) {
let event = events.pop_front().unwrap();
sorted.push(event);
push_next = true;
}
if push_next {
if let Some(event) = events.front() {
min_time_heap.push(Reverse((event.time, path)));
}
}
}
sorted
}
#[cfg(test)]
mod tests {
use std::{fs, path::Path};
use super::*;
use pretty_assertions::assert_eq;
use rstest::rstest;
use tempfile::tempdir;
use testing::TestCase;
use time::MockTime;
#[rstest]
fn state(
#[values(
"add_create_event",
"add_create_event_after_remove_event",
"add_create_dir_event_twice",
"add_modify_content_event_after_create_event",
"add_rename_from_event",
"add_rename_from_event_after_create_event",
"add_rename_from_event_after_modify_event",
"add_rename_from_event_after_create_and_modify_event",
"add_rename_from_event_after_rename_from_event",
"add_rename_to_event",
"add_rename_to_dir_event",
"add_rename_from_and_to_event",
"add_rename_from_and_to_event_after_create",
"add_rename_from_and_to_event_after_rename",
"add_rename_from_and_to_event_after_modify_content",
"add_rename_from_and_to_event_override_created",
"add_rename_from_and_to_event_override_modified",
"add_rename_from_and_to_event_override_removed",
"add_rename_from_and_to_event_with_file_ids",
"add_rename_from_and_to_event_with_different_file_ids",
"add_rename_from_and_to_event_with_different_tracker",
"add_rename_both_event",
"add_remove_event",
"add_remove_event_after_create_event",
"add_remove_event_after_modify_event",
"add_remove_event_after_create_and_modify_event",
"add_remove_parent_event_after_remove_child_event",
"add_errors",
"emit_continuous_modify_content_events",
"emit_events_in_chronological_order",
"emit_events_with_a_prepended_rename_event",
"emit_close_events_only_once",
"emit_modify_event_after_close_event",
"emit_needs_rescan_event",
"read_file_id_without_create_event",
"sort_events_chronologically",
"sort_events_with_reordering"
)]
file_name: &str,
) {
let file_content =
fs::read_to_string(Path::new(&format!("./test_cases/{file_name}.hjson"))).unwrap();
let mut test_case = deser_hjson::from_str::<TestCase>(&file_content).unwrap();
let time = now();
MockTime::set_time(time);
let mut state = test_case.state.into_debounce_data_inner(time);
state.roots = vec![(PathBuf::from("/"), RecursiveMode::Recursive)];
let mut prev_event_time = Duration::default();
for event in test_case.events {
let event_time = Duration::from_millis(event.time);
let event = event.into_debounced_event(time, None);
MockTime::advance(event_time - prev_event_time);
prev_event_time = event_time;
state.add_event(event.event);
}
for error in test_case.errors {
let error = error.into_notify_error();
state.add_error(error);
}
let expected_errors = std::mem::take(&mut test_case.expected.errors);
let expected_events = std::mem::take(&mut test_case.expected.events);
let expected_state = test_case.expected.into_debounce_data_inner(time);
assert_eq!(
state.queues, expected_state.queues,
"queues not as expected"
);
assert_eq!(
state.rename_event, expected_state.rename_event,
"rename event not as expected"
);
assert_eq!(
state.rescan_event, expected_state.rescan_event,
"rescan event not as expected"
);
assert_eq!(
state.cache.paths, expected_state.cache.paths,
"cache not as expected"
);
assert_eq!(
state
.errors
.iter()
.map(|e| format!("{:?}", e))
.collect::<Vec<_>>(),
expected_errors
.iter()
.map(|e| format!("{:?}", e.clone().into_notify_error()))
.collect::<Vec<_>>(),
"errors not as expected"
);
let backup_time = now();
let backup_queues = state.queues.clone();
for (delay, events) in expected_events {
MockTime::set_time(backup_time);
state.queues = backup_queues.clone();
match delay.as_str() {
"none" => {}
"short" => MockTime::advance(Duration::from_millis(10)),
"long" => MockTime::advance(Duration::from_millis(100)),
_ => {
if let Ok(ts) = delay.parse::<u64>() {
MockTime::set_time(time + Duration::from_millis(ts));
}
}
}
let events = events
.into_iter()
.map(|event| event.into_debounced_event(time, None))
.collect::<Vec<_>>();
assert_eq!(
state.debounced_events(),
events,
"debounced events after a `{delay}` delay"
);
}
}
#[test]
fn integration() -> Result<(), Box<dyn std::error::Error>> {
let dir = tempdir()?;
let (tx, rx) = std::sync::mpsc::channel();
let mut debouncer = new_debouncer(Duration::from_millis(10), None, tx)?;
debouncer.watch(dir.path(), RecursiveMode::Recursive)?;
fs::write(dir.path().join("file.txt"), b"Lorem ipsum")?;
let events = rx
.recv_timeout(Duration::from_secs(10))
.expect("no events received")
.expect("received an error");
assert!(!events.is_empty(), "received empty event list");
Ok(())
}
}