1use std::fs::File;
2use std::ops::{Deref, Range, RangeBounds};
3use std::sync::Arc;
4use std::{fmt, io};
5
6use async_trait::async_trait;
7use ownedbytes::{OwnedBytes, StableDeref};
8
9use crate::{ByteCount, HasLen};
10
11#[async_trait]
21pub trait FileHandle: 'static + Send + Sync + HasLen + fmt::Debug {
22 fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes>;
26
27 #[doc(hidden)]
28 async fn read_bytes_async(&self, _byte_range: Range<usize>) -> io::Result<OwnedBytes> {
29 Err(io::Error::new(
30 io::ErrorKind::Unsupported,
31 "Async read is not supported.",
32 ))
33 }
34}
35
/// Wraps a [`File`] together with its length in bytes.
#[derive(Debug)]
pub struct WrapFile {
    /// The wrapped file.
    file: File,
    /// Length of the file in bytes, captured once in [`WrapFile::new`] and never refreshed.
    len: usize,
}
impl WrapFile {
    /// Creates a new `WrapFile`, recording the file's current length from its metadata.
    ///
    /// # Errors
    ///
    /// Returns an error if the metadata cannot be read, or if the file length does not fit
    /// in a `usize` (possible on targets where `usize` is narrower than 64 bits).
    pub fn new(file: File) -> io::Result<Self> {
        let raw_len: u64 = file.metadata()?.len();
        // A plain `as usize` cast would silently truncate an oversized length; fail instead.
        let len = <usize as std::convert::TryFrom<u64>>::try_from(raw_len).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "file length does not fit in usize",
            )
        })?;
        Ok(WrapFile { file, len })
    }
}
49
50#[async_trait]
51impl FileHandle for WrapFile {
52 fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
53 let file_len = self.len();
54
55 let start = range.start;
57 let end = range.end.min(file_len);
58
59 if start >= end {
61 return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid range"));
62 }
63
64 let mut buffer = vec![0; end - start];
65
66 #[cfg(unix)]
67 {
68 use std::os::unix::prelude::FileExt;
69 self.file.read_exact_at(&mut buffer, start as u64)?;
70 }
71
72 #[cfg(not(unix))]
73 {
74 use std::io::{Read, Seek};
75 let mut file = self.file.try_clone()?; file.seek(io::SeekFrom::Start(start as u64))?;
78 file.read_exact(&mut buffer)?;
80 }
81
82 Ok(OwnedBytes::new(buffer))
83 }
84 }
86impl HasLen for WrapFile {
87 fn len(&self) -> usize {
88 self.len
89 }
90}
91
92#[async_trait]
93impl FileHandle for &'static [u8] {
94 fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
95 let bytes = &self[range];
96 Ok(OwnedBytes::new(bytes))
97 }
98
99 async fn read_bytes_async(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
100 Ok(self.read_bytes(byte_range)?)
101 }
102}
103
104impl<B> From<B> for FileSlice
105where B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync
106{
107 fn from(bytes: B) -> FileSlice {
108 FileSlice::new(Arc::new(OwnedBytes::new(bytes)))
109 }
110}
111
112#[derive(Clone)]
116pub struct FileSlice {
117 data: Arc<dyn FileHandle>,
118 range: Range<usize>,
119}
120
121impl fmt::Debug for FileSlice {
122 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
123 write!(f, "FileSlice({:?}, {:?})", &self.data, self.range)
124 }
125}
126
127impl FileSlice {
128 pub fn stream_file_chunks(&self) -> impl Iterator<Item = io::Result<OwnedBytes>> + '_ {
129 let len = self.range.end;
130 let mut start = self.range.start;
131 std::iter::from_fn(move || {
132 const CHUNK_SIZE: usize = 1024 * 1024; if start < len {
136 let end = (start + CHUNK_SIZE).min(len);
137 let range = start..end;
138 let chunk = self.data.read_bytes(range);
139 start += CHUNK_SIZE;
140 match chunk {
141 Ok(chunk) => Some(Ok(chunk)),
142 Err(e) => Some(Err(e)),
143 }
144 } else {
145 None
146 }
147 })
148 }
149}
150
/// Resolves `rel_range`, expressed relative to the start of `orig_range`, into an absolute
/// range that lies inside `orig_range`.
///
/// For example, `combine_ranges(3..10, 2..5)` is `5..8`.
///
/// # Panics
///
/// Panics if the resulting range would start or end past `orig_range.end`, if it would end
/// before it starts, or if computing either bound overflows `usize`.
fn combine_ranges<R: RangeBounds<usize>>(orig_range: Range<usize>, rel_range: R) -> Range<usize> {
    use std::ops::Bound;

    // Checked arithmetic: with plain `+`, a huge relative offset would wrap around in release
    // builds and could slip past the bound assertions below.
    let start: usize = match rel_range.start_bound() {
        Bound::Included(&rel_start) => orig_range.start.checked_add(rel_start),
        Bound::Excluded(&rel_start) => rel_start
            .checked_add(1)
            .and_then(|first| orig_range.start.checked_add(first)),
        Bound::Unbounded => Some(orig_range.start),
    }
    .expect("start of relative range overflows usize");
    assert!(
        start <= orig_range.end,
        "start of relative range is past the end of the original range ({} > {})",
        start,
        orig_range.end
    );

    let end: usize = match rel_range.end_bound() {
        Bound::Included(&rel_end) => rel_end
            .checked_add(1)
            .and_then(|past_end| orig_range.start.checked_add(past_end)),
        Bound::Excluded(&rel_end) => orig_range.start.checked_add(rel_end),
        Bound::Unbounded => Some(orig_range.end),
    }
    .expect("end of relative range overflows usize");
    assert!(
        end >= start,
        "relative range ends before it starts ({} < {})",
        end,
        start
    );
    assert!(
        end <= orig_range.end,
        "end of relative range is past the end of the original range ({} > {})",
        end,
        orig_range.end
    );

    start..end
}
178
179impl FileSlice {
180 pub fn new(file_handle: Arc<dyn FileHandle>) -> Self {
182 let num_bytes = file_handle.len();
183 FileSlice::new_with_num_bytes(file_handle, num_bytes)
184 }
185
186 #[doc(hidden)]
188 #[must_use]
189 pub fn new_with_num_bytes(file_handle: Arc<dyn FileHandle>, num_bytes: usize) -> Self {
190 FileSlice {
191 data: file_handle,
192 range: 0..num_bytes,
193 }
194 }
195
196 #[must_use]
202 #[inline]
203 pub fn slice<R: RangeBounds<usize>>(&self, byte_range: R) -> FileSlice {
204 FileSlice {
205 data: self.data.clone(),
206 range: combine_ranges(self.range.clone(), byte_range),
207 }
208 }
209
210 pub fn empty() -> FileSlice {
212 const EMPTY_SLICE: &[u8] = &[];
213 FileSlice::from(EMPTY_SLICE)
214 }
215
216 pub fn read_bytes(&self) -> io::Result<OwnedBytes> {
223 self.data.read_bytes(self.range.clone())
224 }
225
226 #[doc(hidden)]
227 pub async fn read_bytes_async(&self) -> io::Result<OwnedBytes> {
228 self.data.read_bytes_async(self.range.clone()).await
229 }
230
231 pub fn read_bytes_slice(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
235 assert!(
236 range.end <= self.len(),
237 "end of requested range exceeds the fileslice length ({} > {})",
238 range.end,
239 self.len()
240 );
241 self.data
242 .read_bytes(self.range.start + range.start..self.range.start + range.end)
243 }
244
245 #[doc(hidden)]
246 pub async fn read_bytes_slice_async(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
247 assert!(
248 self.range.start + byte_range.end <= self.range.end,
249 "`to` exceeds the fileslice length"
250 );
251 self.data
252 .read_bytes_async(
253 self.range.start + byte_range.start..self.range.start + byte_range.end,
254 )
255 .await
256 }
257
258 pub fn split(self, left_len: usize) -> (FileSlice, FileSlice) {
263 let left = self.slice_to(left_len);
264 let right = self.slice_from(left_len);
265 (left, right)
266 }
267
268 pub fn split_from_end(self, right_len: usize) -> (FileSlice, FileSlice) {
271 let left_len = self.len() - right_len;
272 self.split(left_len)
273 }
274
275 #[must_use]
280 pub fn slice_from(&self, from_offset: usize) -> FileSlice {
281 self.slice(from_offset..self.len())
282 }
283
284 #[must_use]
288 pub fn slice_from_end(&self, from_offset: usize) -> FileSlice {
289 self.slice(self.len() - from_offset..self.len())
290 }
291
292 #[must_use]
297 pub fn slice_to(&self, to_offset: usize) -> FileSlice {
298 self.slice(0..to_offset)
299 }
300
301 pub fn num_bytes(&self) -> ByteCount {
303 self.range.len().into()
304 }
305}
306
307#[async_trait]
308impl FileHandle for FileSlice {
309 fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
310 self.read_bytes_slice(range)
311 }
312
313 async fn read_bytes_async(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
314 self.read_bytes_slice_async(byte_range).await
315 }
316}
317
318impl HasLen for FileSlice {
319 fn len(&self) -> usize {
320 self.range.len()
321 }
322}
323
324#[async_trait]
325impl FileHandle for OwnedBytes {
326 fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
327 Ok(self.slice(range))
328 }
329
330 async fn read_bytes_async(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
331 self.read_bytes(range)
332 }
333}
334
#[cfg(test)]
mod tests {
    use std::io;
    use std::ops::Bound;
    use std::sync::Arc;

