// polars_plan/plans/hive.rs
use std::path::{Path, PathBuf};

use polars_core::prelude::*;
use polars_io::predicates::{BatchStats, ColumnStats};
use polars_io::prelude::schema_inference::{finish_infer_field_schema, infer_field_schema};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
9#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
10#[derive(Debug, Clone)]
11pub struct HivePartitions {
12 stats: BatchStats,
15}
16
17impl HivePartitions {
18 pub fn get_projection_schema_and_indices(
19 &self,
20 names: &PlHashSet<PlSmallStr>,
21 ) -> (SchemaRef, Vec<usize>) {
22 let mut out_schema = Schema::with_capacity(self.stats.schema().len());
23 let mut out_indices = Vec::with_capacity(self.stats.column_stats().len());
24
25 for (i, cs) in self.stats.column_stats().iter().enumerate() {
26 let name = cs.field_name();
27 if names.contains(name.as_str()) {
28 out_indices.push(i);
29 out_schema
30 .insert_at_index(out_schema.len(), name.clone(), cs.dtype().clone())
31 .unwrap();
32 }
33 }
34
35 (out_schema.into(), out_indices)
36 }
37
38 pub fn apply_projection(&mut self, new_schema: SchemaRef, column_indices: &[usize]) {
39 self.stats.with_schema(new_schema);
40 self.stats.take_indices(column_indices);
41 }
42
43 pub fn get_statistics(&self) -> &BatchStats {
44 &self.stats
45 }
46
47 pub(crate) fn schema(&self) -> &SchemaRef {
48 self.get_statistics().schema()
49 }
50
51 pub fn materialize_partition_columns(&self) -> Vec<Series> {
52 self.get_statistics()
53 .column_stats()
54 .iter()
55 .map(|cs| cs.get_min_state().unwrap().clone())
56 .collect()
57 }
58}
59
60pub fn hive_partitions_from_paths(
65 paths: &[PathBuf],
66 hive_start_idx: usize,
67 schema: Option<SchemaRef>,
68 reader_schema: &Schema,
69 try_parse_dates: bool,
70) -> PolarsResult<Option<Arc<Vec<HivePartitions>>>> {
71 let Some(path) = paths.first() else {
72 return Ok(None);
73 };
74
75 let sep = separator(path);
76 let path_string = path.to_str().unwrap();
77
78 fn parse_hive_string_and_decode(part: &'_ str) -> Option<(&'_ str, std::borrow::Cow<'_, str>)> {
79 let (k, v) = parse_hive_string(part)?;
80 let v = percent_encoding::percent_decode(v.as_bytes())
81 .decode_utf8()
82 .ok()?;
83
84 Some((k, v))
85 }
86
87 macro_rules! get_hive_parts_iter {
88 ($e:expr) => {{
89 let path_parts = $e[hive_start_idx..].split(sep);
90 let file_index = path_parts.clone().count() - 1;
91
92 path_parts.enumerate().filter_map(move |(index, part)| {
93 if index == file_index {
94 return None;
95 }
96
97 parse_hive_string_and_decode(part)
98 })
99 }};
100 }
101
102 let hive_schema = if let Some(ref schema) = schema {
103 Arc::new(get_hive_parts_iter!(path_string).map(|(name, _)| {
104 let Some(dtype) = schema.get(name) else {
105 polars_bail!(
106 SchemaFieldNotFound:
107 "path contains column not present in the given Hive schema: {:?}, path = {:?}",
108 name,
109 path
110 )
111 };
112
113 let dtype = if !try_parse_dates && dtype.is_temporal() {
114 DataType::String
115 } else {
116 dtype.clone()
117 };
118
119 Ok(Field::new(PlSmallStr::from_str(name), dtype))
120 }).collect::<PolarsResult<Schema>>()?)
121 } else {
122 let mut hive_schema = Schema::with_capacity(16);
123 let mut schema_inference_map: PlHashMap<&str, PlHashSet<DataType>> =
124 PlHashMap::with_capacity(16);
125
126 for (name, _) in get_hive_parts_iter!(path_string) {
127 if let Some(dtype) = reader_schema.get(name) {
129 let dtype = if !try_parse_dates && dtype.is_temporal() {
130 DataType::String
131 } else {
132 dtype.clone()
133 };
134
135 hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?;
136 continue;
137 }
138
139 hive_schema.insert_at_index(hive_schema.len(), name.into(), DataType::String)?;
140 schema_inference_map.insert(name, PlHashSet::with_capacity(4));
141 }
142
143 if hive_schema.is_empty() && schema_inference_map.is_empty() {
144 return Ok(None);
145 }
146
147 if !schema_inference_map.is_empty() {
148 for path in paths {
149 for (name, value) in get_hive_parts_iter!(path.to_str().unwrap()) {
150 let Some(entry) = schema_inference_map.get_mut(name) else {
151 continue;
152 };
153
154 if value.is_empty() || value == "__HIVE_DEFAULT_PARTITION__" {
155 continue;
156 }
157
158 entry.insert(infer_field_schema(value.as_ref(), try_parse_dates, false));
159 }
160 }
161
162 for (name, ref possibilities) in schema_inference_map.drain() {
163 let dtype = finish_infer_field_schema(possibilities);
164 *hive_schema.try_get_mut(name).unwrap() = dtype;
165 }
166 }
167 Arc::new(hive_schema)
168 };
169
170 let mut buffers = polars_io::csv::read::buffer::init_buffers(
171 &(0..hive_schema.len()).collect::<Vec<_>>(),
172 paths.len(),
173 hive_schema.as_ref(),
174 None,
175 polars_io::prelude::CsvEncoding::Utf8,
176 false,
177 )?;
178
179 for path in paths {
180 let path = path.to_str().unwrap();
181
182 for (name, value) in get_hive_parts_iter!(path) {
183 let Some(index) = hive_schema.index_of(name) else {
184 polars_bail!(
185 SchemaFieldNotFound:
186 "path contains column not present in the given Hive schema: {:?}, path = {:?}",
187 name,
188 path
189 )
190 };
191
192 let buf = buffers.get_mut(index).unwrap();
193
194 if !value.is_empty() && value != "__HIVE_DEFAULT_PARTITION__" {
195 buf.add(value.as_bytes(), false, false, false)?;
196 } else {
197 buf.add_null(false);
198 }
199 }
200 }
201
202 let mut hive_partitions = Vec::with_capacity(paths.len());
203 let mut buffers = buffers
204 .into_iter()
205 .map(|x| x.into_series())
206 .collect::<PolarsResult<Vec<_>>>()?;
207
208 buffers.sort_by_key(|s| reader_schema.index_of(s.name()).unwrap_or(usize::MAX));
209
210 #[allow(clippy::needless_range_loop)]
211 for i in 0..paths.len() {
212 let column_stats = buffers
213 .iter()
214 .map(|x| {
215 ColumnStats::from_column_literal(unsafe { x.take_slice_unchecked(&[i as IdxSize]) })
216 })
217 .collect::<Vec<_>>();
218
219 if column_stats.is_empty() {
220 polars_bail!(
221 ComputeError: "expected Hive partitioned path, got {}\n\n\
222 This error occurs if some paths are Hive partitioned and some paths are not.",
223 paths[i].to_str().unwrap(),
224 )
225 }
226
227 let stats = BatchStats::new(hive_schema.clone(), column_stats, None);
228 hive_partitions.push(HivePartitions { stats });
229 }
230
231 Ok(Some(Arc::from(hive_partitions)))
232}
233
234fn separator(url: &Path) -> &[char] {
236 if cfg!(target_family = "windows") {
237 if polars_io::path_utils::is_cloud_url(url) {
238 &['/']
239 } else {
240 &['/', '\\']
241 }
242 } else {
243 &['/']
244 }
245}
246
/// Parse a Hive path component of the form `key=value`.
///
/// Returns `None` when the component has no `=`, has more than one `=`, or
/// when the value contains a `*` (presumably a glob pattern rather than a
/// literal partition value).
fn parse_hive_string(part: &'_ str) -> Option<(&'_ str, &'_ str)> {
    let (name, value) = part.split_once('=')?;

    // A second `=` makes the component ambiguous: reject it.
    if value.contains('=') {
        return None;
    }

    if value.contains('*') {
        return None;
    }

    Some((name, value))
}