//! lance_encoding_datafusion/lib.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use arrow_schema::DataType;
use lance_core::datatypes::{Field, Schema};
use lance_encoding::encoder::{
default_encoding_strategy, ColumnIndexSequence, EncodingOptions, FieldEncodingStrategy,
};
use lance_file::version::LanceFileVersion;
use zone::{UnloadedPushdown, ZoneMapsFieldEncoder};
pub mod format;
pub mod substrait;
pub mod zone;
/// Mutable state accumulated while walking the decode tree.
///
/// Populated lazily: `initialize` creates it on the first visit and
/// `add_pushdown_field` fills it in as pushdown-capable columns are
/// discovered.
#[derive(Debug)]
struct LanceDfFieldDecoderState {
    /// We assume that all columns have the same number of rows per map.
    /// `None` until the first pushdown-capable column is seen.
    #[allow(unused)]
    rows_per_map: Option<u32>,
    /// As we visit the decoding tree we populate this with the pushdown
    /// information that is available.  Keyed by field id (cast to u32).
    #[allow(unused)]
    zone_map_buffers: HashMap<u32, UnloadedPushdown>,
}
/// This strategy is responsible for creating the field scheduler
/// that handles the pushdown filtering. It is a top-level scheduler
/// that uses column info from various leaf schedulers.
///
/// The current implementation is a bit of a hack. It assumes that
/// the decoder strategy will only be used once. The very first time
/// that create_field_scheduler is called, we assume we are at the root.
///
/// Field decoding strategies are supposed to be stateless but this one
/// is not. As a result, we use a mutex to gather the state even though
/// we aren't technically doing any concurrency.
#[derive(Debug)]
pub struct LanceDfFieldDecoderStrategy {
    /// Lazily-created shared state; `None` until `initialize` runs.
    #[allow(unused)]
    state: Arc<Mutex<Option<LanceDfFieldDecoderState>>>,
    /// Schema of the file being decoded; handed to the pushdown scheduler.
    #[allow(unused)]
    schema: Arc<Schema>,
}
impl LanceDfFieldDecoderStrategy {
    /// Creates a new strategy for decoding fields of `schema` with
    /// pushdown-filtering support.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self {
            state: Arc::new(Mutex::new(None)),
            schema,
        }
    }

    /// Initializes the shared state if it has not been created yet.
    ///
    /// Returns `true` iff this call performed the initialization.  The
    /// (currently disconnected) scheduler code uses that to detect the
    /// root of the decoding tree.
    #[allow(unused)]
    fn initialize(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        if state.is_none() {
            *state = Some(LanceDfFieldDecoderState {
                rows_per_map: None,
                zone_map_buffers: HashMap::new(),
            });
            true
        } else {
            false
        }
    }

    /// Records the zone-map pushdown info discovered for `field`.
    ///
    /// # Panics
    ///
    /// Panics if called before [`Self::initialize`], or if `rows_per_map`
    /// differs from the value recorded by a previous field (all columns
    /// are assumed to share the same zone size).
    #[allow(unused)]
    fn add_pushdown_field(
        &self,
        field: &Field,
        rows_per_map: u32,
        unloaded_pushdown: UnloadedPushdown,
    ) {
        let mut state = self.state.lock().unwrap();
        let state = state
            .as_mut()
            .expect("add_pushdown_field called before initialize");
        match state.rows_per_map {
            Some(existing) if existing != rows_per_map => {
                // All columns must agree on zone size; a mismatch means the
                // file was written with inconsistent encoders.  Include the
                // conflicting values so the failure is diagnosable.
                panic!(
                    "Inconsistent rows per map: expected {} but field {} has {}",
                    existing, field.id, rows_per_map
                );
            }
            _ => {
                state.rows_per_map = Some(rows_per_map);
            }
        }
        state
            .zone_map_buffers
            .insert(field.id as u32, unloaded_pushdown);
    }
}
// TODO: Reconnect this strategy to the decoder middleware chain (the
// FieldDecoderStrategy API changed upstream).  Previous implementation is
// kept below for reference:
// impl FieldDecoderStrategy for LanceDfFieldDecoderStrategy {
// fn create_field_scheduler<'a>(
// &self,
// field: &Field,
// column_infos: &mut ColumnInfoIter,
// buffers: FileBuffers,
// chain: DecoderMiddlewareChainCursor<'a>,
// ) -> Result<(
// DecoderMiddlewareChainCursor<'a>,
// Result<Box<dyn FieldScheduler>>,
// )> {
// let is_root = self.initialize();
// if let Some((rows_per_map, unloaded_pushdown)) =
// extract_zone_info(column_infos, &field.data_type(), chain.current_path())
// {
// // If there is pushdown info then record it and unwrap the
// // pushdown encoding layer.
// self.add_pushdown_field(field, rows_per_map, unloaded_pushdown);
// }
// // Delegate to the rest of the chain to create the decoder
// let (chain, next) = chain.next(field, column_infos, buffers)?;
// // If this is the top level decoder then wrap it with our
// // pushdown filtering scheduler.
// if is_root {
// let state = self.state.lock().unwrap().take().unwrap();
// let schema = self.schema.clone();
// let rows_per_map = state.rows_per_map;
// let zone_map_buffers = state.zone_map_buffers;
// let next = next?;
// let num_rows = next.num_rows();
// if rows_per_map.is_none() {
// // No columns had any pushdown info
// Ok((chain, Ok(next)))
// } else {
// let scheduler = ZoneMapsFieldScheduler::new(
// next.into(),
// schema,
// zone_map_buffers,
// rows_per_map.unwrap(),
// num_rows,
// );
// Ok((chain, Ok(Box::new(scheduler))))
// }
// } else {
// Ok((chain, next))
// }
// }
// }
/// Wraps the core encoding strategy and adds the encoders from this
/// crate
#[derive(Debug)]
pub struct LanceDfFieldEncodingStrategy {
    /// The wrapped strategy that produces the actual value encoders.
    inner: Box<dyn FieldEncodingStrategy>,
    /// Number of rows covered by each zone map entry.
    rows_per_map: u32,
}