    use super::{FileHandle, FileSlice};
    use crate::file_slice::combine_ranges;
    use crate::HasLen;

    /// Builds a `FileSlice` over the six static bytes `abcdef`.
    fn abcdef_slice() -> FileSlice {
        FileSlice::new(Arc::new(&b"abcdef"[..]))
    }

    /// Reads both halves of a split and returns their contents as owned vectors.
    fn read_halves((left, right): (FileSlice, FileSlice)) -> io::Result<(Vec<u8>, Vec<u8>)> {
        let left_bytes = left.read_bytes()?.as_slice().to_vec();
        let right_bytes = right.read_bytes()?.as_slice().to_vec();
        Ok((left_bytes, right_bytes))
    }

    #[test]
    fn test_file_slice() -> io::Result<()> {
        let file_slice = abcdef_slice();
        assert_eq!(file_slice.len(), 6);

        // Simple prefix / suffix slicing.
        let tail = file_slice.slice_from(2).read_bytes()?;
        assert_eq!(tail.as_slice(), b"cdef");
        let head = file_slice.slice_to(2).read_bytes()?;
        assert_eq!(head.as_slice(), b"ab");

        // Offsets of a nested slice are relative to the parent slice.
        let nested = file_slice.slice_from(1).slice_to(2).read_bytes()?;
        assert_eq!(nested.as_slice(), b"bc");

        // Splitting, counted from the start.
        assert_eq!(
            read_halves(file_slice.clone().split(0))?,
            (b"".to_vec(), b"abcdef".to_vec())
        );
        assert_eq!(
            read_halves(file_slice.clone().split(2))?,
            (b"ab".to_vec(), b"cdef".to_vec())
        );

        // Splitting, counted from the end.
        assert_eq!(
            read_halves(file_slice.clone().split_from_end(0))?,
            (b"abcdef".to_vec(), b"".to_vec())
        );
        assert_eq!(
            read_halves(file_slice.split_from_end(2))?,
            (b"abcd".to_vec(), b"ef".to_vec())
        );
        Ok(())
    }

