1use serde::{Deserialize, Serialize};
2use uuid::Uuid;
3use std::path::Path;
4use path_slash::PathExt as _;
5
6use log::trace;
7
8use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
9use reqwest::StatusCode;
10use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
11
12use futures::{Stream, StreamExt};
13
14use crate::errors::SyncError;
15type Result<T, E = SyncError> = std::result::Result<T, E>;
16
17pub const REQUEST_TIMEOUT_SECS: u64 = 60;
18
19#[derive(Deserialize, Serialize, Debug)]
20pub struct ResponseFileRecord {
21 pub id: i32,
22 pub path: String,
23 pub deleted: bool,
24 pub chunk_ids: String,
25}
26
27#[derive(Debug, Deserialize, Serialize)]
28pub enum CommitResultStatus {
29 Success(i32),
30 NeedChunks(String),
31}
32
33pub struct Remote {
34 api_endpoint: String,
35 token: String,
36 uuid: String,
37 client: ClientWithMiddleware,
38}
39
40impl Remote {
41 pub fn new(api_endpoint: &str, token: &str) -> Remote {
42 let rc = reqwest::ClientBuilder::new()
43 .gzip(true)
44 .timeout(std::time::Duration::from_secs(REQUEST_TIMEOUT_SECS))
45 .build()
46 .unwrap();
47 let client = ClientBuilder::new(rc)
48 .build();
50
51 Self {
52 api_endpoint: api_endpoint.into(),
53 uuid: Uuid::new_v4().into(),
54 token: token.into(),
55 client,
56 }
57 }
58}
59impl Remote {
60 fn auth_headers(&self) -> HeaderMap {
61 let auth_value = format!("Bearer {}", self.token);
62
63 let mut headers = HeaderMap::new();
64 headers.insert(AUTHORIZATION, HeaderValue::from_str(&auth_value).unwrap());
65
66 headers
67 }
68
69 pub async fn upload(&self, chunk: &str, content: Vec<u8>) -> Result<()> {
70 trace!("uploading chunk {:?}", chunk);
71
72 let response = self
73 .client
74 .post(self.api_endpoint.clone() + "/chunks/" + chunk)
75 .headers(self.auth_headers())
76 .body(content)
77 .send()
78 .await?;
79
80 match response.status() {
81 StatusCode::OK => Ok(()),
82 StatusCode::UNAUTHORIZED => Err(SyncError::Unauthorized),
83 _ => Err(SyncError::Unknown),
84 }
85 }
86
87 pub async fn upload_batch(&self, chunks: Vec<(String, Vec<u8>)>) -> Result<()> {
88 trace!(
89 "uploading chunks {:?}",
90 chunks.iter().map(|(c, _)| c).collect::<Vec<_>>()
91 );
92
93 let boundary = format!("------------------------{}", Uuid::new_v4());
95 let mut headers = self.auth_headers();
96 headers.insert(
97 "content-type",
98 HeaderValue::from_str(&format!("multipart/form-data; boundary={}", &boundary)).unwrap(),
99 );
100
101 let final_boundary = format!("--{}--\r\n", &boundary).into_bytes();
102
103 let stream = futures::stream::iter(chunks)
105 .map(move |(chunk_id, content)| {
106 let part = format!(
107 "--{boundary}\r\n\
108 Content-Disposition: form-data; name=\"{chunk_id}\"\r\n\
109 Content-Type: application/octet-stream\r\n\r\n",
110 boundary = &boundary,
111 chunk_id = chunk_id
112 );
113
114 let end = "\r\n".to_string();
115
116 futures::stream::iter(vec![
118 Ok::<_, SyncError>(part.into_bytes()),
119 Ok::<_, SyncError>(content),
120 Ok::<_, SyncError>(end.into_bytes()),
121 ])
122 })
123 .flatten();
124
125 let stream = stream.chain(futures::stream::once(async move { Ok(final_boundary) }));
128
129 let response = self
130 .client
131 .post(self.api_endpoint.clone() + "/chunks/upload")
132 .headers(headers)
133 .body(reqwest::Body::wrap_stream(stream))
134 .send()
135 .await?;
136
137 match response.status() {
138 StatusCode::OK => Ok(()),
139 StatusCode::UNAUTHORIZED => Err(SyncError::Unauthorized),
140 _ => Err(SyncError::Unknown),
141 }
142 }
143
144 pub async fn download(&self, chunk: &str) -> Result<Vec<u8>> {
145 trace!("downloading chunk {:?}", chunk);
146
147 let response = self
148 .client
149 .get(self.api_endpoint.clone() + "/chunks/" + chunk)
150 .headers(self.auth_headers())
151 .send()
152 .await?;
153
154 match response.status() {
155 StatusCode::OK => match response.bytes().await {
156 Ok(bytes) => Ok(bytes.to_vec()),
157 Err(_) => Err(SyncError::BodyExtractError),
158 },
159 StatusCode::UNAUTHORIZED => Err(SyncError::Unauthorized),
160 _ => Err(SyncError::Unknown),
161 }
162 }
163
164 pub async fn list(&self, local_jid: i32) -> Result<Vec<ResponseFileRecord>> {
165 trace!("list after {:?}", local_jid);
166
167 let jid_string = local_jid.to_string();
168
169 let response = self
170 .client
171 .get(self.api_endpoint.clone() + "/metadata/list?jid=" + &jid_string)
172 .headers(self.auth_headers())
173 .send()
174 .await?;
175
176 match response.status() {
177 StatusCode::OK => {
178 let records = response.json::<Vec<ResponseFileRecord>>().await?;
179
180 Ok(records)
181 }
182 StatusCode::UNAUTHORIZED => Err(SyncError::Unauthorized),
183 _ => Err(SyncError::Unknown),
184 }
185 }
186
187 pub async fn poll(&self) -> Result<()> {
188 trace!("started poll");
189
190 let seconds = REQUEST_TIMEOUT_SECS + 10;
192
193 let seconds_string = seconds.to_string();
194
195 let response = self
196 .client
197 .get(
198 self.api_endpoint.clone()
199 + "/metadata/poll?seconds="
200 + &seconds_string
201 + "&uuid="
202 + &self.uuid,
203 )
204 .headers(self.auth_headers())
205 .send()
206 .await;
207
208 match response {
210 Ok(response) => match response.status() {
211 StatusCode::OK => Ok(()),
212 StatusCode::UNAUTHORIZED => Err(SyncError::Unauthorized),
213 _ => Err(SyncError::Unknown),
214 },
215 Err(e) if e.is_timeout() => Ok(()), Err(e) => Err(e.into()),
217 }
218 }
219
220 pub async fn commit(
221 &self,
222 path: &str,
223 deleted: bool,
224 chunk_ids: &str,
225 ) -> Result<CommitResultStatus> {
226 trace!("commit {:?}", path);
227
228 let path = Path::new(path);
229
230 let params = [
231 ("deleted", if deleted { "true" } else { "false" }),
232 ("chunk_ids", chunk_ids),
233 ("path", &path.to_slash().unwrap()),
234 ];
235
236 let response = self
237 .client
238 .post(self.api_endpoint.clone() + "/metadata/commit" + "?uuid=" + &self.uuid)
239 .headers(self.auth_headers())
240 .form(¶ms)
241 .send()
242 .await?;
243
244 match response.status() {
245 StatusCode::OK => {
246 let records = response.json::<CommitResultStatus>().await?;
247
248 Ok(records)
249 }
250 StatusCode::UNAUTHORIZED => Err(SyncError::Unauthorized),
251 _ => Err(SyncError::Unknown),
252 }
253 }
254
255 pub async fn download_batch<'a>(
256 &'a self,
257 chunk_ids: Vec<&'a str>,
258 ) -> impl Stream<Item = Result<(String, Vec<u8>)>> + Unpin + 'a {
259 Box::pin(async_stream::try_stream! {
260 trace!("Starting download_batch with chunk_ids: {:?}", chunk_ids);
261
262 let params: Vec<(&str, &str)> = chunk_ids.iter().map(|&id| ("chunk_ids[]", id)).collect();
263
264 let response = self
265 .client
266 .post(self.api_endpoint.clone() + "/chunks/download")
267 .headers(self.auth_headers())
268 .form(¶ms)
269 .send()
270 .await?;
271 trace!("Received response with status: {:?}", response.status());
272
273 match response.status() {
274 StatusCode::OK => {
275 let content_type = response
276 .headers()
277 .get("content-type")
278 .and_then(|v| v.to_str().ok())
279 .ok_or(SyncError::BatchDownloadError(
280 "No content-type header".to_string(),
281 ))?
282 .to_string();
283
284 let boundary = content_type
285 .split("boundary=")
286 .nth(1)
287 .ok_or(SyncError::BatchDownloadError(
288 "No boundary in content-type header".to_string(),
289 ))?;
290
291 let boundary_bytes = format!("--{}", boundary).into_bytes();
292
293 let mut stream = response.bytes_stream();
294 let mut buffer = Vec::new();
295
296 while let Some(chunk) = stream.next().await {
297 let chunk = chunk?;
298 buffer.extend_from_slice(&chunk);
299
300 while let Some((part, remaining)) = extract_next_part(&buffer, &boundary_bytes)? {
302 if let Some((chunk_id, content)) = process_part(&part)? {
303 yield (chunk_id, content);
304 }
305 buffer = remaining;
306 }
307 }
308 }
309 StatusCode::UNAUTHORIZED => Err(SyncError::Unauthorized)?,
310 _ => Err(SyncError::Unknown)?,
311 }
312 })
313 }
314}
315
316fn extract_next_part(buffer: &[u8], boundary: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
318 if let Some(start) = find_boundary(buffer, boundary) {
319 if let Some(next_boundary) = find_boundary(&buffer[start + boundary.len()..], boundary) {
320 let part =
321 buffer[start + boundary.len()..start + boundary.len() + next_boundary].to_vec();
322 let remaining = buffer[start + boundary.len() + next_boundary..].to_vec();
323 Ok(Some((part, remaining)))
324 } else {
325 Ok(None) }
327 } else {
328 Ok(None) }
330}
331
332fn process_part(part: &[u8]) -> Result<Option<(String, Vec<u8>)>> {
334 if let Some(headers_end) = find_double_crlf(part) {
335 let headers = std::str::from_utf8(&part[..headers_end])
336 .map_err(|_| SyncError::BatchDownloadError("Invalid headers".to_string()))?;
337
338 let chunk_id = headers
339 .lines()
340 .find(|line| line.starts_with("X-Chunk-ID:"))
341 .and_then(|line| line.split(": ").nth(1))
342 .ok_or(SyncError::BatchDownloadError(
343 "No chunk ID found".to_string(),
344 ))?
345 .trim()
346 .to_string();
347
348 let content = part[headers_end + 4..part.len() - 2].to_vec();
350 Ok(Some((chunk_id, content)))
351 } else {
352 Ok(None)
353 }
354}
355
356fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
358 data.windows(boundary.len())
359 .position(|window| window == boundary)
360}
361
362fn find_double_crlf(data: &[u8]) -> Option<usize> {
364 data.windows(4).position(|window| window == b"\r\n\r\n")
365}