impl LanceDfFieldEncodingStrategy {
    /// Default zone size: one zone map entry per 10,000 rows.
    const DEFAULT_ROWS_PER_MAP: u32 = 10_000;
}

impl Default for LanceDfFieldEncodingStrategy {
    fn default() -> Self {
        Self {
            inner: default_encoding_strategy(LanceFileVersion::default()),
            rows_per_map: Self::DEFAULT_ROWS_PER_MAP,
        }
    }
}
impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
    /// Creates an encoder for `field`, wrapping it with zone-map statistics
    /// collection when the field's type supports it; otherwise delegates
    /// directly to the wrapped strategy.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &lance_core::datatypes::Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> lance_core::Result<Box<dyn lance_encoding::encoder::FieldEncoder>> {
        let data_type = field.data_type();
        // Zone maps are only collected for primitives, booleans, and strings.
        let collects_stats = data_type.is_primitive()
            || matches!(
                data_type,
                DataType::Boolean | DataType::Utf8 | DataType::LargeUtf8
            );
        if !collects_stats {
            // Not a stats-capable type — defer entirely to the wrapped strategy.
            return self
                .inner
                .create_field_encoder(encoding_strategy_root, field, column_index, options);
        }
        // Pass `self.inner` (rather than `encoding_strategy_root`) as the root
        // so nested string fields do not get stats collection layered on again.
        let value_encoder =
            self.inner
                .create_field_encoder(self.inner.as_ref(), field, column_index, options)?;
        let with_zone_maps =
            ZoneMapsFieldEncoder::try_new(value_encoder, data_type, self.rows_per_map)?;
        Ok(Box::new(with_zone_maps))
    }
}