datafusion_physical_expr/expressions/
column.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Physical column reference: [`Column`]
19
20use std::any::Any;
21use std::hash::Hash;
22use std::sync::Arc;
23
24use crate::physical_expr::PhysicalExpr;
25use arrow::{
26    datatypes::{DataType, Schema, SchemaRef},
27    record_batch::RecordBatch,
28};
29use datafusion_common::tree_node::{Transformed, TreeNode};
30use datafusion_common::{internal_err, plan_err, Result};
31use datafusion_expr::ColumnarValue;
32
/// Physical reference to the column at a fixed position of a [`RecordBatch`].
///
/// A `Column` identifies a field of an arrow [`Schema`] / [`RecordBatch`] by its
/// zero-based index. It also carries a name, but evaluation never looks the name
/// up: unlike the [logical `Expr::Column`], a physical plan is bound to one
/// concrete schema and has no notion of a "relation", so resolution always goes
/// through the index.
///
/// # Example
///
/// For a schema with the fields `a`, `b`, `c`, the `Column` for `b` has index 1,
/// because `b` is the second field of the schema.
///
/// ```
/// # use datafusion_physical_expr::expressions::Column;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// // Schema with columns a, b, c
/// let schema = Schema::new(vec![
///    Field::new("a", DataType::Int32, false),
///    Field::new("b", DataType::Int32, false),
///    Field::new("c", DataType::Int32, false),
/// ]);
///
/// // `b` lives at index 1
/// let column_b = Column::new_with_schema("b", &schema).unwrap();
/// assert_eq!(column_b.index(), 1);
///
/// // `c` lives at index 2
/// let column_c = Column::new_with_schema("c", &schema).unwrap();
/// assert_eq!(column_c.index(), 2);
/// ```
///
/// [logical `Expr::Column`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html#variant.Column
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Column {
    /// Human-readable column name; shown by `Display` and in error messages.
    name: String,
    /// Zero-based position of the referenced field within its schema.
    index: usize,
}
72
73impl Column {
74    /// Create a new column expression which references the
75    /// column with the given index in the schema.
76    pub fn new(name: &str, index: usize) -> Self {
77        Self {
78            name: name.to_owned(),
79            index,
80        }
81    }
82
83    /// Create a new column expression which references the
84    /// column with the given name in the schema
85    pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
86        Ok(Column::new(name, schema.index_of(name)?))
87    }
88
89    /// Get the column's name
90    pub fn name(&self) -> &str {
91        &self.name
92    }
93
94    /// Get the column's schema index
95    pub fn index(&self) -> usize {
96        self.index
97    }
98}
99
100impl std::fmt::Display for Column {
101    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
102        write!(f, "{}@{}", self.name, self.index)
103    }
104}
105
106impl PhysicalExpr for Column {
107    /// Return a reference to Any that can be used for downcasting
108    fn as_any(&self) -> &dyn Any {
109        self
110    }
111
112    /// Get the data type of this expression, given the schema of the input
113    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
114        self.bounds_check(input_schema)?;
115        Ok(input_schema.field(self.index).data_type().clone())
116    }
117
118    /// Decide whether this expression is nullable, given the schema of the input
119    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
120        self.bounds_check(input_schema)?;
121        Ok(input_schema.field(self.index).is_nullable())
122    }
123
124    /// Evaluate the expression
125    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
126        self.bounds_check(batch.schema().as_ref())?;
127        Ok(ColumnarValue::Array(Arc::clone(batch.column(self.index))))
128    }
129
130    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
131        vec![]
132    }
133
134    fn with_new_children(
135        self: Arc<Self>,
136        _children: Vec<Arc<dyn PhysicalExpr>>,
137    ) -> Result<Arc<dyn PhysicalExpr>> {
138        Ok(self)
139    }
140}
141
142impl Column {
143    fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
144        if self.index < input_schema.fields.len() {
145            Ok(())
146        } else {
147            internal_err!(
148                "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
149                self.name,
150                self.index,
151                input_schema.fields.len(),
152                input_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>()
153            )
154        }
155    }
156}
157
158/// Create a column expression
159pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
160    Ok(Arc::new(Column::new_with_schema(name, schema)?))
161}
162
163/// Rewrites an expression according to new schema; i.e. changes the columns it
164/// refers to with the column at corresponding index in the new schema. Returns
165/// an error if the given schema has fewer columns than the original schema.
166/// Note that the resulting expression may not be valid if data types in the
167/// new schema is incompatible with expression nodes.
168pub fn with_new_schema(
169    expr: Arc<dyn PhysicalExpr>,
170    schema: &SchemaRef,
171) -> Result<Arc<dyn PhysicalExpr>> {
172    Ok(expr
173        .transform_up(|expr| {
174            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
175                let idx = col.index();
176                let Some(field) = schema.fields().get(idx) else {
177                    return plan_err!(
178                        "New schema has fewer columns than original schema"
179                    );
180                };
181                let new_col = Column::new(field.name(), idx);
182                Ok(Transformed::yes(Arc::new(new_col) as _))
183            } else {
184                Ok(Transformed::no(expr))
185            }
186        })?
187        .data)
188}
189
#[cfg(test)]
mod test {
    use super::Column;
    use crate::physical_expr::PhysicalExpr;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::Result;

    use std::sync::Arc;

    /// The part of the out-of-bounds error that `Column::bounds_check` itself
    /// writes, for a column `id` at index 9 checked against the one-field
    /// schema `["foo"]`.
    ///
    /// Only this prefix is compared. The generic text that DataFusion appends
    /// to internal errors is not owned by this module and may vary (presumably
    /// with backtrace support as well — TODO confirm).
    const EXPECTED_ERROR_PREFIX: &str = "Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
        but input schema only has 1 columns: [\"foo\"]";

    /// Asserts that `error` begins with [`EXPECTED_ERROR_PREFIX`].
    ///
    /// The check is `error.starts_with(prefix)`, so an empty or truncated
    /// message fails the test. The reverse check,
    /// `prefix.starts_with(error)`, would accept any prefix of the expected
    /// text, including `""`.
    fn assert_out_of_bounds_error(error: &str) {
        assert!(
            error.starts_with(EXPECTED_ERROR_PREFIX),
            "unexpected error message: {error}"
        );
    }

    #[test]
    fn out_of_bounds_data_type() {
        let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]);
        let col = Column::new("id", 9);
        let error = col.data_type(&schema).expect_err("error").strip_backtrace();
        assert_out_of_bounds_error(&error);
    }

    #[test]
    fn out_of_bounds_nullable() {
        let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]);
        let col = Column::new("id", 9);
        let error = col.nullable(&schema).expect_err("error").strip_backtrace();
        assert_out_of_bounds_error(&error);
    }

    #[test]
    fn out_of_bounds_evaluate() -> Result<()> {
        let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]);
        let data: StringArray = vec!["data"].into();
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)])?;
        let col = Column::new("id", 9);
        let error = col.evaluate(&batch).expect_err("error").strip_backtrace();
        assert_out_of_bounds_error(&error);
        Ok(())
    }
}