cooklang_sync_client/
indexer.rs1use futures::{
2 channel::mpsc::{Receiver, Sender},
3 SinkExt, StreamExt,
4};
5use std::collections::HashMap;
6use std::path::Path;
7use walkdir::WalkDir;
8
9use notify_debouncer_mini::DebounceEventResult;
10use time::OffsetDateTime;
11use tokio::time::Duration;
12
13use log::debug;
14
15use crate::chunker;
16use crate::connection::{get_connection, ConnectionPool};
17use crate::errors::SyncError;
18use crate::models::*;
19use crate::registry;
20
/// Registry records that are not marked deleted, keyed by each record's `path`.
type DBFiles = HashMap<String, FileRecord>;
/// Records built from a disk scan, keyed by the form's `path`
/// (the file path relative to the storage root).
type DiskFiles = HashMap<String, CreateForm>;

/// Maximum time `run` waits for a local file event before rescanning anyway.
const CHECK_INTERVAL_WAIT_SEC: Duration = Duration::from_secs(61);
25
26pub async fn run(
34 pool: &ConnectionPool,
35 storage_path: &Path,
36 namespace_id: i32,
37 mut local_file_update_rx: Receiver<DebounceEventResult>,
38 mut updated_tx: Sender<IndexerUpdateEvent>,
39) -> Result<(), SyncError> {
40 loop {
41 if check_index_once(pool, storage_path, namespace_id)? {
42 updated_tx.send(IndexerUpdateEvent::Updated).await?;
43 }
44
45 tokio::select! {
46 _ = tokio::time::sleep(CHECK_INTERVAL_WAIT_SEC) => {},
47 Some(_) = local_file_update_rx.next() => {},
48 };
49 }
50}
51
52pub fn check_index_once(
53 pool: &ConnectionPool,
54 storage_path: &Path,
55 namespace_id: i32,
56) -> Result<bool, SyncError> {
57 debug!("interval scan");
58
59 let from_db = get_file_records_from_registry(pool, namespace_id)?;
60 let from_fs = get_file_records_from_disk(storage_path, namespace_id)?;
61
62 let (to_remove, to_add) = compare_records(from_db, from_fs, namespace_id);
63
64 if !to_remove.is_empty() || !to_add.is_empty() {
65 let conn = &mut get_connection(pool)?;
66
67 if !to_remove.is_empty() {
68 registry::delete(conn, &to_remove)?;
69 }
70
71 if !to_add.is_empty() {
72 registry::create(conn, &to_add)?;
73 }
74
75 Ok(true)
76 } else {
77 Ok(false)
78 }
79}
80
81fn filter_eligible(p: &Path) -> bool {
82 if p.is_symlink() {
84 return false;
85 }
86 chunker::is_text(p) || chunker::is_binary(p)
87}
88
89fn get_file_records_from_disk(base_path: &Path, namespace_id: i32) -> Result<DiskFiles, SyncError> {
90 let mut cache = HashMap::new();
91
92 let iter = WalkDir::new(base_path)
93 .into_iter()
94 .filter_map(|e| e.ok())
95 .map(|p| p.into_path())
96 .filter(|p| filter_eligible(p));
97
98 for p in iter {
99 let record = build_file_record(&p, base_path, namespace_id)?;
100
101 cache.insert(record.path.clone(), record);
102 }
103
104 Ok(cache)
105}
106
107fn get_file_records_from_registry(
108 pool: &ConnectionPool,
109 namespace_id: i32,
110) -> Result<DBFiles, SyncError> {
111 let mut cache = HashMap::new();
112
113 let conn = &mut get_connection(pool)?;
114
115 for record in registry::non_deleted(conn, namespace_id)? {
116 cache.insert(record.path.clone(), record);
117 }
118
119 Ok(cache)
120}
121
122fn compare_records(
123 from_db: DBFiles,
124 from_fs: DiskFiles,
125 namespace_id: i32,
126) -> (Vec<DeleteForm>, Vec<CreateForm>) {
127 let mut to_remove: Vec<DeleteForm> = Vec::new();
128 let mut to_add: Vec<CreateForm> = Vec::new();
129
130 for (p, db_file) in &from_db {
131 match from_fs.get(p) {
132 Some(disk_file) => {
136 if db_file != disk_file {
137 to_add.push(disk_file.clone());
138 }
139 }
140 None => {
143 to_remove.push(build_delete_form(db_file, namespace_id));
144 }
145 }
146 }
147
148 for (p, disk_file) in &from_fs {
149 if !from_db.contains_key(p) {
150 to_add.push(disk_file.clone());
151 }
152 }
153
154 (to_remove, to_add)
155}
156
157fn build_file_record(path: &Path, base: &Path, namespace_id: i32) -> Result<CreateForm, SyncError> {
158 let metadata = path
159 .metadata()
160 .map_err(|e| SyncError::from_io_error(path, e))?;
161 let path = path.strip_prefix(base)?.to_string_lossy().into_owned();
163 let size: i64 = metadata.len().try_into()?;
164 let time = metadata
165 .modified()
166 .map_err(|e| SyncError::from_io_error(path.clone(), e))?;
167 let modified_at = OffsetDateTime::from(time);
168
169 let f = CreateForm {
170 jid: None,
171 path,
172 deleted: false,
173 size,
174 modified_at,
175 namespace_id,
176 };
177
178 Ok(f)
179}
180
181fn build_delete_form(record: &FileRecord, namespace_id: i32) -> DeleteForm {
182 DeleteForm {
183 path: record.path.to_string(),
184 jid: None,
185 deleted: true,
186 size: record.size,
187 modified_at: record.modified_at,
188 namespace_id,
189 }
190}