// polars_plan/plans/ir/scan_sources.rs

1use std::fmt::{Debug, Formatter};
2use std::fs::File;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use polars_core::error::{feature_gated, PolarsResult};
7use polars_io::cloud::CloudOptions;
8#[cfg(feature = "cloud")]
9use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder};
10use polars_io::{expand_paths, expand_paths_hive, expanded_from_single_directory};
11use polars_utils::mmap::MemSlice;
12use polars_utils::pl_str::PlSmallStr;
13
14use super::FileScanOptions;
15
/// Set of sources to scan from.
///
/// This can either be a list of paths to files, opened files or in-memory buffers. Mixing
/// different kinds of sources is not currently possible: a value holds exactly one kind.
///
/// Only [`ScanSources::Paths`] takes part in serde (de)serialization; the other variants are
/// marked `serde(skip)`.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone)]
pub enum ScanSources {
    /// Paths to files, possibly not yet expanded (see [`ScanSources::expand_paths`]).
    Paths(Arc<[PathBuf]>),

    /// Already-opened files.
    #[cfg_attr(feature = "serde", serde(skip))]
    Files(Arc<[File]>),
    /// In-memory buffers.
    #[cfg_attr(feature = "serde", serde(skip))]
    Buffers(Arc<[MemSlice]>),
}
30
31impl Debug for ScanSources {
32    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33        match self {
34            Self::Paths(p) => write!(f, "paths: {:?}", p.as_ref()),
35            Self::Files(p) => write!(f, "files: {} files", p.len()),
36            Self::Buffers(b) => write!(f, "buffers: {} in-memory-buffers", b.len()),
37        }
38    }
39}
40
/// A reference to a single item in [`ScanSources`]
#[derive(Debug, Clone, Copy)]
pub enum ScanSourceRef<'a> {
    /// A path taken from [`ScanSources::Paths`].
    Path(&'a Path),
    /// An opened file taken from [`ScanSources::Files`].
    File(&'a File),
    /// An in-memory buffer taken from [`ScanSources::Buffers`].
    Buffer(&'a MemSlice),
}
48
/// An iterator for [`ScanSources`]
///
/// Created by [`ScanSources::iter`]; yields a [`ScanSourceRef`] for each source in order.
pub struct ScanSourceIter<'a> {
    /// The sources being iterated over.
    sources: &'a ScanSources,
    /// Index of the next source to yield.
    offset: usize,
}
54
55impl Default for ScanSources {
56    fn default() -> Self {
57        // We need to use `Paths` here to avoid erroring when doing hive-partitioned scans of empty
58        // file lists.
59        Self::Paths(Arc::default())
60    }
61}
62
63impl std::hash::Hash for ScanSources {
64    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
65        std::mem::discriminant(self).hash(state);
66
67        // @NOTE: This is a bit crazy
68        //
69        // We don't really want to hash the file descriptors or the whole buffers so for now we
70        // just settle with the fact that the memory behind Arc's does not really move. Therefore,
71        // we can just hash the pointer.
72        match self {
73            Self::Paths(paths) => paths.hash(state),
74            Self::Files(files) => files.as_ptr().hash(state),
75            Self::Buffers(buffers) => buffers.as_ptr().hash(state),
76        }
77    }
78}
79
80impl PartialEq for ScanSources {
81    fn eq(&self, other: &Self) -> bool {
82        match (self, other) {
83            (ScanSources::Paths(l), ScanSources::Paths(r)) => l == r,
84            (ScanSources::Files(l), ScanSources::Files(r)) => std::ptr::eq(l.as_ptr(), r.as_ptr()),
85            (ScanSources::Buffers(l), ScanSources::Buffers(r)) => {
86                std::ptr::eq(l.as_ptr(), r.as_ptr())
87            },
88            _ => false,
89        }
90    }
91}
92
93impl Eq for ScanSources {}
94
95impl ScanSources {
96    pub fn expand_paths(
97        &self,
98        file_options: &FileScanOptions,
99        #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
100    ) -> PolarsResult<Self> {
101        match self {
102            Self::Paths(paths) => Ok(Self::Paths(expand_paths(
103                paths,
104                file_options.glob,
105                cloud_options,
106            )?)),
107            v => Ok(v.clone()),
108        }
109    }
110
111    /// This will update `file_options.hive_options.enabled` to `true` if the existing value is `None`
112    /// and the paths are expanded from a single directory. Otherwise the existing value is maintained.
113    #[cfg(any(feature = "ipc", feature = "parquet"))]
114    pub fn expand_paths_with_hive_update(
115        &self,
116        file_options: &mut FileScanOptions,
117        #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
118    ) -> PolarsResult<Self> {
119        match self {
120            Self::Paths(paths) => {
121                let (expanded_paths, hive_start_idx) = expand_paths_hive(
122                    paths,
123                    file_options.glob,
124                    cloud_options,
125                    file_options.hive_options.enabled.unwrap_or(false),
126                )?;
127
128                if file_options.hive_options.enabled.is_none()
129                    && expanded_from_single_directory(paths, expanded_paths.as_ref())
130                {
131                    file_options.hive_options.enabled = Some(true);
132                }
133                file_options.hive_options.hive_start_idx = hive_start_idx;
134
135                Ok(Self::Paths(expanded_paths))
136            },
137            v => Ok(v.clone()),
138        }
139    }
140
141    pub fn iter(&self) -> ScanSourceIter {
142        ScanSourceIter {
143            sources: self,
144            offset: 0,
145        }
146    }
147
148    /// Are the sources all paths?
149    pub fn is_paths(&self) -> bool {
150        matches!(self, Self::Paths(_))
151    }
152
153    /// Try cast the scan sources to [`ScanSources::Paths`]
154    pub fn as_paths(&self) -> Option<&[PathBuf]> {
155        match self {
156            Self::Paths(paths) => Some(paths.as_ref()),
157            Self::Files(_) | Self::Buffers(_) => None,
158        }
159    }
160
161    /// Try cast the scan sources to [`ScanSources::Paths`] with a clone
162    pub fn into_paths(&self) -> Option<Arc<[PathBuf]>> {
163        match self {
164            Self::Paths(paths) => Some(paths.clone()),
165            Self::Files(_) | Self::Buffers(_) => None,
166        }
167    }
168
169    /// Try get the first path in the scan sources
170    pub fn first_path(&self) -> Option<&Path> {
171        match self {
172            Self::Paths(paths) => paths.first().map(|p| p.as_path()),
173            Self::Files(_) | Self::Buffers(_) => None,
174        }
175    }
176
177    /// Is the first path a cloud URL?
178    pub fn is_cloud_url(&self) -> bool {
179        self.first_path().is_some_and(polars_io::is_cloud_url)
180    }
181
182    pub fn len(&self) -> usize {
183        match self {
184            Self::Paths(s) => s.len(),
185            Self::Files(s) => s.len(),
186            Self::Buffers(s) => s.len(),
187        }
188    }
189
190    pub fn is_empty(&self) -> bool {
191        self.len() == 0
192    }
193
194    pub fn first(&self) -> Option<ScanSourceRef> {
195        self.get(0)
196    }
197
198    /// Turn the [`ScanSources`] into some kind of identifier
199    pub fn id(&self) -> PlSmallStr {
200        if self.is_empty() {
201            return PlSmallStr::from_static("EMPTY");
202        }
203
204        match self {
205            Self::Paths(paths) => {
206                PlSmallStr::from_str(paths.first().unwrap().to_string_lossy().as_ref())
207            },
208            Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"),
209            Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"),
210        }
211    }
212
213    /// Get the scan source at specific address
214    pub fn get(&self, idx: usize) -> Option<ScanSourceRef> {
215        match self {
216            Self::Paths(paths) => paths.get(idx).map(|p| ScanSourceRef::Path(p)),
217            Self::Files(files) => files.get(idx).map(ScanSourceRef::File),
218            Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),
219        }
220    }
221
222    /// Get the scan source at specific address
223    ///
224    /// # Panics
225    ///
226    /// If the `idx` is out of range.
227    #[track_caller]
228    pub fn at(&self, idx: usize) -> ScanSourceRef {
229        self.get(idx).unwrap()
230    }
231}
232
233impl ScanSourceRef<'_> {
234    /// Get the name for `include_paths`
235    pub fn to_include_path_name(&self) -> &str {
236        match self {
237            Self::Path(path) => path.to_str().unwrap(),
238            Self::File(_) => "open-file",
239            Self::Buffer(_) => "in-mem",
240        }
241    }
242
243    /// Turn the scan source into a memory slice
244    pub fn to_memslice(&self) -> PolarsResult<MemSlice> {
245        self.to_memslice_possibly_async(false, None, 0)
246    }
247
248    pub fn to_memslice_async_assume_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
249        match self {
250            ScanSourceRef::Path(path) => {
251                let file = if run_async {
252                    feature_gated!("cloud", {
253                        polars_io::file_cache::FILE_CACHE
254                            .get_entry(path.to_str().unwrap())
255                            // Safety: This was initialized by schema inference.
256                            .unwrap()
257                            .try_open_assume_latest()?
258                    })
259                } else {
260                    polars_utils::open_file(path)?
261                };
262
263                MemSlice::from_file(&file)
264            },
265            ScanSourceRef::File(file) => MemSlice::from_file(file),
266            ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
267        }
268    }
269
270    pub fn to_memslice_possibly_async(
271        &self,
272        run_async: bool,
273        #[cfg(feature = "cloud")] cache_entries: Option<
274            &Vec<Arc<polars_io::file_cache::FileCacheEntry>>,
275        >,
276        #[cfg(not(feature = "cloud"))] cache_entries: Option<&()>,
277        index: usize,
278    ) -> PolarsResult<MemSlice> {
279        match self {
280            Self::Path(path) => {
281                let file = if run_async {
282                    feature_gated!("cloud", {
283                        cache_entries.unwrap()[index].try_open_check_latest()?
284                    })
285                } else {
286                    polars_utils::open_file(path)?
287                };
288
289                MemSlice::from_file(&file)
290            },
291            Self::File(file) => MemSlice::from_file(file),
292            Self::Buffer(buff) => Ok((*buff).clone()),
293        }
294    }
295
296    #[cfg(feature = "cloud")]
297    pub async fn to_dyn_byte_source(
298        &self,
299        builder: &DynByteSourceBuilder,
300        cloud_options: Option<&CloudOptions>,
301    ) -> PolarsResult<DynByteSource> {
302        match self {
303            Self::Path(path) => {
304                builder
305                    .try_build_from_path(path.to_str().unwrap(), cloud_options)
306                    .await
307            },
308            Self::File(file) => Ok(DynByteSource::from(MemSlice::from_file(file)?)),
309            Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),
310        }
311    }
312}
313
314impl<'a> Iterator for ScanSourceIter<'a> {
315    type Item = ScanSourceRef<'a>;
316
317    fn next(&mut self) -> Option<Self::Item> {
318        let item = match self.sources {
319            ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?),
320            ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?),
321            ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?),
322        };
323
324        self.offset += 1;
325        Some(item)
326    }
327
328    fn size_hint(&self) -> (usize, Option<usize>) {
329        let len = self.sources.len() - self.offset;
330        (len, Some(len))
331    }
332}
333
334impl ExactSizeIterator for ScanSourceIter<'_> {}