    #[test]
    fn test_file_slice_trait_slice_len() {
        let static_bytes: &'static [u8] = b"abc";
        let handle: Box<dyn FileHandle> = Box::new(static_bytes);
        assert_eq!(handle.len(), 3);
    }

    #[test]
    fn test_slice_simple_read() -> io::Result<()> {
        let slice = abcdef_slice();
        assert_eq!(slice.len(), 6);

        let everything = slice.read_bytes()?;
        assert_eq!(everything.as_ref(), b"abcdef");

        let middle = slice.slice(1..4).read_bytes()?;
        assert_eq!(middle.as_ref(), b"bcd");
        Ok(())
    }

    #[test]
    fn test_slice_read_slice() -> io::Result<()> {
        let middle = abcdef_slice().read_bytes_slice(1..4)?;
        assert_eq!(middle.as_ref(), b"bcd");
        Ok(())
    }

    #[test]
    #[should_panic(expected = "end of requested range exceeds the fileslice length (10 > 6)")]
    fn test_slice_read_slice_invalid_range_exceeds() {
        // The bounds assertion fires inside `read_bytes_slice`, before anything is returned.
        let _ = abcdef_slice().read_bytes_slice(0..10);
    }

    #[test]
    fn test_combine_range() {
        // Half-open relative ranges.
        assert_eq!(combine_ranges(1..3, 0..1), 1..2);
        assert_eq!(combine_ranges(3..10, 2..5), 5..8);
        // Open-ended relative ranges.
        assert_eq!(combine_ranges(1..3, 1..), 2..3);
        assert_eq!(combine_ranges(1..4, ..2), 1..3);
        // Inclusive end bound.
        assert_eq!(combine_ranges(2..11, 5..=7), 7..10);
        // Exclusive start bound.
        let after_five = (Bound::Excluded(5), Bound::Unbounded);
        assert_eq!(combine_ranges(2..11, after_five), 8..11);
    }

    #[test]
    #[should_panic]
    fn test_combine_range_panics() {
        // The relative end (4) reaches past the two-byte parent range.
        let _ = combine_ranges(3..5, 1..4);
    }
}