1use std::{
20 fmt::Display,
21 ops::{Range, RangeBounds},
22};
23
24use super::Result;
25use bytes::Bytes;
26use futures::{stream::StreamExt, Stream, TryStreamExt};
27use snafu::Snafu;
28
/// `strftime`-style pattern for timestamps such as `Tue, 01 Jan 2030 12:00:00 GMT`.
///
/// The trailing `GMT` is matched literally; [`deserialize_rfc1123`] uses this
/// pattern to parse the value and then treats the result as UTC.
#[cfg(any(feature = "azure", feature = "http"))]
pub(crate) static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";
31
/// Serde helper that deserializes a string formatted per [`RFC1123_FMT`] into a
/// UTC timestamp.
///
/// The input is deserialized as an owned `String`, parsed into a naive
/// date-time and then interpreted as UTC.
///
/// # Errors
///
/// Returns the deserializer's error if the input is not a string, or a custom
/// serde error wrapping the parse failure if the string does not match
/// [`RFC1123_FMT`].
#[cfg(any(feature = "azure", feature = "http"))]
pub(crate) fn deserialize_rfc1123<'de, D>(
    deserializer: D,
) -> Result<chrono::DateTime<chrono::Utc>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    let raw = <String as serde::Deserialize>::deserialize(deserializer)?;
    chrono::NaiveDateTime::parse_from_str(&raw, RFC1123_FMT)
        .map(|naive| chrono::TimeZone::from_utc_datetime(&chrono::Utc, &naive))
        .map_err(serde::de::Error::custom)
}
45
/// Computes the HMAC-SHA256 of `bytes` keyed with `secret` and returns the
/// resulting tag.
#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) fn hmac_sha256(secret: impl AsRef<[u8]>, bytes: impl AsRef<[u8]>) -> ring::hmac::Tag {
    use ring::hmac::{sign, Key, HMAC_SHA256};

    let signing_key = Key::new(HMAC_SHA256, secret.as_ref());
    let message = bytes.as_ref();
    sign(&signing_key, message)
}
51
52pub async fn collect_bytes<S, E>(mut stream: S, size_hint: Option<usize>) -> Result<Bytes, E>
54where
55 E: Send,
56 S: Stream<Item = Result<Bytes, E>> + Send + Unpin,
57{
58 let first = stream.next().await.transpose()?.unwrap_or_default();
59
60 match stream.next().await.transpose()? {
62 None => Ok(first),
63 Some(second) => {
64 let size_hint = size_hint.unwrap_or_else(|| first.len() + second.len());
65
66 let mut buf = Vec::with_capacity(size_hint);
67 buf.extend_from_slice(&first);
68 buf.extend_from_slice(&second);
69 while let Some(maybe_bytes) = stream.next().await {
70 buf.extend_from_slice(&maybe_bytes?);
71 }
72
73 Ok(buf.into())
74 }
75 }
76}
77
78#[cfg(not(target_arch = "wasm32"))]
79pub(crate) async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
81where
82 F: FnOnce() -> Result<T> + Send + 'static,
83 T: Send + 'static,
84{
85 match tokio::runtime::Handle::try_current() {
86 Ok(runtime) => runtime.spawn_blocking(f).await?,
87 Err(_) => f(),
88 }
89}
90
/// A gap size of 1 MiB, in bytes.
///
/// NOTE(review): presumably the default value passed as the `coalesce`
/// argument of [`coalesce_ranges`]; no use is visible in this file, so confirm
/// against callers.
pub const OBJECT_STORE_COALESCE_DEFAULT: usize = 1024 * 1024;

