// tantivy_common/file_slice.rs

1use std::fs::File;
2use std::ops::{Deref, Range, RangeBounds};
3use std::sync::Arc;
4use std::{fmt, io};
5
6use async_trait::async_trait;
7use ownedbytes::{OwnedBytes, StableDeref};
8
9use crate::{ByteCount, HasLen};
10
11/// Objects that represents files sections in tantivy.
12///
13/// By contract, whatever happens to the directory file, as long as a FileHandle
14/// is alive, the data associated with it cannot be altered or destroyed.
15///
16/// The underlying behavior is therefore specific to the `Directory` that
17/// created it. Despite its name, a [`FileSlice`] may or may not directly map to an actual file
18/// on the filesystem.
19
20#[async_trait]
21pub trait FileHandle: 'static + Send + Sync + HasLen + fmt::Debug {
22    /// Reads a slice of bytes.
23    ///
24    /// This method may panic if the range requested is invalid.
25    fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes>;
26
27    #[doc(hidden)]
28    async fn read_bytes_async(&self, _byte_range: Range<usize>) -> io::Result<OwnedBytes> {
29        Err(io::Error::new(
30            io::ErrorKind::Unsupported,
31            "Async read is not supported.",
32        ))
33    }
34}
35
/// A [`File`] bundled with its length, which is captured once when the wrapper
/// is created.
#[derive(Debug)]
pub struct WrapFile {
    file: File,
    len: usize,
}

