lance_io/
traits.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::Range;
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use deepsize::DeepSizeOf;
9use object_store::path::Path;
10use prost::Message;
11use tokio::io::{AsyncWrite, AsyncWriteExt};
12
13use lance_core::Result;
14
15pub trait ProtoStruct {
16    type Proto: Message;
17}
18
19/// A trait for writing to a file on local file system or object store.
20#[async_trait]
21pub trait Writer: AsyncWrite + Unpin + Send {
22    /// Tell the current offset.
23    async fn tell(&mut self) -> Result<usize>;
24}
25
26/// Lance Write Extension.
27#[async_trait]
28pub trait WriteExt {
29    /// Write a Protobuf message to the [Writer], and returns the file position
30    /// where the protobuf is written.
31    async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize>;
32
33    async fn write_struct<
34        'b,
35        M: Message + From<&'b T>,
36        T: ProtoStruct<Proto = M> + Send + Sync + 'b,
37    >(
38        &mut self,
39        obj: &'b T,
40    ) -> Result<usize> {
41        let msg: M = M::from(obj);
42        self.write_protobuf(&msg).await
43    }
44    /// Write magics to the tail of a file before closing the file.
45    async fn write_magics(
46        &mut self,
47        pos: usize,
48        major_version: i16,
49        minor_version: i16,
50        magic: &[u8],
51    ) -> Result<()>;
52}
53
54#[async_trait]
55impl<W: Writer + ?Sized> WriteExt for W {
56    async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize> {
57        let offset = self.tell().await?;
58
59        let len = msg.encoded_len();
60
61        self.write_u32_le(len as u32).await?;
62        self.write_all(&msg.encode_to_vec()).await?;
63
64        Ok(offset)
65    }
66
67    async fn write_magics(
68        &mut self,
69        pos: usize,
70        major_version: i16,
71        minor_version: i16,
72        magic: &[u8],
73    ) -> Result<()> {
74        self.write_i64_le(pos as i64).await?;
75        self.write_i16_le(major_version).await?;
76        self.write_i16_le(minor_version).await?;
77        self.write_all(magic).await?;
78        Ok(())
79    }
80}
81
82#[async_trait]
83pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
84    fn path(&self) -> &Path;
85
86    /// Suggest optimal I/O size per storage device.
87    fn block_size(&self) -> usize;
88
89    /// Suggest optimal I/O parallelism per storage device.
90    fn io_parallelism(&self) -> usize;
91
92    /// Object/File Size.
93    async fn size(&self) -> object_store::Result<usize>;
94
95    /// Read a range of bytes from the object.
96    ///
97    /// TODO: change to read_at()?
98    async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes>;
99
100    /// Read all bytes from the object.
101    ///
102    /// By default this reads the size in a separate IOP but some implementations
103    /// may not need the size beforehand.
104    async fn get_all(&self) -> object_store::Result<Bytes>;
105}