// cooklang_sync_client — lib.rs (crate root)

1use futures::{channel::mpsc::channel, try_join};
2use jsonwebtoken::{decode, Algorithm, DecodingKey, TokenData, Validation};
3use notify::RecursiveMode;
4use serde::{Deserialize, Serialize};
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::runtime::Runtime;
8use tokio::sync::Mutex;
9
10use log::debug;
11
12use crate::chunker::{Chunker, InMemoryCache};
13use crate::file_watcher::async_watcher;
14use crate::indexer::check_index_once;
15use crate::syncer::{check_download_once, check_upload_once};
16
/// Capacity of the indexer→syncer registry-update channel (see `run_async`).
const CHANNEL_SIZE: usize = 100;
/// Max number of entries kept in the in-memory chunk cache.
const INMEMORY_CACHE_MAX_REC: usize = 100000;
/// Max bytes the in-memory chunk cache may hold.
const INMEMORY_CACHE_MAX_MEM: u64 = 100_000_000_000;
/// Placeholder key: `extract_uid_from_jwt` disables signature validation,
/// but the jsonwebtoken decoding API still requires some secret value.
const DUMMY_SECRET: &[u8] = b"dummy_secret";

pub mod chunker;
pub mod connection;
pub mod errors;
pub mod file_watcher;
pub mod indexer;
pub mod models;
pub mod registry;
pub mod remote;
pub mod schema;
pub mod syncer;
32
33pub fn extract_uid_from_jwt(token: &str) -> i32 {
34    let mut validation = Validation::new(Algorithm::HS256);
35
36    // Disabling signature validation because we don't know real secret
37    validation.insecure_disable_signature_validation();
38
39    #[derive(Debug, Serialize, Deserialize)]
40    struct Claims {
41        uid: i32,
42    }
43
44    let token_data: TokenData<Claims> =
45        decode::<Claims>(token, &DecodingKey::from_secret(DUMMY_SECRET), &validation)
46            .expect("Failed to decode token");
47
48    token_data.claims.uid
49}
50
51uniffi::setup_scaffolding!();
52
53/// Synchronous alias to async run function.
54/// Intended to used by external (written in other languages) callers.
55#[uniffi::export]
56pub fn run(
57    storage_dir: &str,
58    db_file_path: &str,
59    api_endpoint: &str,
60    remote_token: &str,
61    namespace_id: i32,
62    download_only: bool,
63) -> Result<(), errors::SyncError> {
64    Runtime::new()?.block_on(run_async(
65        storage_dir,
66        db_file_path,
67        api_endpoint,
68        remote_token,
69        namespace_id,
70        download_only,
71    ))?;
72
73    Ok(())
74}
75
76/// Connects to the server and waits either when `wait_time` expires or
77/// when there's a remote update for this client.
78/// Note, it doesn't do the update itself, you need to use `run_download_once`
79/// after this function completes.
80#[uniffi::export]
81pub fn wait_remote_update(
82    api_endpoint: &str,
83    remote_token: &str
84) -> Result<(), errors::SyncError> {
85    Runtime::new()?.block_on(remote::Remote::new(api_endpoint, remote_token).poll())?;
86
87    Ok(())
88}
89
90/// Runs one-off download of updates from remote server.
91/// Note, it's not very efficient as requires to re-initialize DB connection,
92/// chunker, remote client, etc every time it runs.
93#[uniffi::export]
94pub fn run_download_once(
95    storage_dir: &str,
96    db_file_path: &str,
97    api_endpoint: &str,
98    remote_token: &str,
99    namespace_id: i32,
100) -> Result<(), errors::SyncError> {
101    use std::env;
102
103    env::set_var("CARGO_LOG", "trace");
104
105    let storage_dir = &PathBuf::from(storage_dir);
106    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
107    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
108    let chunker = Arc::new(Mutex::new(chunker));
109    let remote = &remote::Remote::new(api_endpoint, remote_token);
110
111    let pool = connection::get_connection_pool(db_file_path)?;
112    debug!("Started connection pool for {:?}", db_file_path);
113
114    Runtime::new()?.block_on(check_download_once(
115        &pool,
116        Arc::clone(&chunker),
117        remote,
118        storage_dir,
119        namespace_id,
120    ))?;
121
122    Ok(())
123}
124
125/// Runs one-off upload of updates to remote server.
126/// Note, it's not very efficient as requires to re-initialize DB connection,
127/// chunker, remote client, etc every time it runs.
128#[uniffi::export]
129pub fn run_upload_once(
130    storage_dir: &str,
131    db_file_path: &str,
132    api_endpoint: &str,
133    remote_token: &str,
134    namespace_id: i32,
135) -> Result<(), errors::SyncError> {
136    let storage_dir = &PathBuf::from(storage_dir);
137    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
138    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
139    let chunker = Arc::new(Mutex::new(chunker));
140    let remote = &remote::Remote::new(api_endpoint, remote_token);
141
142    let pool = connection::get_connection_pool(db_file_path)?;
143    debug!("Started connection pool for {:?}", db_file_path);
144
145    check_index_once(&pool, storage_dir, namespace_id)?;
146
147    let runtime = Runtime::new()?;
148
149    // It requires first pass to upload missing chunks and second to
150    // commit and update `jid` to local records.
151    if !runtime.block_on(check_upload_once(
152        &pool,
153        Arc::clone(&chunker),
154        remote,
155        namespace_id,
156    ))? {
157        runtime.block_on(check_upload_once(
158            &pool,
159            Arc::clone(&chunker),
160            remote,
161            namespace_id,
162        ))?;
163    }
164
165    Ok(())
166}
167
/// Runs local files watch and sync from/to remote continuously.
///
/// Wires three pieces together:
/// * a debounced filesystem watcher feeding `local_file_update_rx`
///   (skipped in `download_only` mode),
/// * the indexer, which consumes file events and signals registry changes,
/// * the syncer, which exchanges data with the remote server.
///
/// Returns when either the indexer or the syncer finishes with an error
/// (the `try_join!` below aborts on the first failure).
pub async fn run_async(
    storage_dir: &str,
    db_file_path: &str,
    api_endpoint: &str,
    remote_token: &str,
    namespace_id: i32,
    download_only: bool,
) -> Result<(), errors::SyncError> {
    // Watcher must exist before watching starts; events arrive on the rx side.
    let (mut debouncer, local_file_update_rx) = async_watcher()?;
    // Channel the indexer uses to tell the syncer the local registry changed.
    let (local_registry_updated_tx, local_registry_updated_rx) = channel(CHANNEL_SIZE);

    let storage_dir = &PathBuf::from(storage_dir);
    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
    let remote = &remote::Remote::new(api_endpoint, remote_token);

    let pool = connection::get_connection_pool(db_file_path)?;
    debug!("Started connection pool for {:?}", db_file_path);

    // Download-only clients never upload, so local file watching is skipped.
    if !download_only {
        debouncer
            .watcher()
            .watch(storage_dir, RecursiveMode::Recursive)?;
        debug!("Started watcher on {:?}", storage_dir);
    }

    // NOTE: `indexer` and `syncer` are lazy futures — neither runs until
    // polled by `try_join!` below; the debug! lines mark construction only.
    let indexer = indexer::run(
        &pool,
        storage_dir,
        namespace_id,
        local_file_update_rx,
        local_registry_updated_tx,
    );
    debug!("Started indexer on {:?}", storage_dir);

    let syncer = syncer::run(
        &pool,
        storage_dir,
        namespace_id,
        chunker,
        remote,
        local_registry_updated_rx,
        download_only,
    );
    debug!("Started syncer");

    // Drive both loops concurrently until one errors.
    let _ = try_join!(indexer, syncer)?;

    Ok(())
}