polars_plan/plans/hive.rs

use std::path::{Path, PathBuf};

use polars_core::prelude::*;
use polars_io::predicates::{BatchStats, ColumnStats};
use polars_io::prelude::schema_inference::{finish_infer_field_schema, infer_field_schema};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct HivePartitions {
    /// Single-value Series that the predicates can be run against.
    /// They are broadcast if the predicates don't filter them out.
    stats: BatchStats,
}

impl HivePartitions {
    /// Return the schema and column indices of the statistics columns whose
    /// names appear in `names`, preserving their original order.
    pub fn get_projection_schema_and_indices(
        &self,
        names: &PlHashSet<PlSmallStr>,
    ) -> (SchemaRef, Vec<usize>) {
        let mut out_schema = Schema::with_capacity(self.stats.schema().len());
        let mut out_indices = Vec::with_capacity(self.stats.column_stats().len());

        for (i, cs) in self.stats.column_stats().iter().enumerate() {
            let name = cs.field_name();
            if names.contains(name.as_str()) {
                out_indices.push(i);
                out_schema
                    .insert_at_index(out_schema.len(), name.clone(), cs.dtype().clone())
                    .unwrap();
            }
        }

        (out_schema.into(), out_indices)
    }

    /// Restrict the statistics to the projected schema and column indices.
    pub fn apply_projection(&mut self, new_schema: SchemaRef, column_indices: &[usize]) {
        self.stats.with_schema(new_schema);
        self.stats.take_indices(column_indices);
    }

    pub fn get_statistics(&self) -> &BatchStats {
        &self.stats
    }

    pub(crate) fn schema(&self) -> &SchemaRef {
        self.get_statistics().schema()
    }

    /// Turn the single-value statistics back into one-row partition Series.
    pub fn materialize_partition_columns(&self) -> Vec<Series> {
        self.get_statistics()
            .column_stats()
            .iter()
            // Each stat holds exactly one value, so the min state is the value.
            .map(|cs| cs.get_min_state().unwrap().clone())
            .collect()
    }
}
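
// Illustrative example (not in the original source): for a path such as
// `data/year=2024/month=08/file.parquet`, the corresponding `HivePartitions`
// holds one single-value column stat per partition key (`year`, `month`);
// `materialize_partition_columns` turns those stats back into one-row Series
// that downstream readers broadcast to the height of each file.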

/// Note: Returned hive partitions are ordered by their position in the `reader_schema`.
///
/// # Safety
/// `hive_start_idx <= [min path length]`
pub fn hive_partitions_from_paths(
    paths: &[PathBuf],
    hive_start_idx: usize,
    schema: Option<SchemaRef>,
    reader_schema: &Schema,
    try_parse_dates: bool,
) -> PolarsResult<Option<Arc<Vec<HivePartitions>>>> {
    let Some(path) = paths.first() else {
        return Ok(None);
    };

    let sep = separator(path);
    let path_string = path.to_str().unwrap();

    fn parse_hive_string_and_decode(part: &'_ str) -> Option<(&'_ str, std::borrow::Cow<'_, str>)> {
        let (k, v) = parse_hive_string(part)?;
        let v = percent_encoding::percent_decode(v.as_bytes())
            .decode_utf8()
            .ok()?;

        Some((k, v))
    }
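
    // Illustrative (not in the original source): a segment like `city=New%20York`
    // decodes to ("city", "New York"); a value that is not valid UTF-8 after
    // percent-decoding makes the segment parse as `None` and thus be skipped.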

    macro_rules! get_hive_parts_iter {
        ($e:expr) => {{
            let path_parts = $e[hive_start_idx..].split(sep);
            let file_index = path_parts.clone().count() - 1;

            path_parts.enumerate().filter_map(move |(index, part)| {
                if index == file_index {
                    return None;
                }

                parse_hive_string_and_decode(part)
            })
        }};
    }
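
    // Illustrative: with `hive_start_idx` pointing just past `/data/` in
    // `/data/year=2024/month=08/file.parquet`, this iterator yields
    // ("year", "2024") and ("month", "08"), skipping the trailing file name.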

    // A user-provided Hive schema takes precedence; otherwise one is inferred
    // from the path segments.
    let hive_schema = if let Some(ref schema) = schema {
        Arc::new(
            get_hive_parts_iter!(path_string)
                .map(|(name, _)| {
                    let Some(dtype) = schema.get(name) else {
                        polars_bail!(
                            SchemaFieldNotFound:
                            "path contains column not present in the given Hive schema: {:?}, path = {:?}",
                            name,
                            path
                        )
                    };

                    let dtype = if !try_parse_dates && dtype.is_temporal() {
                        DataType::String
                    } else {
                        dtype.clone()
                    };

                    Ok(Field::new(PlSmallStr::from_str(name), dtype))
                })
                .collect::<PolarsResult<Schema>>()?,
        )
    } else {
        let mut hive_schema = Schema::with_capacity(16);
        let mut schema_inference_map: PlHashMap<&str, PlHashSet<DataType>> =
            PlHashMap::with_capacity(16);

        for (name, _) in get_hive_parts_iter!(path_string) {
            // If the column is also in the file we can use the dtype stored there.
            if let Some(dtype) = reader_schema.get(name) {
                let dtype = if !try_parse_dates && dtype.is_temporal() {
                    DataType::String
                } else {
                    dtype.clone()
                };

                hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?;
                continue;
            }

            hive_schema.insert_at_index(hive_schema.len(), name.into(), DataType::String)?;
            schema_inference_map.insert(name, PlHashSet::with_capacity(4));
        }

        if hive_schema.is_empty() && schema_inference_map.is_empty() {
            return Ok(None);
        }

        if !schema_inference_map.is_empty() {
            for path in paths {
                for (name, value) in get_hive_parts_iter!(path.to_str().unwrap()) {
                    let Some(entry) = schema_inference_map.get_mut(name) else {
                        continue;
                    };

                    if value.is_empty() || value == "__HIVE_DEFAULT_PARTITION__" {
                        continue;
                    }

                    entry.insert(infer_field_schema(value.as_ref(), try_parse_dates, false));
                }
            }

            for (name, ref possibilities) in schema_inference_map.drain() {
                let dtype = finish_infer_field_schema(possibilities);
                *hive_schema.try_get_mut(name).unwrap() = dtype;
            }
        }
        Arc::new(hive_schema)
    };
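
    // Illustrative: for a column not present in the file schema, each observed
    // value (e.g. "2024") feeds `infer_field_schema`, while empty values and
    // `__HIVE_DEFAULT_PARTITION__` are skipped; the column keeps its String
    // placeholder until `finish_infer_field_schema` settles on a dtype.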

    // Reuse the CSV read buffers to parse the string partition values into
    // typed columns according to `hive_schema`.
    let mut buffers = polars_io::csv::read::buffer::init_buffers(
        &(0..hive_schema.len()).collect::<Vec<_>>(),
        paths.len(),
        hive_schema.as_ref(),
        None,
        polars_io::prelude::CsvEncoding::Utf8,
        false,
    )?;

    for path in paths {
        let path = path.to_str().unwrap();

        for (name, value) in get_hive_parts_iter!(path) {
            let Some(index) = hive_schema.index_of(name) else {
                polars_bail!(
                    SchemaFieldNotFound:
                    "path contains column not present in the given Hive schema: {:?}, path = {:?}",
                    name,
                    path
                )
            };

            let buf = buffers.get_mut(index).unwrap();

            if !value.is_empty() && value != "__HIVE_DEFAULT_PARTITION__" {
                buf.add(value.as_bytes(), false, false, false)?;
            } else {
                buf.add_null(false);
            }
        }
    }

    let mut hive_partitions = Vec::with_capacity(paths.len());
    let mut buffers = buffers
        .into_iter()
        .map(|x| x.into_series())
        .collect::<PolarsResult<Vec<_>>>()?;

    // Order the partition columns by their position in `reader_schema`
    // (columns missing from it sort last).
    buffers.sort_by_key(|s| reader_schema.index_of(s.name()).unwrap_or(usize::MAX));

    #[allow(clippy::needless_range_loop)]
    for i in 0..paths.len() {
        let column_stats = buffers
            .iter()
            .map(|x| {
                ColumnStats::from_column_literal(unsafe { x.take_slice_unchecked(&[i as IdxSize]) })
            })
            .collect::<Vec<_>>();

        if column_stats.is_empty() {
            polars_bail!(
                ComputeError: "expected Hive partitioned path, got {}\n\n\
                This error occurs if some paths are Hive partitioned and some paths are not.",
                paths[i].to_str().unwrap(),
            )
        }

        let stats = BatchStats::new(hive_schema.clone(), column_stats, None);
        hive_partitions.push(HivePartitions { stats });
    }

    Ok(Some(Arc::from(hive_partitions)))
}
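
// Minimal usage sketch (hypothetical caller, not part of this file):
//
//     let paths = vec![PathBuf::from("data/year=2024/month=08/0.parquet")];
//     // `hive_start_idx = 5` points just past the `data/` prefix;
//     // `reader_schema` is assumed to be the file schema obtained elsewhere.
//     let parts = hive_partitions_from_paths(&paths, 5, None, &reader_schema, true)?;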

/// Determine the path separator for identifying Hive partitions.
fn separator(url: &Path) -> &[char] {
    if cfg!(target_family = "windows") {
        if polars_io::path_utils::is_cloud_url(url) {
            &['/']
        } else {
            &['/', '\\']
        }
    } else {
        &['/']
    }
}
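
// Illustrative: on Windows a local path like `C:\data\year=2024\f.parquet`
// splits on both '/' and '\\', while a cloud URL such as
// `s3://bucket/year=2024/f.parquet` splits on '/' only, so backslashes inside
// object keys are not treated as separators.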

/// Parse a Hive partition string (e.g. "column=1.5") into a name and value part.
///
/// Returns `None` if the string is not a Hive partition string.
fn parse_hive_string(part: &'_ str) -> Option<(&'_ str, &'_ str)> {
    let mut it = part.split('=');
    let name = it.next()?;
    let value = it.next()?;

    // Having multiple '=' doesn't seem like a valid Hive partition.
    if it.next().is_some() {
        return None;
    }

    // Files are not Hive partitions, so globs are not valid.
    if value.contains('*') {
        return None;
    }

    Some((name, value))
}
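
#[cfg(test)]
mod tests {
    // Illustrative tests (a sketch, not part of the original file) pinning
    // down the documented behavior of `parse_hive_string`.
    use super::parse_hive_string;

    #[test]
    fn parses_a_key_value_segment() {
        assert_eq!(parse_hive_string("year=2024"), Some(("year", "2024")));
    }

    #[test]
    fn rejects_non_partition_segments() {
        // No '=': a plain file or directory name.
        assert_eq!(parse_hive_string("file.parquet"), None);
        // More than one '=' is not a valid Hive partition.
        assert_eq!(parse_hive_string("a=b=c"), None);
        // Globs indicate file patterns, not partition directories.
        assert_eq!(parse_hive_string("a=*"), None);
    }
}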