/// Maximum number of merged-range fetches that [`coalesce_ranges`] keeps in
/// flight at once.
pub(crate) const OBJECT_STORE_COALESCE_PARALLEL: usize = 10;
97
98pub async fn coalesce_ranges<F, E, Fut>(
107 ranges: &[Range<usize>],
108 fetch: F,
109 coalesce: usize,
110) -> Result<Vec<Bytes>, E>
111where
112 F: Send + FnMut(Range<usize>) -> Fut,
113 E: Send,
114 Fut: std::future::Future<Output = Result<Bytes, E>> + Send,
115{
116 let fetch_ranges = merge_ranges(ranges, coalesce);
117
118 let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned())
119 .map(fetch)
120 .buffered(OBJECT_STORE_COALESCE_PARALLEL)
121 .try_collect()
122 .await?;
123
124 Ok(ranges
125 .iter()
126 .map(|range| {
127 let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
128 let fetch_range = &fetch_ranges[idx];
129 let fetch_bytes = &fetched[idx];
130
131 let start = range.start - fetch_range.start;
132 let end = range.end - fetch_range.start;
133 fetch_bytes.slice(start..end.min(fetch_bytes.len()))
134 })
135 .collect())
136}
137
/// Combines `ranges` into a list of fetch ranges sorted by start offset,
/// merging any that overlap or whose gap is at most `coalesce` bytes.
///
/// The input is copied and sorted by start. Each range is then either folded
/// into the merged range currently being built (extending its end if
/// necessary) or, if it starts more than `coalesce` bytes past that end,
/// begins a new merged range.
fn merge_ranges(ranges: &[Range<usize>], coalesce: usize) -> Vec<Range<usize>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut remaining = sorted.into_iter();
    let mut pending = match remaining.next() {
        Some(first) => first,
        None => return Vec::new(),
    };

    let mut merged = Vec::with_capacity(ranges.len());
    for next in remaining {
        // `checked_sub` is `None` when `next` starts before the pending end,
        // i.e. the two ranges overlap and must always be merged.
        let joins_pending = next
            .start
            .checked_sub(pending.end)
            .map_or(true, |gap| gap <= coalesce);

        if joins_pending {
            pending.end = pending.end.max(next.end);
        } else {
            merged.push(pending);
            pending = next;
        }
    }
    merged.push(pending);

    merged
}
175
/// A request for a subset of an object's bytes.
///
/// Its `Display` form is a `bytes=...` string (the syntax of an HTTP `Range`
/// header value).
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum GetRange {
    /// The bytes in `start..end`, with `end` exclusive.
    ///
    /// Rejected as invalid when `end <= start`. When resolved against an
    /// object, an `end` past the object's length is truncated to that length,
    /// while a `start` at or past the length is an error.
    Bounded(Range<usize>),
    /// All bytes from the given offset to the end of the object.
    ///
    /// Resolving this against an object whose length is not greater than the
    /// offset is an error.
    Offset(usize),
    /// The last `n` bytes of the object.
    ///
    /// If the object is shorter than `n` bytes, the whole object is selected.
    Suffix(usize),
}
206
/// Reasons a [`GetRange`] can be rejected.
#[derive(Debug, Snafu)]
pub(crate) enum InvalidGetRange {
    /// The range starts at or beyond the end of the object.
    #[snafu(display(
        "Wanted range starting at {requested}, but object was only {length} bytes long"
    ))]
    StartTooLarge { requested: usize, length: usize },

    /// A bounded range whose end is not strictly greater than its start.
    #[snafu(display("Range started at {start} and ended at {end}"))]
    Inconsistent { start: usize, end: usize },
}
217
218impl GetRange {
219 pub(crate) fn is_valid(&self) -> Result<(), InvalidGetRange> {
220 match self {
221 Self::Bounded(r) if r.end <= r.start => {
222 return Err(InvalidGetRange::Inconsistent {
223 start: r.start,
224 end: r.end,
225 });
226 }
227 _ => (),
228 };
229 Ok(())
230 }
231
232 pub(crate) fn as_range(&self, len: usize) -> Result<Range<usize>, InvalidGetRange> {
234 self.is_valid()?;
235 match self {
236 Self::Bounded(r) => {
237 if r.start >= len {
238 Err(InvalidGetRange::StartTooLarge {
239 requested: r.start,
240 length: len,
241 })
242 } else if r.end > len {
243 Ok(r.start..len)
244 } else {
245 Ok(r.clone())
246 }
247 }
248 Self::Offset(o) => {
249 if *o >= len {
250 Err(InvalidGetRange::StartTooLarge {
251 requested: *o,
252 length: len,
253 })
254 } else {
255 Ok(*o..len)
256 }
257 }
258 Self::Suffix(n) => Ok(len.saturating_sub(*n)..len),
259 }
260 }
261}
262
263impl Display for GetRange {
264 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265 match self {
266 Self::Bounded(r) => write!(f, "bytes={}-{}", r.start, r.end - 1),
267 Self::Offset(o) => write!(f, "bytes={o}-"),
268 Self::Suffix(n) => write!(f, "bytes=-{n}"),
269 }
270 }
271}
272
273impl<T: RangeBounds<usize>> From<T> for GetRange {
274 fn from(value: T) -> Self {
275 use std::ops::Bound::*;
276 let first = match value.start_bound() {
277 Included(i) => *i,
278 Excluded(i) => i + 1,
279 Unbounded => 0,
280 };
281 match value.end_bound() {
282 Included(i) => Self::Bounded(first..(i + 1)),
283 Excluded(i) => Self::Bounded(first..*i),
284 Unbounded => Self::Offset(first),
285 }
286 }
287}
/// Percent-encoding set that escapes every byte except ASCII alphanumerics and
/// `-`, `.`, `_`, `~` (the "unreserved" characters of RFC 3986).
#[cfg(any(feature = "aws", feature = "gcp"))]
pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC
    .remove(b'-')
    .remove(b'.')
    .remove(b'_')
    .remove(b'~');
