1use std::fmt::{Debug, Formatter};
2use std::fs::File;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use polars_core::error::{feature_gated, PolarsResult};
7use polars_io::cloud::CloudOptions;
8#[cfg(feature = "cloud")]
9use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder};
10use polars_io::{expand_paths, expand_paths_hive, expanded_from_single_directory};
11use polars_utils::mmap::MemSlice;
12use polars_utils::pl_str::PlSmallStr;
13
14use super::FileScanOptions;
15
/// The set of sources a scan reads from.
///
/// Every value holds sources of a single kind: a list of paths, a list of already-opened files,
/// or a list of in-memory buffers. Each list lives behind an `Arc`, so cloning a `ScanSources`
/// shares the list instead of copying it.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone)]
pub enum ScanSources {
    /// Sources given as paths.
    Paths(Arc<[PathBuf]>),

    /// Sources given as already-opened files. Skipped by `serde` when that feature is enabled.
    #[cfg_attr(feature = "serde", serde(skip))]
    Files(Arc<[File]>),
    /// Sources given as in-memory buffers. Skipped by `serde` when that feature is enabled.
    #[cfg_attr(feature = "serde", serde(skip))]
    Buffers(Arc<[MemSlice]>),
}
30
31impl Debug for ScanSources {
32 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33 match self {
34 Self::Paths(p) => write!(f, "paths: {:?}", p.as_ref()),
35 Self::Files(p) => write!(f, "files: {} files", p.len()),
36 Self::Buffers(b) => write!(f, "buffers: {} in-memory-buffers", b.len()),
37 }
38 }
39}
40
/// A borrowed view of a single entry of a [`ScanSources`].
///
/// The type is `Copy`, so it can be passed around by value.
#[derive(Debug, Clone, Copy)]
pub enum ScanSourceRef<'a> {
    /// An entry of [`ScanSources::Paths`].
    Path(&'a Path),
    /// An entry of [`ScanSources::Files`].
    File(&'a File),
    /// An entry of [`ScanSources::Buffers`].
    Buffer(&'a MemSlice),
}
48
/// Iterator over the entries of a [`ScanSources`], yielding one [`ScanSourceRef`] per entry.
pub struct ScanSourceIter<'a> {
    /// The sources being iterated over.
    sources: &'a ScanSources,
    /// Index of the next entry to yield.
    offset: usize,
}
54
55impl Default for ScanSources {
56 fn default() -> Self {
57 Self::Paths(Arc::default())
60 }
61}
62
63impl std::hash::Hash for ScanSources {
64 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
65 std::mem::discriminant(self).hash(state);
66
67 match self {
73 Self::Paths(paths) => paths.hash(state),
74 Self::Files(files) => files.as_ptr().hash(state),
75 Self::Buffers(buffers) => buffers.as_ptr().hash(state),
76 }
77 }
78}
79
80impl PartialEq for ScanSources {
81 fn eq(&self, other: &Self) -> bool {
82 match (self, other) {
83 (ScanSources::Paths(l), ScanSources::Paths(r)) => l == r,
84 (ScanSources::Files(l), ScanSources::Files(r)) => std::ptr::eq(l.as_ptr(), r.as_ptr()),
85 (ScanSources::Buffers(l), ScanSources::Buffers(r)) => {
86 std::ptr::eq(l.as_ptr(), r.as_ptr())
87 },
88 _ => false,
89 }
90 }
91}
92
// Marker impl: the `PartialEq` comparison (paths by value, files and buffers by slice address) is
// reflexive for every variant, so it is a full equivalence relation.
impl Eq for ScanSources {}
94
95impl ScanSources {
96 pub fn expand_paths(
97 &self,
98 file_options: &FileScanOptions,
99 #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
100 ) -> PolarsResult<Self> {
101 match self {
102 Self::Paths(paths) => Ok(Self::Paths(expand_paths(
103 paths,
104 file_options.glob,
105 cloud_options,
106 )?)),
107 v => Ok(v.clone()),
108 }
109 }
110
111 #[cfg(any(feature = "ipc", feature = "parquet"))]
114 pub fn expand_paths_with_hive_update(
115 &self,
116 file_options: &mut FileScanOptions,
117 #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
118 ) -> PolarsResult<Self> {
119 match self {
120 Self::Paths(paths) => {
121 let (expanded_paths, hive_start_idx) = expand_paths_hive(
122 paths,
123 file_options.glob,
124 cloud_options,
125 file_options.hive_options.enabled.unwrap_or(false),
126 )?;
127
128 if file_options.hive_options.enabled.is_none()
129 && expanded_from_single_directory(paths, expanded_paths.as_ref())
130 {
131 file_options.hive_options.enabled = Some(true);
132 }
133 file_options.hive_options.hive_start_idx = hive_start_idx;
134
135 Ok(Self::Paths(expanded_paths))
136 },
137 v => Ok(v.clone()),
138 }
139 }
140
141 pub fn iter(&self) -> ScanSourceIter {
142 ScanSourceIter {
143 sources: self,
144 offset: 0,
145 }
146 }
147
148 pub fn is_paths(&self) -> bool {
150 matches!(self, Self::Paths(_))
151 }
152
153 pub fn as_paths(&self) -> Option<&[PathBuf]> {
155 match self {
156 Self::Paths(paths) => Some(paths.as_ref()),
157 Self::Files(_) | Self::Buffers(_) => None,
158 }
159 }
160
161 pub fn into_paths(&self) -> Option<Arc<[PathBuf]>> {
163 match self {
164 Self::Paths(paths) => Some(paths.clone()),
165 Self::Files(_) | Self::Buffers(_) => None,
166 }
167 }
168
169 pub fn first_path(&self) -> Option<&Path> {
171 match self {
172 Self::Paths(paths) => paths.first().map(|p| p.as_path()),
173 Self::Files(_) | Self::Buffers(_) => None,
174 }
175 }
176
177 pub fn is_cloud_url(&self) -> bool {
179 self.first_path().is_some_and(polars_io::is_cloud_url)
180 }
181
182 pub fn len(&self) -> usize {
183 match self {
184 Self::Paths(s) => s.len(),
185 Self::Files(s) => s.len(),
186 Self::Buffers(s) => s.len(),
187 }
188 }
189
190 pub fn is_empty(&self) -> bool {
191 self.len() == 0
192 }
193
194 pub fn first(&self) -> Option<ScanSourceRef> {
195 self.get(0)
196 }
197
198 pub fn id(&self) -> PlSmallStr {
200 if self.is_empty() {
201 return PlSmallStr::from_static("EMPTY");
202 }
203
204 match self {
205 Self::Paths(paths) => {
206 PlSmallStr::from_str(paths.first().unwrap().to_string_lossy().as_ref())
207 },
208 Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"),
209 Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"),
210 }
211 }
212
213 pub fn get(&self, idx: usize) -> Option<ScanSourceRef> {
215 match self {
216 Self::Paths(paths) => paths.get(idx).map(|p| ScanSourceRef::Path(p)),
217 Self::Files(files) => files.get(idx).map(ScanSourceRef::File),
218 Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),
219 }
220 }
221
222 #[track_caller]
228 pub fn at(&self, idx: usize) -> ScanSourceRef {
229 self.get(idx).unwrap()
230 }
231}
232
233impl ScanSourceRef<'_> {
234 pub fn to_include_path_name(&self) -> &str {
236 match self {
237 Self::Path(path) => path.to_str().unwrap(),
238 Self::File(_) => "open-file",
239 Self::Buffer(_) => "in-mem",
240 }
241 }
242
243 pub fn to_memslice(&self) -> PolarsResult<MemSlice> {
245 self.to_memslice_possibly_async(false, None, 0)
246 }
247
248 pub fn to_memslice_async_assume_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
249 match self {
250 ScanSourceRef::Path(path) => {
251 let file = if run_async {
252 feature_gated!("cloud", {
253 polars_io::file_cache::FILE_CACHE
254 .get_entry(path.to_str().unwrap())
255 .unwrap()
257 .try_open_assume_latest()?
258 })
259 } else {
260 polars_utils::open_file(path)?
261 };
262
263 MemSlice::from_file(&file)
264 },
265 ScanSourceRef::File(file) => MemSlice::from_file(file),
266 ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
267 }
268 }
269
270 pub fn to_memslice_possibly_async(
271 &self,
272 run_async: bool,
273 #[cfg(feature = "cloud")] cache_entries: Option<
274 &Vec<Arc<polars_io::file_cache::FileCacheEntry>>,
275 >,
276 #[cfg(not(feature = "cloud"))] cache_entries: Option<&()>,
277 index: usize,
278 ) -> PolarsResult<MemSlice> {
279 match self {
280 Self::Path(path) => {
281 let file = if run_async {
282 feature_gated!("cloud", {
283 cache_entries.unwrap()[index].try_open_check_latest()?
284 })
285 } else {
286 polars_utils::open_file(path)?
287 };
288
289 MemSlice::from_file(&file)
290 },
291 Self::File(file) => MemSlice::from_file(file),
292 Self::Buffer(buff) => Ok((*buff).clone()),
293 }
294 }
295
296 #[cfg(feature = "cloud")]
297 pub async fn to_dyn_byte_source(
298 &self,
299 builder: &DynByteSourceBuilder,
300 cloud_options: Option<&CloudOptions>,
301 ) -> PolarsResult<DynByteSource> {
302 match self {
303 Self::Path(path) => {
304 builder
305 .try_build_from_path(path.to_str().unwrap(), cloud_options)
306 .await
307 },
308 Self::File(file) => Ok(DynByteSource::from(MemSlice::from_file(file)?)),
309 Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),
310 }
311 }
312}
313
314impl<'a> Iterator for ScanSourceIter<'a> {
315 type Item = ScanSourceRef<'a>;
316
317 fn next(&mut self) -> Option<Self::Item> {
318 let item = match self.sources {
319 ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?),
320 ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?),
321 ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?),
322 };
323
324 self.offset += 1;
325 Some(item)
326 }
327
328 fn size_hint(&self) -> (usize, Option<usize>) {
329 let len = self.sources.len() - self.offset;
330 (len, Some(len))
331 }
332}
333
// Marker impl: `size_hint` reports the exact remaining count as both bounds, so the default
// `ExactSizeIterator::len` is correct.
impl ExactSizeIterator for ScanSourceIter<'_> {}