1use futures::{channel::mpsc::channel, try_join};
2use jsonwebtoken::{decode, Algorithm, DecodingKey, TokenData, Validation};
3use notify::RecursiveMode;
4use serde::{Deserialize, Serialize};
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::runtime::Runtime;
8use tokio::sync::Mutex;
9
10use log::debug;
11
12use crate::chunker::{Chunker, InMemoryCache};
13use crate::file_watcher::async_watcher;
14use crate::indexer::check_index_once;
15use crate::syncer::{check_download_once, check_upload_once};
16
17const CHANNEL_SIZE: usize = 100;
18const INMEMORY_CACHE_MAX_REC: usize = 100000;
19const INMEMORY_CACHE_MAX_MEM: u64 = 100_000_000_000;
20const DUMMY_SECRET: &[u8] = b"dummy_secret";
21
22pub mod chunker;
23pub mod connection;
24pub mod errors;
25pub mod file_watcher;
26pub mod indexer;
27pub mod models;
28pub mod registry;
29pub mod remote;
30pub mod schema;
31pub mod syncer;
32
33pub fn extract_uid_from_jwt(token: &str) -> i32 {
34 let mut validation = Validation::new(Algorithm::HS256);
35
36 validation.insecure_disable_signature_validation();
38
39 #[derive(Debug, Serialize, Deserialize)]
40 struct Claims {
41 uid: i32,
42 }
43
44 let token_data: TokenData<Claims> =
45 decode::<Claims>(token, &DecodingKey::from_secret(DUMMY_SECRET), &validation)
46 .expect("Failed to decode token");
47
48 token_data.claims.uid
49}
50
51uniffi::setup_scaffolding!();
52
53#[uniffi::export]
56pub fn run(
57 storage_dir: &str,
58 db_file_path: &str,
59 api_endpoint: &str,
60 remote_token: &str,
61 namespace_id: i32,
62 download_only: bool,
63) -> Result<(), errors::SyncError> {
64 Runtime::new()?.block_on(run_async(
65 storage_dir,
66 db_file_path,
67 api_endpoint,
68 remote_token,
69 namespace_id,
70 download_only,
71 ))?;
72
73 Ok(())
74}
75
76#[uniffi::export]
81pub fn wait_remote_update(
82 api_endpoint: &str,
83 remote_token: &str
84) -> Result<(), errors::SyncError> {
85 Runtime::new()?.block_on(remote::Remote::new(api_endpoint, remote_token).poll())?;
86
87 Ok(())
88}
89
90#[uniffi::export]
94pub fn run_download_once(
95 storage_dir: &str,
96 db_file_path: &str,
97 api_endpoint: &str,
98 remote_token: &str,
99 namespace_id: i32,
100) -> Result<(), errors::SyncError> {
101 use std::env;
102
103 env::set_var("CARGO_LOG", "trace");
104
105 let storage_dir = &PathBuf::from(storage_dir);
106 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
107 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
108 let chunker = Arc::new(Mutex::new(chunker));
109 let remote = &remote::Remote::new(api_endpoint, remote_token);
110
111 let pool = connection::get_connection_pool(db_file_path)?;
112 debug!("Started connection pool for {:?}", db_file_path);
113
114 Runtime::new()?.block_on(check_download_once(
115 &pool,
116 Arc::clone(&chunker),
117 remote,
118 storage_dir,
119 namespace_id,
120 ))?;
121
122 Ok(())
123}
124
125#[uniffi::export]
129pub fn run_upload_once(
130 storage_dir: &str,
131 db_file_path: &str,
132 api_endpoint: &str,
133 remote_token: &str,
134 namespace_id: i32,
135) -> Result<(), errors::SyncError> {
136 let storage_dir = &PathBuf::from(storage_dir);
137 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
138 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
139 let chunker = Arc::new(Mutex::new(chunker));
140 let remote = &remote::Remote::new(api_endpoint, remote_token);
141
142 let pool = connection::get_connection_pool(db_file_path)?;
143 debug!("Started connection pool for {:?}", db_file_path);
144
145 check_index_once(&pool, storage_dir, namespace_id)?;
146
147 let runtime = Runtime::new()?;
148
149 if !runtime.block_on(check_upload_once(
152 &pool,
153 Arc::clone(&chunker),
154 remote,
155 namespace_id,
156 ))? {
157 runtime.block_on(check_upload_once(
158 &pool,
159 Arc::clone(&chunker),
160 remote,
161 namespace_id,
162 ))?;
163 }
164
165 Ok(())
166}
167
168pub async fn run_async(
170 storage_dir: &str,
171 db_file_path: &str,
172 api_endpoint: &str,
173 remote_token: &str,
174 namespace_id: i32,
175 download_only: bool,
176) -> Result<(), errors::SyncError> {
177 let (mut debouncer, local_file_update_rx) = async_watcher()?;
178 let (local_registry_updated_tx, local_registry_updated_rx) = channel(CHANNEL_SIZE);
179
180 let storage_dir = &PathBuf::from(storage_dir);
181 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
182 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
183 let remote = &remote::Remote::new(api_endpoint, remote_token);
184
185 let pool = connection::get_connection_pool(db_file_path)?;
186 debug!("Started connection pool for {:?}", db_file_path);
187
188 if !download_only {
189 debouncer
190 .watcher()
191 .watch(storage_dir, RecursiveMode::Recursive)?;
192 debug!("Started watcher on {:?}", storage_dir);
193 }
194
195 let indexer = indexer::run(
196 &pool,
197 storage_dir,
198 namespace_id,
199 local_file_update_rx,
200 local_registry_updated_tx,
201 );
202 debug!("Started indexer on {:?}", storage_dir);
203
204 let syncer = syncer::run(
205 &pool,
206 storage_dir,
207 namespace_id,
208 chunker,
209 remote,
210 local_registry_updated_rx,
211 download_only,
212 );
213 debug!("Started syncer");
214
215 let _ = try_join!(indexer, syncer)?;
216
217 Ok(())
218}