298
/// Returns the SHA-256 digest of `bytes` as a lowercase hex string.
#[cfg(any(feature = "aws", feature = "gcp"))]
pub(crate) fn hex_digest(bytes: &[u8]) -> String {
    use ring::digest::{digest, SHA256};

    let sha = digest(&SHA256, bytes);
    hex_encode(sha.as_ref())
}
305
306#[cfg(any(feature = "aws", feature = "gcp"))]
/// Encodes `bytes` as lowercase hexadecimal, two digits per byte.
pub(crate) fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write;

    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut hex, byte| {
            // Writing into a `String` cannot fail, so the result is ignored.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}
317
/// Unit tests for range coalescing and for `GetRange` formatting, conversion
/// and resolution.
#[cfg(test)]
mod tests {
    use crate::Error;

    use super::*;
    use rand::{thread_rng, Rng};
    use std::ops::Range;

    /// Runs `coalesce_ranges` over an in-memory source and returns the ranges
    /// that were actually passed to the fetch closure.
    ///
    /// The source is `max(end)` bytes long with byte `i` equal to `i as u8`, so
    /// every returned slice can be checked against the source directly.
    async fn do_fetch(ranges: Vec<Range<usize>>, coalesce: usize) -> Vec<Range<usize>> {
        let max = ranges.iter().map(|x| x.end).max().unwrap_or(0);
        let src: Vec<_> = (0..max).map(|x| x as u8).collect();

        // Records each merged range requested by `coalesce_ranges`.
        let mut fetches = vec![];
        let coalesced = coalesce_ranges::<_, Error, _>(
            &ranges,
            |range| {
                fetches.push(range.clone());
                futures::future::ready(Ok(Bytes::from(src[range].to_vec())))
            },
            coalesce,
        )
        .await
        .unwrap();

        // One result per requested range, each holding exactly that range's bytes.
        assert_eq!(ranges.len(), coalesced.len());
        for (range, bytes) in ranges.iter().zip(coalesced) {
            assert_eq!(bytes.as_ref(), &src[range.clone()]);
        }
        fetches
    }

    /// Checks the merged fetch ranges for a set of hand-picked inputs.
    #[tokio::test]
    async fn test_coalesce_ranges() {
        // No ranges, no fetches.
        let fetches = do_fetch(vec![], 0).await;
        assert!(fetches.is_empty());

        let fetches = do_fetch(vec![0..3; 1], 0).await;
        assert_eq!(fetches, vec![0..3]);

        // A gap of 1 is not merged when `coalesce` is 0.
        let fetches = do_fetch(vec![0..2, 3..5], 0).await;
        assert_eq!(fetches, vec![0..2, 3..5]);

        // Adjacent ranges are merged even when `coalesce` is 0.
        let fetches = do_fetch(vec![0..1, 1..2], 0).await;
        assert_eq!(fetches, vec![0..2]);

        let fetches = do_fetch(vec![0..1, 2..72], 1).await;
        assert_eq!(fetches, vec![0..72]);

        let fetches = do_fetch(vec![0..1, 56..72, 73..75], 1).await;
        assert_eq!(fetches, vec![0..1, 56..75]);

        // Unsorted, overlapping input.
        let fetches = do_fetch(vec![0..1, 5..6, 7..9, 2..3, 4..6], 1).await;
        assert_eq!(fetches, vec![0..9]);

        // NOTE(review): identical to the previous case.
        let fetches = do_fetch(vec![0..1, 5..6, 7..9, 2..3, 4..6], 1).await;
        assert_eq!(fetches, vec![0..9]);

        let fetches = do_fetch(vec![0..1, 6..7, 8..9, 10..14, 9..10], 4).await;
        assert_eq!(fetches, vec![0..1, 6..14]);
    }

    /// Runs `do_fetch` on random range sets and checks that the resulting
    /// fetches are sorted by start and further apart than `coalesce`.
    #[tokio::test]
    async fn test_coalesce_fuzz() {
        let mut rand = thread_rng();
        for _ in 0..100 {
            let object_len = rand.gen_range(10..250);
            let range_count = rand.gen_range(0..10);
            let ranges: Vec<_> = (0..range_count)
                .map(|_| {
                    let start = rand.gen_range(0..object_len);
                    // Keep each range within the object and under 20 bytes long.
                    let max_len = 20.min(object_len - start);
                    let len = rand.gen_range(0..max_len);
                    start..start + len
                })
                .collect();

            let coalesce = rand.gen_range(1..5);
            let fetches = do_fetch(ranges.clone(), coalesce).await;

            for fetch in fetches.windows(2) {
                assert!(
                    fetch[0].start <= fetch[1].start,
                    "fetches should be sorted, {:?} vs {:?}",
                    fetch[0],
                    fetch[1]
                );

                // Measured end-to-end between consecutive fetches.
                let delta = fetch[1].end - fetch[0].end;
                assert!(
                    delta > coalesce,
                    "fetches should not overlap by {}, {:?} vs {:?} for {:?}",
                    coalesce,
                    fetch[0],
                    fetch[1],
                    ranges
                );
            }
        }
    }

    /// Checks the `Display` output of each `GetRange` variant.
    #[test]
    fn getrange_str() {
        assert_eq!(GetRange::Offset(0).to_string(), "bytes=0-");
        assert_eq!(GetRange::Bounded(10..19).to_string(), "bytes=10-18");
        assert_eq!(GetRange::Suffix(10).to_string(), "bytes=-10");
    }

    /// Checks conversion from standard range expressions into `GetRange`.
    #[test]
    fn getrange_from() {
        assert_eq!(Into::<GetRange>::into(10..15), GetRange::Bounded(10..15),);
        assert_eq!(Into::<GetRange>::into(10..=15), GetRange::Bounded(10..16),);
        assert_eq!(Into::<GetRange>::into(10..), GetRange::Offset(10),);
        assert_eq!(Into::<GetRange>::into(..=15), GetRange::Bounded(0..16));
    }

    /// Checks `GetRange::as_range` against objects of various lengths,
    /// including truncation and both error variants.
    #[test]
    fn test_as_range() {
        let range = GetRange::Bounded(2..5);
        assert_eq!(range.as_range(5).unwrap(), 2..5);

        // End past the object length is truncated.
        let range = range.as_range(4).unwrap();
        assert_eq!(range, 2..4);

        // Empty bounded ranges are rejected regardless of the object length.
        let range = GetRange::Bounded(3..3);
        let err = range.as_range(2).unwrap_err().to_string();
        assert_eq!(err, "Range started at 3 and ended at 3");

        let range = GetRange::Bounded(2..2);
        let err = range.as_range(3).unwrap_err().to_string();
        assert_eq!(err, "Range started at 2 and ended at 2");

        // A suffix longer than the object selects the whole object.
        let range = GetRange::Suffix(3);
        assert_eq!(range.as_range(3).unwrap(), 0..3);
        assert_eq!(range.as_range(2).unwrap(), 0..2);

        let range = GetRange::Suffix(0);
        assert_eq!(range.as_range(0).unwrap(), 0..0);

        // An offset at or past the end of the object is an error.
        let range = GetRange::Offset(2);
        let err = range.as_range(2).unwrap_err().to_string();
        assert_eq!(
            err,
            "Wanted range starting at 2, but object was only 2 bytes long"
        );

        let err = range.as_range(1).unwrap_err().to_string();
        assert_eq!(
            err,
            "Wanted range starting at 2, but object was only 1 bytes long"
        );

        let range = GetRange::Offset(1);
        assert_eq!(range.as_range(2).unwrap(), 1..2);
    }
}