impl WrapFile {
    /// Creates a new `WrapFile`, recording the file's current length.
    ///
    /// # Errors
    ///
    /// Returns an error if the file metadata cannot be read, or if the file length
    /// does not fit in a `usize` (possible on 32-bit targets). Such a file could not
    /// be addressed through the `usize`-based ranges used by `FileHandle` anyway.
    pub fn new(file: File) -> io::Result<Self> {
        use std::convert::TryFrom;

        let len_u64 = file.metadata()?.len();
        // A plain `as usize` cast would silently truncate lengths above 4 GiB on
        // 32-bit targets, making later reads see a wrong (shorter) file.
        let len = usize::try_from(len_u64).map_err(|_| {
            io::Error::new(
                io::ErrorKind::Other,
                "file is too large to be addressed on this platform",
            )
        })?;
        Ok(WrapFile { file, len })
    }
}
49
50#[async_trait]
51impl FileHandle for WrapFile {
52    fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
53        let file_len = self.len();
54
55        // Calculate the actual range to read, ensuring it stays within file boundaries
56        let start = range.start;
57        let end = range.end.min(file_len);
58
59        // Ensure the start is before the end of the range
60        if start >= end {
61            return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid range"));
62        }
63
64        let mut buffer = vec![0; end - start];
65
66        #[cfg(unix)]
67        {
68            use std::os::unix::prelude::FileExt;
69            self.file.read_exact_at(&mut buffer, start as u64)?;
70        }
71
72        #[cfg(not(unix))]
73        {
74            use std::io::{Read, Seek};
75            let mut file = self.file.try_clone()?; // Clone the file to read from it separately
76                                                   // Seek to the start position in the file
77            file.seek(io::SeekFrom::Start(start as u64))?;
78            // Read the data into the buffer
79            file.read_exact(&mut buffer)?;
80        }
81
82        Ok(OwnedBytes::new(buffer))
83    }
84    // todo implement async
85}
86impl HasLen for WrapFile {
87    fn len(&self) -> usize {
88        self.len
89    }
90}
91
92#[async_trait]
93impl FileHandle for &'static [u8] {
94    fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
95        let bytes = &self[range];
96        Ok(OwnedBytes::new(bytes))
97    }
98
99    async fn read_bytes_async(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
100        Ok(self.read_bytes(byte_range)?)
101    }
102}
103
104impl<B> From<B> for FileSlice
105where B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync
106{
107    fn from(bytes: B) -> FileSlice {
108        FileSlice::new(Arc::new(OwnedBytes::new(bytes)))
109    }
110}
111
112/// Logical slice of read only file in tantivy.
113///
114/// It can be cloned and sliced cheaply.
115#[derive(Clone)]
116pub struct FileSlice {
117    data: Arc<dyn FileHandle>,
118    range: Range<usize>,
119}
120
121impl fmt::Debug for FileSlice {
122    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
123        write!(f, "FileSlice({:?}, {:?})", &self.data, self.range)
124    }
125}
126
127impl FileSlice {
128    pub fn stream_file_chunks(&self) -> impl Iterator<Item = io::Result<OwnedBytes>> + '_ {
129        let len = self.range.end;
130        let mut start = self.range.start;
131        std::iter::from_fn(move || {
132            /// Returns chunks of 1MB of data from the FileHandle.
133            const CHUNK_SIZE: usize = 1024 * 1024; // 1MB
134
135            if start < len {
136                let end = (start + CHUNK_SIZE).min(len);
137                let range = start..end;
138                let chunk = self.data.read_bytes(range);
139                start += CHUNK_SIZE;
140                match chunk {
141                    Ok(chunk) => Some(Ok(chunk)),
142                    Err(e) => Some(Err(e)),
143                }
144            } else {
145                None
146            }
147        })
148    }
149}
150
/// Takes a range and a `RangeBounds` object and returns the absolute `Range`
/// obtained by applying `rel_range` relative to the start of `orig_range`.
///
/// For instance, `combine_ranges(2..11, 5..=7)` returns `7..10`. The sub-range
/// starts at relative offset 5 (absolute 7). It ends at relative offset 7
/// included, which is absolute 9 included, i.e. 10 excluded.
///
/// # Panics
///
/// Panics if the result would suggest something outside of the bounds of the
/// original range, if `rel_range` is reversed, or if computing a bound would
/// overflow `usize`.
fn combine_ranges<R: RangeBounds<usize>>(orig_range: Range<usize>, rel_range: R) -> Range<usize> {
    use std::ops::Bound;

    // Checked arithmetic guarantees that out-of-range requests panic even in
    // release builds, where a plain `+` would silently wrap around.
    let rel_start = match rel_range.start_bound() {
        Bound::Included(&rel_start) => Some(rel_start),
        Bound::Excluded(&rel_start) => rel_start.checked_add(1),
        Bound::Unbounded => Some(0),
    };
    let start = rel_start
        .and_then(|rel_start| orig_range.start.checked_add(rel_start))
        .expect("start of the relative range overflows usize");
    assert!(start <= orig_range.end);

    let end = match rel_range.end_bound() {
        Bound::Included(&rel_end) => orig_range
            .start
            .checked_add(rel_end)
            .and_then(|end| end.checked_add(1)),
        Bound::Excluded(&rel_end) => orig_range.start.checked_add(rel_end),
        Bound::Unbounded => Some(orig_range.end),
    }
    .expect("end of the relative range overflows usize");
    assert!(end >= start);
    assert!(end <= orig_range.end);
    start..end
}
178
179impl FileSlice {
180    /// Wraps a FileHandle.
181    pub fn new(file_handle: Arc<dyn FileHandle>) -> Self {
182        let num_bytes = file_handle.len();
183        FileSlice::new_with_num_bytes(file_handle, num_bytes)
184    }
185
186    /// Wraps a FileHandle.
187    #[doc(hidden)]
188    #[must_use]
189    pub fn new_with_num_bytes(file_handle: Arc<dyn FileHandle>, num_bytes: usize) -> Self {
190        FileSlice {
191            data: file_handle,
192            range: 0..num_bytes,
193        }
194    }
195
196    /// Creates a fileslice that is just a view over a slice of the data.
197    ///
198    /// # Panics
199    ///
200    /// Panics if `byte_range.end` exceeds the filesize.
201    #[must_use]
202    #[inline]
203    pub fn slice<R: RangeBounds<usize>>(&self, byte_range: R) -> FileSlice {
204        FileSlice {
205            data: self.data.clone(),
206            range: combine_ranges(self.range.clone(), byte_range),
207        }
208    }
209
210    /// Creates an empty FileSlice
211    pub fn empty() -> FileSlice {
212        const EMPTY_SLICE: &[u8] = &[];
213        FileSlice::from(EMPTY_SLICE)
214    }
215
216    /// Returns a `OwnedBytes` with all of the data in the `FileSlice`.
217    ///
218    /// The behavior is strongly dependent on the implementation of the underlying
219    /// `Directory` and the `FileSliceTrait` it creates.
220    /// In particular, it is  up to the `Directory` implementation
221    /// to handle caching if needed.
222    pub fn read_bytes(&self) -> io::Result<OwnedBytes> {
223        self.data.read_bytes(self.range.clone())
224    }
225
226    #[doc(hidden)]
227    pub async fn read_bytes_async(&self) -> io::Result<OwnedBytes> {
228        self.data.read_bytes_async(self.range.clone()).await
229    }
230
231    /// Reads a specific slice of data.
232    ///
233    /// This is equivalent to running `file_slice.slice(from, to).read_bytes()`.
234    pub fn read_bytes_slice(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
235        assert!(
236            range.end <= self.len(),
237            "end of requested range exceeds the fileslice length ({} > {})",
238            range.end,
239            self.len()
240        );
241        self.data
242            .read_bytes(self.range.start + range.start..self.range.start + range.end)
243    }
244
245    #[doc(hidden)]
246    pub async fn read_bytes_slice_async(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
247        assert!(
248            self.range.start + byte_range.end <= self.range.end,
249            "`to` exceeds the fileslice length"
250        );
251        self.data
252            .read_bytes_async(
253                self.range.start + byte_range.start..self.range.start + byte_range.end,
254            )
255            .await
256    }
257
258    /// Splits the FileSlice at the given offset and return two file slices.
259    /// `file_slice[..split_offset]` and `file_slice[split_offset..]`.
260    ///
261    /// This operation is cheap and must not copy any underlying data.
262    pub fn split(self, left_len: usize) -> (FileSlice, FileSlice) {
263        let left = self.slice_to(left_len);
264        let right = self.slice_from(left_len);
265        (left, right)
266    }
267
268    /// Splits the file slice at the given offset and return two file slices.
269    /// `file_slice[..split_offset]` and `file_slice[split_offset..]`.
270    pub fn split_from_end(self, right_len: usize) -> (FileSlice, FileSlice) {
271        let left_len = self.len() - right_len;
272        self.split(left_len)
273    }
274
275    /// Like `.slice(...)` but enforcing only the `from`
276    /// boundary.
277    ///
278    /// Equivalent to `.slice(from_offset, self.len())`
279    #[must_use]
280    pub fn slice_from(&self, from_offset: usize) -> FileSlice {
281        self.slice(from_offset..self.len())
282    }
283
284    /// Returns a slice from the end.
285    ///
286    /// Equivalent to `.slice(self.len() - from_offset, self.len())`
287    #[must_use]
288    pub fn slice_from_end(&self, from_offset: usize) -> FileSlice {
289        self.slice(self.len() - from_offset..self.len())
290    }
291
292    /// Like `.slice(...)` but enforcing only the `to`
293    /// boundary.
294    ///
295    /// Equivalent to `.slice(0, to_offset)`
296    #[must_use]
297    pub fn slice_to(&self, to_offset: usize) -> FileSlice {
298        self.slice(0..to_offset)
299    }
300
301    /// Returns the byte count of the FileSlice.
302    pub fn num_bytes(&self) -> ByteCount {
303        self.range.len().into()
304    }
305}
306
307#[async_trait]
308impl FileHandle for FileSlice {
309    fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
310        self.read_bytes_slice(range)
311    }
312
313    async fn read_bytes_async(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
314        self.read_bytes_slice_async(byte_range).await
315    }
316}
317
318impl HasLen for FileSlice {
319    fn len(&self) -> usize {
320        self.range.len()
321    }
322}
323
324#[async_trait]
325impl FileHandle for OwnedBytes {
326    fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
327        Ok(self.slice(range))
328    }
329
330    async fn read_bytes_async(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
331        self.read_bytes(range)
332    }
333}
334
#[cfg(test)]
mod tests {
    use std::io;
    use std::ops::Bound;
    use std::sync::Arc;

