1use std::fs::File;
7use std::io::{ErrorKind, Read, SeekFrom};
8use std::ops::Range;
9use std::sync::Arc;
10
11#[cfg(unix)]
13use std::os::unix::fs::FileExt;
14#[cfg(windows)]
15use std::os::windows::fs::FileExt;
16
17use async_trait::async_trait;
18use bytes::{Bytes, BytesMut};
19use deepsize::DeepSizeOf;
20use lance_core::{Error, Result};
21use object_store::path::Path;
22use snafu::location;
23use tokio::io::AsyncSeekExt;
24use tokio::sync::OnceCell;
25use tracing::instrument;
26
27use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
28use crate::traits::{Reader, Writer};
29
30pub fn to_local_path(path: &Path) -> String {
32 if cfg!(windows) {
33 path.to_string()
34 } else {
35 format!("/{path}")
36 }
37}
38
39pub fn remove_dir_all(path: &Path) -> Result<()> {
41 let local_path = to_local_path(path);
42 std::fs::remove_dir_all(local_path).map_err(|err| match err.kind() {
43 ErrorKind::NotFound => Error::NotFound {
44 uri: path.to_string(),
45 location: location!(),
46 },
47 _ => Error::from(err),
48 })?;
49 Ok(())
50}
51
52#[derive(Debug)]
54pub struct LocalObjectReader {
55 file: Arc<File>,
57
58 path: Path,
60
61 size: OnceCell<usize>,
64
65 block_size: usize,
67}
68
69impl DeepSizeOf for LocalObjectReader {
70 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
71 self.path.as_ref().deep_size_of_children(context)
73 }
74}
75
76impl LocalObjectReader {
77 pub async fn open_local_path(
78 path: impl AsRef<std::path::Path>,
79 block_size: usize,
80 known_size: Option<usize>,
81 ) -> Result<Box<dyn Reader>> {
82 let path = path.as_ref().to_owned();
83 let object_store_path = Path::from_filesystem_path(&path)?;
84 Self::open(&object_store_path, block_size, known_size).await
85 }
86
87 #[instrument(level = "debug")]
89 pub async fn open(
90 path: &Path,
91 block_size: usize,
92 known_size: Option<usize>,
93 ) -> Result<Box<dyn Reader>> {
94 let path = path.clone();
95 let local_path = to_local_path(&path);
96 tokio::task::spawn_blocking(move || {
97 let file = File::open(&local_path).map_err(|e| match e.kind() {
98 ErrorKind::NotFound => Error::NotFound {
99 uri: path.to_string(),
100 location: location!(),
101 },
102 _ => e.into(),
103 })?;
104 let size = OnceCell::new_with(known_size);
105 Ok(Box::new(Self {
106 file: Arc::new(file),
107 block_size,
108 size,
109 path,
110 }) as Box<dyn Reader>)
111 })
112 .await?
113 }
114}
115
116#[async_trait]
117impl Reader for LocalObjectReader {
118 fn path(&self) -> &Path {
119 &self.path
120 }
121
122 fn block_size(&self) -> usize {
123 self.block_size
124 }
125
126 fn io_parallelism(&self) -> usize {
127 DEFAULT_LOCAL_IO_PARALLELISM
128 }
129
130 async fn size(&self) -> object_store::Result<usize> {
132 let file = self.file.clone();
133 self.size
134 .get_or_try_init(|| async move {
135 let metadata = tokio::task::spawn_blocking(move || {
136 file.metadata().map_err(|err| object_store::Error::Generic {
137 store: "LocalFileSystem",
138 source: err.into(),
139 })
140 })
141 .await??;
142 Ok(metadata.len() as usize)
143 })
144 .await
145 .cloned()
146 }
147
148 #[instrument(level = "debug", skip(self))]
150 async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
151 let file = self.file.clone();
152 tokio::task::spawn_blocking(move || {
153 let mut buf = BytesMut::with_capacity(range.len());
154 unsafe { buf.set_len(range.len()) };
157 #[cfg(unix)]
158 file.read_exact_at(buf.as_mut(), range.start as u64)?;
159 #[cfg(windows)]
160 read_exact_at(file, buf.as_mut(), range.start as u64)?;
161
162 Ok(buf.freeze())
163 })
164 .await?
165 .map_err(|err: std::io::Error| object_store::Error::Generic {
166 store: "LocalFileSystem",
167 source: err.into(),
168 })
169 }
170
171 #[instrument(level = "debug", skip(self))]
173 async fn get_all(&self) -> object_store::Result<Bytes> {
174 let mut file = self.file.clone();
175 tokio::task::spawn_blocking(move || {
176 let mut buf = Vec::new();
177 file.read_to_end(buf.as_mut())?;
178 Ok(Bytes::from(buf))
179 })
180 .await?
181 .map_err(|err: std::io::Error| object_store::Error::Generic {
182 store: "LocalFileSystem",
183 source: err.into(),
184 })
185 }
186}
187
/// Windows counterpart of the Unix `FileExt::read_exact_at`: fill `buf`
/// completely with bytes read from `file` starting at `offset`.
///
/// Repeats `seek_read` until the buffer is full, retrying reads that fail
/// with `ErrorKind::Interrupted`.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` if end-of-file is reached before the
/// buffer is full; the message reports the requested length and the number of
/// bytes actually read. Any other I/O error is returned unchanged.
#[cfg(windows)]
fn read_exact_at(file: Arc<File>, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> {
    let expected_len = buf.len();
    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            // A zero-length read means end-of-file.
            Ok(0) => break,
            Ok(n) => {
                // Move `buf` out before re-slicing so the borrow checker
                // accepts shrinking the mutable slice in place.
                let tmp = buf;
                buf = &mut tmp[n..];
                offset += n as u64;
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    if buf.is_empty() {
        Ok(())
    } else {
        // `buf` now holds only the unfilled tail, so the difference is the
        // number of bytes read (`offset` is an absolute file position).
        let bytes_read = expected_len - buf.len();
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!(
                "failed to fill whole buffer. Expected {} bytes, got {}",
                expected_len, bytes_read
            ),
        ))
    }
}
215
216#[async_trait]
217impl Writer for tokio::fs::File {
218 async fn tell(&mut self) -> Result<usize> {
219 Ok(self.seek(SeekFrom::Current(0)).await? as usize)
220 }
221}