1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
use datafusion_common::{exec_err, DFSchema, Result};
use datafusion_expr::expr::Alias;
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::Expr;
use crate::expressions::column::Column;
use crate::expressions::literal::Literal;
use crate::expressions::CastExpr;
use crate::physical_expr::PhysicalExpr;
use crate::sort_expr::PhysicalSortExpr;
use crate::tree_node::ExprContext;
/// A [`PhysicalExpr`] tree node annotated with [`ExprProperties`].
///
/// This is [`ExprContext`] specialized to carry an [`ExprProperties`] payload,
/// so that the properties of an expression (order and range) can be tracked
/// alongside each node of the expression tree.
pub type ExprPropertiesNode = ExprContext<ExprProperties>;
impl ExprPropertiesNode {
    /// Builds an `ExprPropertiesNode` tree for `expr` in which every node
    /// carries unknown properties.
    ///
    /// The node for `expr` itself gets [`ExprProperties::new_unknown`] as its
    /// data, and the same construction is applied recursively to each child
    /// expression reported by `expr.children()`.
    pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
        let data = ExprProperties::new_unknown();
        // Each child is shared (`Arc` clone) rather than deep-copied.
        let children = expr
            .children()
            .iter()
            .map(|&child| Self::new_unknown(Arc::clone(child)))
            .collect();
        Self {
            expr,
            data,
            children,
        }
    }
}
/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
///
/// # Arguments
/// * `mask` - Boolean values used to determine where to put the `truthy` values
/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
let truthy = truthy.to_data();
// update the mask so that any null values become false
// (SlicesIterator doesn't respect nulls)
let mask = and_kleene(mask, &is_not_null(mask)?)?;
let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
// the SlicesIterator slices only the true values. So the gaps left by this iterator we need to
// fill with falsy values
// keep track of how much is filled
let mut filled = 0;
// keep track of current position we have in truthy array
let mut true_pos = 0;
SlicesIterator::new(&mask).for_each(|(start, end)| {
// the gap needs to be filled with nulls
if start > filled {
mutable.extend_nulls(start - filled);
}
// fill with truthy values
let len = end - start;
mutable.extend(0, true_pos, true_pos + len);
true_pos += len;
filled = end;
});
// the remaining part is falsy
if filled < mask.len() {
mutable.extend_nulls(mask.len() - filled);
}
let data = mutable.freeze();
Ok(make_array(data))
}
/// Reverses the ORDER BY expression, which is useful during equivalent window
/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into
/// 'ORDER BY a DESC, NULLS FIRST'.
pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
order_bys
.iter()
.map(|e| PhysicalSortExpr::new(e.expr.clone(), !e.options))
.collect()
}
/// Converts a logical `datafusion_expr::Expr` into the corresponding
/// `Arc<dyn PhysicalExpr>`, resolving columns against `dfschema`.
///
/// Only a limited set of expressions is handled:
/// * `Expr::Alias` - the alias name is dropped and the wrapped expression is converted
/// * `Expr::Column` - becomes a [`Column`] using the index found in `dfschema`
/// * `Expr::Cast` - becomes a [`CastExpr`] over the converted inner expression
/// * `Expr::Literal` - becomes a [`Literal`]
///
/// # Errors
/// Returns an execution error for any other expression kind, and propagates
/// the error from `dfschema` when a column cannot be resolved.
pub fn limited_convert_logical_expr_to_physical_expr_with_dfschema(
    expr: &Expr,
    dfschema: &DFSchema,
) -> Result<Arc<dyn PhysicalExpr>> {
    match expr {
        Expr::Alias(Alias { expr: aliased, .. }) => {
            limited_convert_logical_expr_to_physical_expr_with_dfschema(aliased, dfschema)
        }
        Expr::Column(column) => {
            let index = dfschema.index_of_column(column)?;
            let physical: Arc<dyn PhysicalExpr> =
                Arc::new(Column::new(&column.name, index));
            Ok(physical)
        }
        Expr::Cast(cast) => {
            // Convert the input first so that its error, if any, surfaces
            // before anything else is built.
            let input = limited_convert_logical_expr_to_physical_expr_with_dfschema(
                cast.expr.as_ref(),
                dfschema,
            )?;
            let target_type = cast.data_type.clone();
            Ok(Arc::new(CastExpr::new(input, target_type, None)))
        }
        Expr::Literal(scalar) => {
            let physical: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(scalar.clone()));
            Ok(physical)
        }
        _ => exec_err!(
            "Unsupported expression: {expr} for conversion to Arc<dyn PhysicalExpr>"
        ),
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use datafusion_common::cast::{as_boolean_array, as_int32_array};

    use super::*;

    /// Scatters `truthy` by `mask` and checks that the result, viewed as an
    /// `Int32Array`, equals `expected`.
    fn assert_int32_scatter(
        mask: &BooleanArray,
        truthy: Vec<i32>,
        expected: Vec<Option<i32>>,
    ) -> Result<()> {
        let truthy: ArrayRef = Arc::new(Int32Array::from(truthy));
        let expected = Int32Array::from(expected);

        let scattered = scatter(mask, truthy.as_ref())?;

        assert_eq!(&expected, as_int32_array(&scattered)?);
        Ok(())
    }

    #[test]
    fn scatter_int() -> Result<()> {
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        // the output array is expected to be the same length as the mask array
        assert_int32_scatter(
            &mask,
            vec![1, 10, 11, 100],
            vec![Some(1), Some(10), None, None, Some(11)],
        )
    }

    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);

        // output should be same length as mask
        assert_int32_scatter(
            &mask,
            vec![1, 10, 11, 100],
            vec![Some(1), None, Some(10), None, None, None],
        )
    }

    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        let mask =
            BooleanArray::from(vec![Some(false), None, Some(true), Some(true), None]);

        // output should treat nulls as though they are false
        assert_int32_scatter(
            &mask,
            vec![1, 10, 11],
            vec![None, None, Some(1), Some(10), None],
        )
    }

    #[test]
    fn scatter_boolean() -> Result<()> {
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let truthy: ArrayRef =
            Arc::new(BooleanArray::from(vec![false, false, false, true]));

        // the output array is expected to be the same length as the mask array
        let expected =
            BooleanArray::from(vec![Some(false), Some(false), None, None, Some(false)]);

        let scattered = scatter(&mask, truthy.as_ref())?;

        assert_eq!(&expected, as_boolean_array(&scattered)?);
        Ok(())
    }
}