datafusion_physical_expr/window/
standard.rs1use std::any::Any;
21use std::ops::Range;
22use std::sync::Arc;
23
24use super::{StandardWindowFunctionExpr, WindowExpr};
25use crate::window::window_expr::{get_orderby_values, WindowFn};
26use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
27use crate::{reverse_order_bys, EquivalenceProperties, PhysicalExpr};
28use arrow::array::{new_empty_array, ArrayRef};
29use arrow::compute::SortOptions;
30use arrow::datatypes::Field;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::utils::evaluate_partition_ranges;
33use datafusion_common::{Result, ScalarValue};
34use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
35use datafusion_expr::WindowFrame;
36use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
37
38#[derive(Debug)]
40pub struct StandardWindowExpr {
41 expr: Arc<dyn StandardWindowFunctionExpr>,
42 partition_by: Vec<Arc<dyn PhysicalExpr>>,
43 order_by: LexOrdering,
44 window_frame: Arc<WindowFrame>,
45}
46
47impl StandardWindowExpr {
48 pub fn new(
50 expr: Arc<dyn StandardWindowFunctionExpr>,
51 partition_by: &[Arc<dyn PhysicalExpr>],
52 order_by: &LexOrdering,
53 window_frame: Arc<WindowFrame>,
54 ) -> Self {
55 Self {
56 expr,
57 partition_by: partition_by.to_vec(),
58 order_by: order_by.clone(),
59 window_frame,
60 }
61 }
62
63 pub fn get_standard_func_expr(&self) -> &Arc<dyn StandardWindowFunctionExpr> {
65 &self.expr
66 }
67
68 pub fn add_equal_orderings(&self, eq_properties: &mut EquivalenceProperties) {
74 let schema = eq_properties.schema();
75 if let Some(fn_res_ordering) = self.expr.get_result_ordering(schema) {
76 add_new_ordering_expr_with_partition_by(
77 eq_properties,
78 fn_res_ordering,
79 &self.partition_by,
80 );
81 }
82 }
83}
84
85impl WindowExpr for StandardWindowExpr {
86 fn as_any(&self) -> &dyn Any {
88 self
89 }
90
91 fn name(&self) -> &str {
92 self.expr.name()
93 }
94
95 fn field(&self) -> Result<Field> {
96 self.expr.field()
97 }
98
99 fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
100 self.expr.expressions()
101 }
102
103 fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
104 &self.partition_by
105 }
106
107 fn order_by(&self) -> &LexOrdering {
108 self.order_by.as_ref()
109 }
110
111 fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
112 let mut evaluator = self.expr.create_evaluator()?;
113 let num_rows = batch.num_rows();
114 if evaluator.uses_window_frame() {
115 let sort_options: Vec<SortOptions> =
116 self.order_by.iter().map(|o| o.options).collect();
117 let mut row_wise_results = vec![];
118
119 let mut values = self.evaluate_args(batch)?;
120 let order_bys = get_orderby_values(self.order_by_columns(batch)?);
121 let n_args = values.len();
122 values.extend(order_bys);
123 let order_bys_ref = &values[n_args..];
124
125 let mut window_frame_ctx =
126 WindowFrameContext::new(Arc::clone(&self.window_frame), sort_options);
127 let mut last_range = Range { start: 0, end: 0 };
128 for idx in 0..num_rows {
130 let range = window_frame_ctx.calculate_range(
131 order_bys_ref,
132 &last_range,
133 num_rows,
134 idx,
135 )?;
136 let value = evaluator.evaluate(&values, &range)?;
137 row_wise_results.push(value);
138 last_range = range;
139 }
140 ScalarValue::iter_to_array(row_wise_results)
141 } else if evaluator.include_rank() {
142 let columns = self.order_by_columns(batch)?;
143 let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
144 evaluator.evaluate_all_with_rank(num_rows, &sort_partition_points)
145 } else {
146 let values = self.evaluate_args(batch)?;
147 evaluator.evaluate_all(&values, num_rows)
148 }
149 }
150
151 fn evaluate_stateful(
154 &self,
155 partition_batches: &PartitionBatches,
156 window_agg_state: &mut PartitionWindowAggStates,
157 ) -> Result<()> {
158 let field = self.expr.field()?;
159 let out_type = field.data_type();
160 let sort_options = self.order_by.iter().map(|o| o.options).collect::<Vec<_>>();
161 for (partition_row, partition_batch_state) in partition_batches.iter() {
162 let window_state =
163 if let Some(window_state) = window_agg_state.get_mut(partition_row) {
164 window_state
165 } else {
166 let evaluator = self.expr.create_evaluator()?;
167 window_agg_state
168 .entry(partition_row.clone())
169 .or_insert(WindowState {
170 state: WindowAggState::new(out_type)?,
171 window_fn: WindowFn::Builtin(evaluator),
172 })
173 };
174 let evaluator = match &mut window_state.window_fn {
175 WindowFn::Builtin(evaluator) => evaluator,
176 _ => unreachable!(),
177 };
178 let state = &mut window_state.state;
179
180 let batch_ref = &partition_batch_state.record_batch;
181 let mut values = self.evaluate_args(batch_ref)?;
182 let order_bys = if evaluator.uses_window_frame() || evaluator.include_rank() {
183 get_orderby_values(self.order_by_columns(batch_ref)?)
184 } else {
185 vec![]
186 };
187 let n_args = values.len();
188 values.extend(order_bys);
189 let order_bys_ref = &values[n_args..];
190
191 let record_batch = &partition_batch_state.record_batch;
193 let num_rows = record_batch.num_rows();
194 let mut row_wise_results: Vec<ScalarValue> = vec![];
195 let is_causal = if evaluator.uses_window_frame() {
196 self.window_frame.is_causal()
197 } else {
198 evaluator.is_causal()
199 };
200 for idx in state.last_calculated_index..num_rows {
201 let frame_range = if evaluator.uses_window_frame() {
202 state
203 .window_frame_ctx
204 .get_or_insert_with(|| {
205 WindowFrameContext::new(
206 Arc::clone(&self.window_frame),
207 sort_options.clone(),
208 )
209 })
210 .calculate_range(
211 order_bys_ref,
212 &state.window_frame_range,
214 num_rows,
215 idx,
216 )
217 } else {
218 evaluator.get_range(idx, num_rows)
219 }?;
220
221 if frame_range.end == num_rows
223 && !is_causal
224 && !partition_batch_state.is_end
225 {
226 break;
227 }
228 state.window_frame_range = frame_range;
230 row_wise_results
231 .push(evaluator.evaluate(&values, &state.window_frame_range)?);
232 }
233 let out_col = if row_wise_results.is_empty() {
234 new_empty_array(out_type)
235 } else {
236 ScalarValue::iter_to_array(row_wise_results.into_iter())?
237 };
238
239 state.update(&out_col, partition_batch_state)?;
240 if self.window_frame.start_bound.is_unbounded() {
241 evaluator.memoize(state)?;
242 }
243 }
244 Ok(())
245 }
246
247 fn get_window_frame(&self) -> &Arc<WindowFrame> {
248 &self.window_frame
249 }
250
251 fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>> {
252 self.expr.reverse_expr().map(|reverse_expr| {
253 Arc::new(StandardWindowExpr::new(
254 reverse_expr,
255 &self.partition_by.clone(),
256 reverse_order_bys(self.order_by.as_ref()).as_ref(),
257 Arc::new(self.window_frame.reverse()),
258 )) as _
259 })
260 }
261
262 fn uses_bounded_memory(&self) -> bool {
263 if let Ok(evaluator) = self.expr.create_evaluator() {
264 evaluator.supports_bounded_execution()
265 && (!evaluator.uses_window_frame()
266 || !self.window_frame.end_bound.is_unbounded())
267 } else {
268 false
269 }
270 }
271}
272
273pub(crate) fn add_new_ordering_expr_with_partition_by(
276 eqp: &mut EquivalenceProperties,
277 expr: PhysicalSortExpr,
278 partition_by: &[Arc<dyn PhysicalExpr>],
279) {
280 if partition_by.is_empty() {
281 eqp.add_new_orderings([LexOrdering::new(vec![expr])]);
283 } else {
284 let (mut ordering, _) = eqp.find_longest_permutation(partition_by);
291 if ordering.len() == partition_by.len() {
292 ordering.push(expr);
293 eqp.add_new_orderings([ordering]);
294 }
295 }
296}