    use super::{FileHandle, FileSlice};
    use crate::file_slice::combine_ranges;
    use crate::HasLen;

    /// Builds a `FileSlice` over the static bytes `b"abcdef"`.
    fn abcdef() -> FileSlice {
        FileSlice::new(Arc::new(&b"abcdef"[..]))
    }

    #[test]
    fn test_file_slice() -> io::Result<()> {
        let file_slice = abcdef();
        assert_eq!(file_slice.len(), 6);
        assert_eq!(file_slice.slice_from(2).read_bytes()?.as_slice(), b"cdef");
        assert_eq!(file_slice.slice_to(2).read_bytes()?.as_slice(), b"ab");
        let nested = file_slice.slice_from(1).slice_to(2);
        assert_eq!(nested.read_bytes()?.as_slice(), b"bc");

        // (left length, expected left part, expected right part)
        let split_cases: [(usize, &[u8], &[u8]); 2] = [
            (0, &b""[..], &b"abcdef"[..]),
            (2, &b"ab"[..], &b"cdef"[..]),
        ];
        for (left_len, expected_left, expected_right) in split_cases {
            let (left, right) = file_slice.clone().split(left_len);
            assert_eq!(left.read_bytes()?.as_slice(), expected_left);
            assert_eq!(right.read_bytes()?.as_slice(), expected_right);
        }

        // (right length, expected left part, expected right part)
        let split_from_end_cases: [(usize, &[u8], &[u8]); 2] = [
            (0, &b"abcdef"[..], &b""[..]),
            (2, &b"abcd"[..], &b"ef"[..]),
        ];
        for (right_len, expected_left, expected_right) in split_from_end_cases {
            let (left, right) = file_slice.clone().split_from_end(right_len);
            assert_eq!(left.read_bytes()?.as_slice(), expected_left);
            assert_eq!(right.read_bytes()?.as_slice(), expected_right);
        }
        Ok(())
    }

