cooklang_sync_client/
indexer.rs

1use futures::{
2    channel::mpsc::{Receiver, Sender},
3    SinkExt, StreamExt,
4};
5use std::collections::HashMap;
6use std::path::Path;
7use walkdir::WalkDir;
8
9use notify_debouncer_mini::DebounceEventResult;
10use time::OffsetDateTime;
11use tokio::time::Duration;
12
13use log::debug;
14
15use crate::chunker;
16use crate::connection::{get_connection, ConnectionPool};
17use crate::errors::SyncError;
18use crate::models::*;
19use crate::registry;
20
/// Records loaded from the DB registry, keyed by the record's `path`.
type DBFiles = HashMap<String, FileRecord>;
/// Records built from the files found on disk, keyed by the form's `path`
/// (the file path relative to the scanned base directory).
type DiskFiles = HashMap<String, CreateForm>;

/// How long `run` waits between two scans when no FS watcher event arrives.
const CHECK_INTERVAL_WAIT_SEC: Duration = Duration::from_secs(61);
25
26/// Indexer main loop. It doesn't manipulate files, but only
27/// compares what we have in filesystem with what we have in DB.
28/// If it finds a difference it will update DB records.
29/// When any change made it will send a message in channel
30/// that Syncer is listening.
31///
32/// It runs both on interval and on any event coming from FS watcher.
33pub async fn run(
34    pool: &ConnectionPool,
35    storage_path: &Path,
36    namespace_id: i32,
37    mut local_file_update_rx: Receiver<DebounceEventResult>,
38    mut updated_tx: Sender<IndexerUpdateEvent>,
39) -> Result<(), SyncError> {
40    loop {
41        if check_index_once(pool, storage_path, namespace_id)? {
42            updated_tx.send(IndexerUpdateEvent::Updated).await?;
43        }
44
45        tokio::select! {
46            _ = tokio::time::sleep(CHECK_INTERVAL_WAIT_SEC) => {},
47            Some(_) = local_file_update_rx.next() => {},
48        };
49    }
50}
51
52pub fn check_index_once(
53    pool: &ConnectionPool,
54    storage_path: &Path,
55    namespace_id: i32,
56) -> Result<bool, SyncError> {
57    debug!("interval scan");
58
59    let from_db = get_file_records_from_registry(pool, namespace_id)?;
60    let from_fs = get_file_records_from_disk(storage_path, namespace_id)?;
61
62    let (to_remove, to_add) = compare_records(from_db, from_fs, namespace_id);
63
64    if !to_remove.is_empty() || !to_add.is_empty() {
65        let conn = &mut get_connection(pool)?;
66
67        if !to_remove.is_empty() {
68            registry::delete(conn, &to_remove)?;
69        }
70
71        if !to_add.is_empty() {
72            registry::create(conn, &to_add)?;
73        }
74
75        Ok(true)
76    } else {
77        Ok(false)
78    }
79}
80
81fn filter_eligible(p: &Path) -> bool {
82    // TODO properly follow symlinks, they can be broken as well
83    if p.is_symlink() {
84        return false;
85    }
86    chunker::is_text(p) || chunker::is_binary(p)
87}
88
89fn get_file_records_from_disk(base_path: &Path, namespace_id: i32) -> Result<DiskFiles, SyncError> {
90    let mut cache = HashMap::new();
91
92    let iter = WalkDir::new(base_path)
93        .into_iter()
94        .filter_map(|e| e.ok())
95        .map(|p| p.into_path())
96        .filter(|p| filter_eligible(p));
97
98    for p in iter {
99        let record = build_file_record(&p, base_path, namespace_id)?;
100
101        cache.insert(record.path.clone(), record);
102    }
103
104    Ok(cache)
105}
106
107fn get_file_records_from_registry(
108    pool: &ConnectionPool,
109    namespace_id: i32,
110) -> Result<DBFiles, SyncError> {
111    let mut cache = HashMap::new();
112
113    let conn = &mut get_connection(pool)?;
114
115    for record in registry::non_deleted(conn, namespace_id)? {
116        cache.insert(record.path.clone(), record);
117    }
118
119    Ok(cache)
120}
121
122fn compare_records(
123    from_db: DBFiles,
124    from_fs: DiskFiles,
125    namespace_id: i32,
126) -> (Vec<DeleteForm>, Vec<CreateForm>) {
127    let mut to_remove: Vec<DeleteForm> = Vec::new();
128    let mut to_add: Vec<CreateForm> = Vec::new();
129
130    for (p, db_file) in &from_db {
131        match from_fs.get(p) {
132            // When file from DB is also present on a disk
133            // we need to check if it was changed and if it was
134            // remove and add file again.
135            Some(disk_file) => {
136                if db_file != disk_file {
137                    to_add.push(disk_file.clone());
138                }
139            }
140            // When file from DB is not present on a disk
141            // we should mark it as deleted in DB
142            None => {
143                to_remove.push(build_delete_form(db_file, namespace_id));
144            }
145        }
146    }
147
148    for (p, disk_file) in &from_fs {
149        if !from_db.contains_key(p) {
150            to_add.push(disk_file.clone());
151        }
152    }
153
154    (to_remove, to_add)
155}
156
157fn build_file_record(path: &Path, base: &Path, namespace_id: i32) -> Result<CreateForm, SyncError> {
158    let metadata = path
159        .metadata()
160        .map_err(|e| SyncError::from_io_error(path, e))?;
161    // we assume that it was already checked and only one of these can be now
162    let path = path.strip_prefix(base)?.to_string_lossy().into_owned();
163    let size: i64 = metadata.len().try_into()?;
164    let time = metadata
165        .modified()
166        .map_err(|e| SyncError::from_io_error(path.clone(), e))?;
167    let modified_at = OffsetDateTime::from(time);
168
169    let f = CreateForm {
170        jid: None,
171        path,
172        deleted: false,
173        size,
174        modified_at,
175        namespace_id,
176    };
177
178    Ok(f)
179}
180
181fn build_delete_form(record: &FileRecord, namespace_id: i32) -> DeleteForm {
182    DeleteForm {
183        path: record.path.to_string(),
184        jid: None,
185        deleted: true,
186        size: record.size,
187        modified_at: record.modified_at,
188        namespace_id,
189    }
190}