1use crate::{expr::Sort, lit};
27use arrow::datatypes::DataType;
28use std::fmt::{self, Formatter};
29use std::hash::Hash;
30
31use datafusion_common::{plan_err, sql_err, DataFusionError, Result, ScalarValue};
32use sqlparser::ast;
33use sqlparser::parser::ParserError::ParserError;
34
/// A window frame: the span, relative to the current row, that a window
/// function is evaluated over, written in SQL as
/// `<units> BETWEEN <start_bound> AND <end_bound>`.
///
/// Frames are built with [`WindowFrame::new`], [`WindowFrame::new_bounds`],
/// or by converting a parsed SQL frame through `TryFrom<ast::WindowFrame>`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct WindowFrame {
    /// How frame offsets are measured (`ROWS`, `RANGE` or `GROUPS`).
    pub units: WindowFrameUnits,
    /// Where the frame starts.
    pub start_bound: WindowFrameBound,
    /// Where the frame ends.
    pub end_bound: WindowFrameBound,
    /// Whether the frame is causal. [`WindowFrame::new_bounds`] derives it
    /// from `units` and `end_bound`, and [`WindowFrame::new`] sets it
    /// directly. Read it through [`WindowFrame::is_causal`].
    ///
    /// NOTE(review): `units` and `end_bound` are public. Mutating them
    /// directly does not recompute this flag, which can leave it stale.
    causal: bool,
}
95
96impl fmt::Display for WindowFrame {
97 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
98 write!(
99 f,
100 "{} BETWEEN {} AND {}",
101 self.units, self.start_bound, self.end_bound
102 )?;
103 Ok(())
104 }
105}
106
107impl fmt::Debug for WindowFrame {
108 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
109 write!(
110 f,
111 "WindowFrame {{ units: {:?}, start_bound: {:?}, end_bound: {:?}, is_causal: {:?} }}",
112 self.units, self.start_bound, self.end_bound, self.causal
113 )?;
114 Ok(())
115 }
116}
117
118impl TryFrom<ast::WindowFrame> for WindowFrame {
119 type Error = DataFusionError;
120
121 fn try_from(value: ast::WindowFrame) -> Result<Self> {
122 let start_bound = WindowFrameBound::try_parse(value.start_bound, &value.units)?;
123 let end_bound = match value.end_bound {
124 Some(bound) => WindowFrameBound::try_parse(bound, &value.units)?,
125 None => WindowFrameBound::CurrentRow,
126 };
127
128 if let WindowFrameBound::Following(val) = &start_bound {
129 if val.is_null() {
130 plan_err!(
131 "Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING"
132 )?
133 }
134 } else if let WindowFrameBound::Preceding(val) = &end_bound {
135 if val.is_null() {
136 plan_err!(
137 "Invalid window frame: end bound cannot be UNBOUNDED PRECEDING"
138 )?
139 }
140 };
141
142 let units = value.units.into();
143 Ok(Self::new_bounds(units, start_bound, end_bound))
144 }
145}
146
147impl WindowFrame {
148 pub fn new(order_by: Option<bool>) -> Self {
152 if let Some(strict) = order_by {
153 Self {
158 units: if strict {
159 WindowFrameUnits::Rows
160 } else {
161 WindowFrameUnits::Range
162 },
163 start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
164 end_bound: WindowFrameBound::CurrentRow,
165 causal: strict,
166 }
167 } else {
168 Self {
172 units: WindowFrameUnits::Rows,
173 start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
174 end_bound: WindowFrameBound::Following(ScalarValue::UInt64(None)),
175 causal: false,
176 }
177 }
178 }
179
180 pub fn reverse(&self) -> Self {
184 let start_bound = match &self.end_bound {
185 WindowFrameBound::Preceding(value) => {
186 WindowFrameBound::Following(value.clone())
187 }
188 WindowFrameBound::Following(value) => {
189 WindowFrameBound::Preceding(value.clone())
190 }
191 WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
192 };
193 let end_bound = match &self.start_bound {
194 WindowFrameBound::Preceding(value) => {
195 WindowFrameBound::Following(value.clone())
196 }
197 WindowFrameBound::Following(value) => {
198 WindowFrameBound::Preceding(value.clone())
199 }
200 WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
201 };
202 Self::new_bounds(self.units, start_bound, end_bound)
203 }
204
205 pub fn is_causal(&self) -> bool {
207 self.causal
208 }
209
210 pub fn new_bounds(
212 units: WindowFrameUnits,
213 start_bound: WindowFrameBound,
214 end_bound: WindowFrameBound,
215 ) -> Self {
216 let causal = match units {
217 WindowFrameUnits::Rows => match &end_bound {
218 WindowFrameBound::Following(value) => {
219 if value.is_null() {
220 false
222 } else {
223 let zero = ScalarValue::new_zero(&value.data_type());
224 zero.map(|zero| value.eq(&zero)).unwrap_or(false)
225 }
226 }
227 _ => true,
228 },
229 WindowFrameUnits::Range | WindowFrameUnits::Groups => match &end_bound {
230 WindowFrameBound::Preceding(value) => {
231 if value.is_null() {
232 true
234 } else {
235 let zero = ScalarValue::new_zero(&value.data_type());
236 zero.map(|zero| value.gt(&zero)).unwrap_or(false)
237 }
238 }
239 _ => false,
240 },
241 };
242 Self {
243 units,
244 start_bound,
245 end_bound,
246 causal,
247 }
248 }
249
250 pub fn regularize_order_bys(&self, order_by: &mut Vec<Sort>) -> Result<()> {
252 match self.units {
253 WindowFrameUnits::Range if self.free_range() => {
258 if order_by.is_empty() {
263 order_by.push(lit(1u64).sort(true, false));
264 }
265 }
266 WindowFrameUnits::Range if order_by.len() != 1 => {
267 return plan_err!("RANGE requires exactly one ORDER BY column");
268 }
269 WindowFrameUnits::Groups if order_by.is_empty() => {
270 return plan_err!("GROUPS requires an ORDER BY clause");
271 }
272 _ => {}
273 }
274 Ok(())
275 }
276
277 pub fn can_accept_multi_orderby(&self) -> bool {
279 match self.units {
280 WindowFrameUnits::Rows => true,
281 WindowFrameUnits::Range => self.free_range(),
282 WindowFrameUnits::Groups => true,
283 }
284 }
285
286 fn free_range(&self) -> bool {
289 (self.start_bound.is_unbounded()
290 || self.start_bound == WindowFrameBound::CurrentRow)
291 && (self.end_bound.is_unbounded()
292 || self.end_bound == WindowFrameBound::CurrentRow)
293 }
294
295 pub fn is_ever_expanding(&self) -> bool {
299 self.start_bound.is_unbounded()
300 }
301}
302
/// One end of a window frame.
///
/// `Preceding` and `Following` carry their offset as a [`ScalarValue`]. A null
/// offset means the bound is unbounded, shown as `UNBOUNDED PRECEDING` or
/// `UNBOUNDED FOLLOWING`. When parsed from SQL, `ROWS`/`GROUPS` offsets are
/// `UInt64`, while `RANGE` offsets are kept as `Utf8` text.
///
/// NOTE: variant order is significant because it determines the derived
/// `PartialOrd`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameBound {
    /// `<offset> PRECEDING`, or `UNBOUNDED PRECEDING` when the offset is null.
    Preceding(ScalarValue),
    /// `CURRENT ROW`.
    CurrentRow,
    /// `<offset> FOLLOWING`, or `UNBOUNDED FOLLOWING` when the offset is null.
    Following(ScalarValue),
}
334
335impl WindowFrameBound {
336 pub fn is_unbounded(&self) -> bool {
337 match self {
338 WindowFrameBound::Preceding(elem) => elem.is_null(),
339 WindowFrameBound::CurrentRow => false,
340 WindowFrameBound::Following(elem) => elem.is_null(),
341 }
342 }
343}
344
345impl WindowFrameBound {
346 fn try_parse(
347 value: ast::WindowFrameBound,
348 units: &ast::WindowFrameUnits,
349 ) -> Result<Self> {
350 Ok(match value {
351 ast::WindowFrameBound::Preceding(Some(v)) => {
352 Self::Preceding(convert_frame_bound_to_scalar_value(*v, units)?)
353 }
354 ast::WindowFrameBound::Preceding(None) => Self::Preceding(ScalarValue::Null),
355 ast::WindowFrameBound::Following(Some(v)) => {
356 Self::Following(convert_frame_bound_to_scalar_value(*v, units)?)
357 }
358 ast::WindowFrameBound::Following(None) => Self::Following(ScalarValue::Null),
359 ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
360 })
361 }
362}
363
364fn convert_frame_bound_to_scalar_value(
365 v: ast::Expr,
366 units: &ast::WindowFrameUnits,
367) -> Result<ScalarValue> {
368 match units {
369 ast::WindowFrameUnits::Rows | ast::WindowFrameUnits::Groups => match v {
371 ast::Expr::Value(ast::Value::Number(value, false)) => {
372 Ok(ScalarValue::try_from_string(value, &DataType::UInt64)?)
373 },
374 ast::Expr::Interval(ast::Interval {
375 value,
376 leading_field: None,
377 leading_precision: None,
378 last_field: None,
379 fractional_seconds_precision: None,
380 }) => {
381 let value = match *value {
382 ast::Expr::Value(ast::Value::SingleQuotedString(item)) => item,
383 e => {
384 return sql_err!(ParserError(format!(
385 "INTERVAL expression cannot be {e:?}"
386 )));
387 }
388 };
389 Ok(ScalarValue::try_from_string(value, &DataType::UInt64)?)
390 }
391 _ => plan_err!(
392 "Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
393 ),
394 },
395 ast::WindowFrameUnits::Range => Ok(ScalarValue::Utf8(Some(match v {
398 ast::Expr::Value(ast::Value::Number(value, false)) => value,
399 ast::Expr::Interval(ast::Interval {
400 value,
401 leading_field,
402 ..
403 }) => {
404 let result = match *value {
405 ast::Expr::Value(ast::Value::SingleQuotedString(item)) => item,
406 e => {
407 return sql_err!(ParserError(format!(
408 "INTERVAL expression cannot be {e:?}"
409 )));
410 }
411 };
412 if let Some(leading_field) = leading_field {
413 format!("{result} {leading_field}")
414 } else {
415 result
416 }
417 }
418 _ => plan_err!(
419 "Invalid window frame: frame offsets for RANGE must be either a numeric value, a string value or an interval"
420 )?,
421 }))),
422 }
423}
424
425impl fmt::Display for WindowFrameBound {
426 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
427 match self {
428 WindowFrameBound::Preceding(n) => {
429 if n.is_null() {
430 f.write_str("UNBOUNDED PRECEDING")
431 } else {
432 write!(f, "{n} PRECEDING")
433 }
434 }
435 WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
436 WindowFrameBound::Following(n) => {
437 if n.is_null() {
438 f.write_str("UNBOUNDED FOLLOWING")
439 } else {
440 write!(f, "{n} FOLLOWING")
441 }
442 }
443 }
444 }
445}
446
/// The unit in which window frame offsets are measured.
///
/// NOTE: variant order is significant because it determines the derived
/// `PartialOrd`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameUnits {
    /// `ROWS`: explicit offsets are parsed from SQL as `UInt64`.
    Rows,
    /// `RANGE`: explicit offsets are kept as text when parsed from SQL. A
    /// frame with explicit offsets needs exactly one `ORDER BY` column.
    Range,
    /// `GROUPS`: explicit offsets are parsed from SQL as `UInt64`, and an
    /// `ORDER BY` clause is required.
    Groups,
}
465
466impl fmt::Display for WindowFrameUnits {
467 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
468 f.write_str(match self {
469 WindowFrameUnits::Rows => "ROWS",
470 WindowFrameUnits::Range => "RANGE",
471 WindowFrameUnits::Groups => "GROUPS",
472 })
473 }
474}
475
476impl From<ast::WindowFrameUnits> for WindowFrameUnits {
477 fn from(value: ast::WindowFrameUnits) -> Self {
478 match value {
479 ast::WindowFrameUnits::Range => Self::Range,
480 ast::WindowFrameUnits::Groups => Self::Groups,
481 ast::WindowFrameUnits::Rows => Self::Rows,
482 }
483 }
484}
485
// Unit tests for converting parsed SQL window frames and frame bounds.
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `WindowFrame::try_from` on three frames. An UNBOUNDED FOLLOWING
    /// start and an UNBOUNDED PRECEDING end are rejected. Numeric `ROWS`
    /// offsets convert to `UInt64`.
    #[test]
    fn test_window_frame_creation() -> Result<()> {
        // RANGE frame whose start is UNBOUNDED FOLLOWING; the end defaults to
        // CURRENT ROW.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Following(None),
            end_bound: None,
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING".to_owned()
        );

        // RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED PRECEDING: invalid end.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Preceding(None),
            end_bound: Some(ast::WindowFrameBound::Preceding(None)),
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: end bound cannot be UNBOUNDED PRECEDING".to_owned()
        );

        // ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING: valid, with UInt64 offsets.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Rows,
            start_bound: ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::Value(ast::Value::Number("2".to_string(), false)),
            ))),
            end_bound: Some(ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::Value(ast::Value::Number("1".to_string(), false)),
            )))),
        };

        let window_frame = WindowFrame::try_from(window_frame)?;
        assert_eq!(window_frame.units, WindowFrameUnits::Rows);
        assert_eq!(
            window_frame.start_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2)))
        );
        assert_eq!(
            window_frame.end_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1)))
        );

        Ok(())
    }

    // Parses `$value` as both a PRECEDING and a FOLLOWING bound in `$unit`
    // and asserts that both carry `$expected`. `$value` and `$expected` are
    // each expanded twice, so each expression is evaluated twice.
    macro_rules! test_bound {
        ($unit:ident, $value:expr, $expected:expr) => {
            let preceding = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(preceding, WindowFrameBound::Preceding($expected));
            let following = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(following, WindowFrameBound::Following($expected));
        };
    }

    // Like `test_bound!`, but asserts that both parses fail and that the
    // error text, with its backtrace stripped, equals `$expected`.
    macro_rules! test_bound_err {
        ($unit:ident, $value:expr, $expected:expr) => {
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
        };
    }

    /// Checks `WindowFrameBound::try_parse` for each unit with no offset, a
    /// numeric offset and an `INTERVAL '1' DAY` offset.
    #[test]
    fn test_window_frame_bound_creation() -> Result<()> {
        // No offset means unbounded, stored as a null scalar for every unit.
        test_bound!(Rows, None, ScalarValue::Null);
        test_bound!(Groups, None, ScalarValue::Null);
        test_bound!(Range, None, ScalarValue::Null);

        // A numeric offset is UInt64 for ROWS/GROUPS and kept as text for RANGE.
        let number = Some(Box::new(ast::Expr::Value(ast::Value::Number(
            "42".to_string(),
            false,
        ))));
        test_bound!(Rows, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(Groups, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("42".to_string()))
        );

        // An interval with a leading field is rejected for ROWS/GROUPS and
        // rendered as "<value> <field>" text for RANGE.
        let number = Some(Box::new(ast::Expr::Interval(ast::Interval {
            value: Box::new(ast::Expr::Value(ast::Value::SingleQuotedString(
                "1".to_string(),
            ))),
            leading_field: Some(ast::DateTimeField::Day),
            fractional_seconds_precision: None,
            last_field: None,
            leading_precision: None,
        })));
        test_bound_err!(Rows, number.clone(), "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers");
        test_bound_err!(Groups, number.clone(), "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers");
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("1 DAY".to_string()))
        );

        Ok(())
    }
}