    #[test]
    fn test_file_slice_trait_slice_len() {
        let raw: &'static [u8] = b"abc";
        let handle: Box<dyn FileHandle> = Box::new(raw);
        assert_eq!(handle.len(), 3);
    }

    #[test]
    fn test_slice_simple_read() -> io::Result<()> {
        let whole = abcdef();
        assert_eq!(whole.len(), 6);
        assert_eq!(whole.read_bytes()?.as_ref(), b"abcdef");
        let middle = whole.slice(1..4);
        assert_eq!(middle.read_bytes()?.as_ref(), b"bcd");
        Ok(())
    }

    #[test]
    fn test_slice_read_slice() -> io::Result<()> {
        let bytes = abcdef().read_bytes_slice(1..4)?;
        assert_eq!(bytes.as_ref(), b"bcd");
        Ok(())
    }

    #[test]
    #[should_panic(expected = "end of requested range exceeds the fileslice length (10 > 6)")]
    fn test_slice_read_slice_invalid_range_exceeds() {
        let _ = abcdef().read_bytes_slice(0..10);
    }

    #[test]
    fn test_combine_range() {
        // Plain `start..end` relative ranges.
        for (orig, rel, expected) in [(1..3, 0..1, 1..2), (3..10, 2..5, 5..8)] {
            assert_eq!(combine_ranges(orig, rel), expected);
        }
        assert_eq!(combine_ranges(1..3, 1..), 2..3);
        assert_eq!(combine_ranges(1..4, ..2), 1..3);
        assert_eq!(combine_ranges(2..11, 5..=7), 7..10);
        let excluded_start = (Bound::Excluded(5), Bound::Unbounded);
        assert_eq!(combine_ranges(2..11, excluded_start), 8..11);
    }

    #[test]
    #[should_panic]
    fn test_combine_range_panics() {
        let _ = combine_ranges(3..5, 1..4);
    }
}