// datafusion_physical_expr/expressions/column.rs

use std::any::Any;
use std::hash::Hash;
use std::sync::Arc;

use crate::physical_expr::PhysicalExpr;
use arrow::{
    datatypes::{DataType, Schema, SchemaRef},
    record_batch::RecordBatch,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{internal_err, plan_err, Result};
use datafusion_expr::ColumnarValue;
/// Physical expression that references a column of its input by position.
///
/// At execution time the column is resolved purely by `index` into the input
/// [`Schema`] / [`RecordBatch`]; `name` is carried along for display and for
/// diagnostic messages.
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct Column {
    /// Name of the referenced column (used by `Display` and error messages).
    name: String,
    /// Zero-based position of the referenced column in the input schema.
    index: usize,
}
72
73impl Column {
74 pub fn new(name: &str, index: usize) -> Self {
77 Self {
78 name: name.to_owned(),
79 index,
80 }
81 }
82
83 pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
86 Ok(Column::new(name, schema.index_of(name)?))
87 }
88
89 pub fn name(&self) -> &str {
91 &self.name
92 }
93
94 pub fn index(&self) -> usize {
96 self.index
97 }
98}
99
100impl std::fmt::Display for Column {
101 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
102 write!(f, "{}@{}", self.name, self.index)
103 }
104}
105
106impl PhysicalExpr for Column {
107 fn as_any(&self) -> &dyn Any {
109 self
110 }
111
112 fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
114 self.bounds_check(input_schema)?;
115 Ok(input_schema.field(self.index).data_type().clone())
116 }
117
118 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
120 self.bounds_check(input_schema)?;
121 Ok(input_schema.field(self.index).is_nullable())
122 }
123
124 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
126 self.bounds_check(batch.schema().as_ref())?;
127 Ok(ColumnarValue::Array(Arc::clone(batch.column(self.index))))
128 }
129
130 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
131 vec![]
132 }
133
134 fn with_new_children(
135 self: Arc<Self>,
136 _children: Vec<Arc<dyn PhysicalExpr>>,
137 ) -> Result<Arc<dyn PhysicalExpr>> {
138 Ok(self)
139 }
140}
141
142impl Column {
143 fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
144 if self.index < input_schema.fields.len() {
145 Ok(())
146 } else {
147 internal_err!(
148 "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
149 self.name,
150 self.index,
151 input_schema.fields.len(),
152 input_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>()
153 )
154 }
155 }
156}
157
158pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
160 Ok(Arc::new(Column::new_with_schema(name, schema)?))
161}
162
163pub fn with_new_schema(
169 expr: Arc<dyn PhysicalExpr>,
170 schema: &SchemaRef,
171) -> Result<Arc<dyn PhysicalExpr>> {
172 Ok(expr
173 .transform_up(|expr| {
174 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
175 let idx = col.index();
176 let Some(field) = schema.fields().get(idx) else {
177 return plan_err!(
178 "New schema has fewer columns than original schema"
179 );
180 };
181 let new_col = Column::new(field.name(), idx);
182 Ok(Transformed::yes(Arc::new(new_col) as _))
183 } else {
184 Ok(Transformed::no(expr))
185 }
186 })?
187 .data)
188}
189
#[cfg(test)]
mod test {
    use super::Column;
    use crate::physical_expr::PhysicalExpr;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::Result;

    use std::sync::Arc;

    /// Stable leading part of the error that `Column::bounds_check` reports
    /// for column `id` at index 9 against the one-field schema `["foo"]`.
    ///
    /// Only this prefix is asserted, so the generic "internal error" text that
    /// DataFusion appends can change without breaking these tests.
    const EXPECTED_PREFIX: &str = "Internal error: PhysicalExpr Column references column 'id' \
        at index 9 (zero-based) but input schema only has 1 columns: [\"foo\"]";

    /// Assert that `error` is the out-of-bounds message for column `id`.
    ///
    /// The actual error must start with the expected prefix. This is stricter
    /// than checking that the expected text starts with the error, which
    /// would also accept an empty or truncated message.
    fn assert_out_of_bounds(error: &str) {
        assert!(
            error.starts_with(EXPECTED_PREFIX),
            "unexpected error message: {error}"
        );
    }

    /// Schema with a single nullable Utf8 field named `foo`.
    fn single_field_schema() -> Schema {
        Schema::new(vec![Field::new("foo", DataType::Utf8, true)])
    }

    #[test]
    fn out_of_bounds_data_type() {
        let schema = single_field_schema();
        let col = Column::new("id", 9);
        let error = col.data_type(&schema).expect_err("error").strip_backtrace();
        assert_out_of_bounds(&error);
    }

    #[test]
    fn out_of_bounds_nullable() {
        let schema = single_field_schema();
        let col = Column::new("id", 9);
        let error = col.nullable(&schema).expect_err("error").strip_backtrace();
        assert_out_of_bounds(&error);
    }

    #[test]
    fn out_of_bounds_evaluate() -> Result<()> {
        let schema = single_field_schema();
        let data: StringArray = vec!["data"].into();
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)])?;
        let col = Column::new("id", 9);
        let error = col.evaluate(&batch).expect_err("error").strip_backtrace();
        assert_out_of_bounds(&error);
        Ok(())
    }
}