lance_encoding_datafusion/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use arrow_schema::DataType;
use lance_core::datatypes::{Field, Schema};
use lance_encoding::encoder::{
    default_encoding_strategy, ColumnIndexSequence, EncodingOptions, FieldEncodingStrategy,
};
use lance_file::version::LanceFileVersion;
use zone::{UnloadedPushdown, ZoneMapsFieldEncoder};

pub mod format;
pub mod substrait;
pub mod zone;

/// Mutable state accumulated while walking the decoding tree.
///
/// Created lazily by [`LanceDfFieldDecoderStrategy`] on first use and
/// consumed when the root scheduler is built.
#[derive(Debug)]
struct LanceDfFieldDecoderState {
    /// We assume that all columns have the same number of rows per map
    #[allow(unused)]
    rows_per_map: Option<u32>,
    /// As we visit the decoding tree we populate this with the pushdown
    /// information that is available.
    ///
    /// Keyed by the field id (cast to `u32`).
    #[allow(unused)]
    zone_map_buffers: HashMap<u32, UnloadedPushdown>,
}

/// This strategy is responsible for creating the field scheduler
/// that handles the pushdown filtering.  It is a top-level scheduler
/// that uses column info from various leaf schedulers.
///
/// The current implementation is a bit of a hack.  It assumes that
/// the decoder strategy will only be used once.  The very first time
/// that create_field_scheduler is called, we assume we are at the root.
///
/// Field decoding strategies are supposed to be stateless but this one
/// is not.  As a result, we use a mutex to gather the state even though
/// we aren't technically doing any concurrency.
#[derive(Debug)]
pub struct LanceDfFieldDecoderStrategy {
    /// Lazily-initialized shared state; `None` until `initialize` runs.
    #[allow(unused)]
    state: Arc<Mutex<Option<LanceDfFieldDecoderState>>>,
    /// Schema of the file being decoded; used when constructing the
    /// zone-map scheduler at the root (see the commented-out impl below).
    #[allow(unused)]
    schema: Arc<Schema>,
}

impl LanceDfFieldDecoderStrategy {
    /// Creates a new strategy for the given file schema.
    ///
    /// The internal state starts out empty and is created lazily by
    /// `initialize`.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self {
            state: Arc::new(Mutex::new(None)),
            schema,
        }
    }

    /// Sets up the shared state if it has not been set up yet.
    ///
    /// Returns `true` only for the first caller, which is assumed to be
    /// the root of the decoding tree.
    #[allow(unused)]
    fn initialize(&self) -> bool {
        let mut guard = self.state.lock().unwrap();
        if guard.is_some() {
            return false;
        }
        *guard = Some(LanceDfFieldDecoderState {
            rows_per_map: None,
            zone_map_buffers: HashMap::new(),
        });
        true
    }

    /// Records the pushdown info discovered for `field`.
    ///
    /// # Panics
    ///
    /// Panics if `initialize` has not been called first, or if
    /// `rows_per_map` disagrees with a value recorded earlier.
    #[allow(unused)]
    fn add_pushdown_field(
        &self,
        field: &Field,
        rows_per_map: u32,
        unloaded_pushdown: UnloadedPushdown,
    ) {
        let mut guard = self.state.lock().unwrap();
        let state = guard.as_mut().unwrap();
        if state.rows_per_map.map_or(false, |existing| existing != rows_per_map) {
            panic!("Inconsistent rows per map");
        }
        state.rows_per_map = Some(rows_per_map);
        // NOTE(review): `field.id` is narrowed with `as`; a negative id
        // would silently wrap — confirm ids are always non-negative here.
        state
            .zone_map_buffers
            .insert(field.id as u32, unloaded_pushdown);
    }
}

// TODO: Reconnect `LanceDfFieldDecoderStrategy` to the decoder middleware
// chain; the previous `FieldDecoderStrategy` impl is kept below (commented
// out) for reference.
// impl FieldDecoderStrategy for LanceDfFieldDecoderStrategy {
//     fn create_field_scheduler<'a>(
//         &self,
//         field: &Field,
//         column_infos: &mut ColumnInfoIter,
//         buffers: FileBuffers,
//         chain: DecoderMiddlewareChainCursor<'a>,
//     ) -> Result<(
//         DecoderMiddlewareChainCursor<'a>,
//         Result<Box<dyn FieldScheduler>>,
//     )> {
//         let is_root = self.initialize();

//         if let Some((rows_per_map, unloaded_pushdown)) =
//             extract_zone_info(column_infos, &field.data_type(), chain.current_path())
//         {
//             // If there is pushdown info then record it and unwrap the
//             // pushdown encoding layer.
//             self.add_pushdown_field(field, rows_per_map, unloaded_pushdown);
//         }
//         // Delegate to the rest of the chain to create the decoder
//         let (chain, next) = chain.next(field, column_infos, buffers)?;

//         // If this is the top level decoder then wrap it with our
//         // pushdown filtering scheduler.
//         if is_root {
//             let state = self.state.lock().unwrap().take().unwrap();
//             let schema = self.schema.clone();
//             let rows_per_map = state.rows_per_map;
//             let zone_map_buffers = state.zone_map_buffers;
//             let next = next?;
//             let num_rows = next.num_rows();
//             if rows_per_map.is_none() {
//                 // No columns had any pushdown info
//                 Ok((chain, Ok(next)))
//             } else {
//                 let scheduler = ZoneMapsFieldScheduler::new(
//                     next.into(),
//                     schema,
//                     zone_map_buffers,
//                     rows_per_map.unwrap(),
//                     num_rows,
//                 );
//                 Ok((chain, Ok(Box::new(scheduler))))
//             }
//         } else {
//             Ok((chain, next))
//         }
//     }
// }

/// Wraps the core encoding strategy and adds the encoders from this
/// crate
/// Wraps the core encoding strategy and adds the encoders from this
/// crate
#[derive(Debug)]
pub struct LanceDfFieldEncodingStrategy {
    /// The wrapped core strategy; handles every field this crate does
    /// not add a zone-map encoder for.
    inner: Box<dyn FieldEncodingStrategy>,
    /// Number of rows summarized by each zone map.
    rows_per_map: u32,
}

impl Default for LanceDfFieldEncodingStrategy {
    /// Uses the core encoding strategy for the default file version and
    /// a zone size of 10,000 rows per map.
    fn default() -> Self {
        let inner = default_encoding_strategy(LanceFileVersion::default());
        Self {
            inner,
            rows_per_map: 10_000,
        }
    }
}

impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
    /// Creates the encoder for `field`, wrapping it with a zone-map
    /// encoder when the field's type supports zone-map statistics
    /// (primitives, booleans, and strings); all other types are
    /// delegated unchanged to the inner strategy.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &lance_core::datatypes::Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> lance_core::Result<Box<dyn lance_encoding::encoder::FieldEncoder>> {
        let data_type = field.data_type();
        let supports_zone_maps = data_type.is_primitive()
            || matches!(
                data_type,
                DataType::Boolean | DataType::Utf8 | DataType::LargeUtf8
            );
        if !supports_zone_maps {
            return self
                .inner
                .create_field_encoder(encoding_strategy_root, field, column_index, options);
        }
        let inner_encoder = self.inner.create_field_encoder(
            // Don't collect stats on inner string fields
            self.inner.as_ref(),
            field,
            column_index,
            options,
        )?;
        let zone_map_encoder =
            ZoneMapsFieldEncoder::try_new(inner_encoder, data_type, self.rows_per_map)?;
        Ok(Box::new(zone_map_encoder))
    }
}