datafusion_physical_expr/
physical_expr.rsuse std::sync::Arc;
pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use itertools::izip;
pub use datafusion_physical_expr_common::physical_expr::down_cast_any_ref;
pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
pub fn physical_exprs_contains(
physical_exprs: &[Arc<dyn PhysicalExpr>],
expr: &Arc<dyn PhysicalExpr>,
) -> bool {
physical_exprs
.iter()
.any(|physical_expr| physical_expr.eq(expr))
}
pub fn physical_exprs_equal(
lhs: &[Arc<dyn PhysicalExpr>],
rhs: &[Arc<dyn PhysicalExpr>],
) -> bool {
lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
}
pub fn physical_exprs_bag_equal(
lhs: &[Arc<dyn PhysicalExpr>],
rhs: &[Arc<dyn PhysicalExpr>],
) -> bool {
if lhs.len() == rhs.len() {
let mut rhs_vec = rhs.to_vec();
for expr in lhs {
if let Some(idx) = rhs_vec.iter().position(|e| expr.eq(e)) {
rhs_vec.swap_remove(idx);
} else {
return false;
}
}
true
} else {
false
}
}
pub fn deduplicate_physical_exprs(exprs: &mut Vec<Arc<dyn PhysicalExpr>>) {
let mut idx = 0;
while idx < exprs.len() {
let mut rest_idx = idx + 1;
while rest_idx < exprs.len() {
if exprs[idx].eq(&exprs[rest_idx]) {
exprs.swap_remove(rest_idx);
} else {
rest_idx += 1;
}
}
idx += 1;
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::expressions::{Column, Literal};
use crate::physical_expr::{
deduplicate_physical_exprs, physical_exprs_bag_equal, physical_exprs_contains,
physical_exprs_equal, PhysicalExpr,
};
use datafusion_common::ScalarValue;
#[test]
fn test_physical_exprs_contains() {
let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
as Arc<dyn PhysicalExpr>;
let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
as Arc<dyn PhysicalExpr>;
let lit4 =
Arc::new(Literal::new(ScalarValue::Int32(Some(4)))) as Arc<dyn PhysicalExpr>;
let lit2 =
Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
let lit1 =
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
let col_a_expr = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
let col_c_expr = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;
let physical_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::clone(&lit_true),
Arc::clone(&lit_false),
Arc::clone(&lit4),
Arc::clone(&lit2),
Arc::clone(&col_a_expr),
Arc::clone(&col_b_expr),
];
assert!(physical_exprs_contains(&physical_exprs, &lit_true));
assert!(physical_exprs_contains(&physical_exprs, &lit2));
assert!(physical_exprs_contains(&physical_exprs, &col_b_expr));
assert!(!physical_exprs_contains(&physical_exprs, &col_c_expr));
assert!(!physical_exprs_contains(&physical_exprs, &lit1));
}
#[test]
fn test_physical_exprs_equal() {
let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
as Arc<dyn PhysicalExpr>;
let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
as Arc<dyn PhysicalExpr>;
let lit1 =
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
let lit2 =
Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
assert!(physical_exprs_equal(&vec1, &vec1));
assert!(physical_exprs_equal(&vec1, &vec4));
assert!(physical_exprs_bag_equal(&vec1, &vec1));
assert!(physical_exprs_bag_equal(&vec1, &vec4));
assert!(!physical_exprs_equal(&vec1, &vec2));
assert!(!physical_exprs_equal(&vec1, &vec3));
assert!(!physical_exprs_bag_equal(&vec1, &vec2));
assert!(!physical_exprs_bag_equal(&vec1, &vec3));
}
#[test]
fn test_physical_exprs_set_equal() {
let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
];
let list2: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("a", 0)),
];
assert!(!physical_exprs_bag_equal(
list1.as_slice(),
list2.as_slice()
));
assert!(!physical_exprs_bag_equal(
list2.as_slice(),
list1.as_slice()
));
assert!(!physical_exprs_equal(list1.as_slice(), list2.as_slice()));
assert!(!physical_exprs_equal(list2.as_slice(), list1.as_slice()));
let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("c", 2)),
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
];
let list4: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("c", 2)),
Arc::new(Column::new("a", 0)),
];
assert!(physical_exprs_bag_equal(list3.as_slice(), list4.as_slice()));
assert!(physical_exprs_bag_equal(list4.as_slice(), list3.as_slice()));
assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
assert!(!physical_exprs_equal(list3.as_slice(), list4.as_slice()));
assert!(!physical_exprs_equal(list4.as_slice(), list3.as_slice()));
assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
}
#[test]
fn test_deduplicate_physical_exprs() {
let lit_true = &(Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
as Arc<dyn PhysicalExpr>);
let lit_false = &(Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
as Arc<dyn PhysicalExpr>);
let lit4 = &(Arc::new(Literal::new(ScalarValue::Int32(Some(4))))
as Arc<dyn PhysicalExpr>);
let lit2 = &(Arc::new(Literal::new(ScalarValue::Int32(Some(2))))
as Arc<dyn PhysicalExpr>);
let col_a_expr = &(Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>);
let col_b_expr = &(Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>);
let test_cases = vec![
(
vec![
lit_true, lit_false, lit4, lit2, col_a_expr, col_a_expr, col_b_expr,
lit_true, lit2,
],
vec![lit_true, lit_false, lit4, lit2, col_a_expr, col_b_expr],
),
(
vec![lit_true, lit_true, lit_false, lit4],
vec![lit_true, lit4, lit_false],
),
];
for (exprs, expected) in test_cases {
let mut exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
let expected = expected.into_iter().cloned().collect::<Vec<_>>();
deduplicate_physical_exprs(&mut exprs);
assert!(physical_exprs_equal(&exprs, &expected));
}
}
}