polars_pipe/operators/chunks.rs
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use super::*;
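/// A unit of streaming data: a `DataFrame` tagged with its position
/// (`chunk_index`) in the wider stream. Every column is expected to hold
/// exactly one chunk; `new` asserts this in debug builds.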
#[derive(Clone, Debug)]
pub struct DataChunk {
pub chunk_index: IdxSize,
pub data: DataFrame,
}
impl DataChunk {
pub(crate) fn new(chunk_index: IdxSize, data: DataFrame) -> Self {
// Check the invariant that all columns have a single chunk.
#[cfg(debug_assertions)]
{
for c in data.get_columns() {
assert_eq!(c.as_materialized_series().chunks().len(), 1);
}
}
Self { chunk_index, data }
}
pub(crate) fn with_data(&self, data: DataFrame) -> Self {
Self::new(self.chunk_index, data)
}
pub(crate) fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
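/// Concatenate the chunks' `DataFrame`s vertically. "Unchecked" because the
/// frames are assumed to share a schema; no validation is performed.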
pub(crate) fn chunks_to_df_unchecked(chunks: Vec<DataChunk>) -> DataFrame {
accumulate_dataframes_vertical_unchecked(chunks.into_iter().map(|c| c.data))
}
/// Accumulate a series of `DataFrame`s and, when they're small enough,
/// combine them into larger `DataFrame`s using `vstack`. This allows the
/// caller to turn them into contiguous memory allocations, so that we don't
/// suffer the overhead of many small writes. The assumption is that the added
/// `DataFrame`s are already in the correct order and can therefore be
/// combined.
///
/// The benefit of having a series of `DataFrame`s that are e.g. 4 MB each and
/// are then made contiguous is that you're not using a lot of extra memory
/// (only an extra 4 MB or so), but you're still doing much better than with a
/// series of 2 KB `DataFrame`s.
///
/// Rechunking the resulting `DataFrame`s into contiguous memory is the
/// caller's responsibility.
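///
/// # Example
///
/// A minimal sketch of the intended usage, assuming `frames` is some iterator
/// of same-schema `DataFrame`s and `write_chunk` is a hypothetical sink
/// (neither exists in this crate):
///
/// ```ignore
/// let mut vstacker = StreamingVstacker::default();
/// for frame in frames {
///     for mut combined in vstacker.add(frame) {
///         // Rechunking into contiguous memory is up to the caller.
///         combined.as_single_chunk();
///         write_chunk(combined);
///     }
/// }
/// if let Some(mut combined) = vstacker.finish() {
///     combined.as_single_chunk();
///     write_chunk(combined);
/// }
/// ```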
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
#[derive(Clone)]
pub(crate) struct StreamingVstacker {
current_dataframe: Option<DataFrame>,
/// How big should resulting chunks be, if possible?
output_chunk_size: usize,
}
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
impl StreamingVstacker {
/// Create a new instance.
pub fn new(output_chunk_size: usize) -> Self {
Self {
current_dataframe: None,
output_chunk_size,
}
}
    /// Add another `DataFrame` and return the (potentially combined)
    /// `DataFrame`s that result, if any.
pub fn add(&mut self, next_frame: DataFrame) -> impl Iterator<Item = DataFrame> {
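        // At most two `DataFrame`s can be produced per call: a pre-flush of
        // previously accumulated data, plus a flush of a newly oversized
        // accumulation.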
let mut result: [Option<DataFrame>; 2] = [None, None];
        // If the next chunk is too large, we probably don't want to make
        // copies of it when a caller later does `as_single_chunk()`, so we
        // flush the accumulated data in advance.
if self.current_dataframe.is_some()
&& next_frame.estimated_size() > self.output_chunk_size / 4
{
result[0] = self.flush();
}
if let Some(ref mut current_frame) = self.current_dataframe {
current_frame
.vstack_mut(&next_frame)
.expect("These are chunks from the same dataframe");
} else {
self.current_dataframe = Some(next_frame);
};
if self.current_dataframe.as_ref().unwrap().estimated_size() > self.output_chunk_size {
result[1] = self.flush();
}
result.into_iter().flatten()
}
/// Clear and return any cached `DataFrame` data.
#[must_use]
fn flush(&mut self) -> Option<DataFrame> {
std::mem::take(&mut self.current_dataframe)
}
    /// Finish and return any remaining cached `DataFrame` data. This is the
    /// only way a `StreamingVstacker` should be cleaned up.
#[must_use]
pub fn finish(mut self) -> Option<DataFrame> {
self.flush()
}
}
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
impl Default for StreamingVstacker {
    /// 4 MB was chosen based on some empirical experiments that showed it to
    /// be decently faster than lower or higher values, and it's small enough
    /// that it won't impact memory usage significantly.
fn default() -> Self {
StreamingVstacker::new(4 * 1024 * 1024)
}
}
#[cfg(test)]
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
mod test {
use super::*;
/// DataFrames get merged into chunks that are bigger than the specified
/// size when possible.
#[test]
    fn streaming_vstacker_merges() {
        let test = streaming_vstacker_merges_impl;
test(vec![10]);
test(vec![10, 10, 10, 10, 10, 10, 10]);
test(vec![10, 40, 10, 10, 10, 10]);
test(vec![40, 10, 10, 40, 10, 10, 40]);
test(vec![50, 50, 50]);
}
    /// Eventually it would be nice to drive this with proptest.
    fn streaming_vstacker_merges_impl(df_lengths: Vec<usize>) {
// Convert the lengths into a series of DataFrames:
let mut vstacker = StreamingVstacker::new(4096);
let dfs: Vec<DataFrame> = df_lengths
.iter()
.enumerate()
.map(|(i, length)| {
                let column = Column::new("val".into(), vec![i as u64; *length]);
                DataFrame::new(vec![column]).unwrap()
})
.collect();
        // Combine the DataFrames using a StreamingVstacker:
let mut results = vec![];
for (i, df) in dfs.iter().enumerate() {
for mut result_df in vstacker.add(df.clone()) {
result_df.as_single_chunk();
results.push((i, result_df));
}
}
if let Some(mut result_df) = vstacker.finish() {
result_df.as_single_chunk();
results.push((df_lengths.len() - 1, result_df));
}
        // Make sure the lengths are sufficiently large and the chunks were
        // merged, which is the whole point of the exercise:
for (original_idx, result_df) in &results {
if result_df.height() < 40 {
                // This means either this was the last df, or the next one
                // was big enough that we decided not to aggregate it.
                if *original_idx < dfs.len() - 1 {
assert!(dfs[original_idx + 1].height() > 10);
}
}
// Make sure all result DataFrames only have a single chunk.
assert_eq!(
result_df.get_columns()[0]
.as_materialized_series()
.chunk_lengths()
.len(),
1
);
}
// Make sure the data was preserved:
assert_eq!(
accumulate_dataframes_vertical_unchecked(dfs.into_iter()),
accumulate_dataframes_vertical_unchecked(results.into_iter().map(|(_, df)| df)),
);
}
}