1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23 ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25 TimestampSecondType,
26};
27use arrow::array::{ArrayRef, PrimitiveArray};
28use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{DataType, TimeUnit};
32
33use datafusion_common::cast::as_primitive_array;
34use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
35use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
36use datafusion_expr::TypeSignature::Exact;
37use datafusion_expr::{
38 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
39};
40use datafusion_macros::user_doc;
41
42use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
43
44#[user_doc(
45 doc_section(label = "Time and Date Functions"),
46 description = r#"
47Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
48
49For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
50"#,
51 syntax_example = "date_bin(interval, expression, origin-timestamp)",
52 sql_example = r#"```sql
53-- Bin the timestamp into 1 day intervals
54> SELECT date_bin(interval '1 day', time) as bin
55FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
56+---------------------+
57| bin |
58+---------------------+
59| 2023-01-01T00:00:00 |
60| 2023-01-03T00:00:00 |
61+---------------------+
622 row(s) fetched.
63
64-- Bin the timestamp into 1 day intervals starting at 3AM on 2023-01-01
65> SELECT date_bin(interval '1 day', time, '2023-01-01T03:00:00') as bin
66FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
67+---------------------+
68| bin |
69+---------------------+
70| 2023-01-01T03:00:00 |
71| 2023-01-03T03:00:00 |
72+---------------------+
732 row(s) fetched.
74```"#,
75 argument(name = "interval", description = "Bin interval."),
76 argument(
77 name = "expression",
78 description = "Time expression to operate on. Can be a constant, column, or function."
79 ),
80 argument(
81 name = "origin-timestamp",
        description = r#"Optional. Starting point used to determine bin boundaries. If not specified, defaults to 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
83
84 - nanoseconds
85 - microseconds
86 - milliseconds
87 - seconds
88 - minutes
89 - hours
90 - days
91 - weeks
92 - months
93 - years
94 - century
95"#
96 )
97)]
/// Scalar UDF implementing the SQL `date_bin` function.
///
/// The struct only stores the accepted argument type combinations; the
/// binning itself is performed by `date_bin_impl`.
#[derive(Debug)]
pub struct DateBinFunc {
    /// Accepted argument type combinations, built in `DateBinFunc::new`.
    signature: Signature,
}
102
103impl Default for DateBinFunc {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
109impl DateBinFunc {
110 pub fn new() -> Self {
111 let base_sig = |array_type: TimeUnit| {
112 vec![
113 Exact(vec![
114 DataType::Interval(MonthDayNano),
115 Timestamp(array_type, None),
116 Timestamp(Nanosecond, None),
117 ]),
118 Exact(vec![
119 DataType::Interval(MonthDayNano),
120 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
121 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
122 ]),
123 Exact(vec![
124 DataType::Interval(DayTime),
125 Timestamp(array_type, None),
126 Timestamp(Nanosecond, None),
127 ]),
128 Exact(vec![
129 DataType::Interval(DayTime),
130 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
131 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
132 ]),
133 Exact(vec![
134 DataType::Interval(MonthDayNano),
135 Timestamp(array_type, None),
136 ]),
137 Exact(vec![
138 DataType::Interval(MonthDayNano),
139 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
140 ]),
141 Exact(vec![
142 DataType::Interval(DayTime),
143 Timestamp(array_type, None),
144 ]),
145 Exact(vec![
146 DataType::Interval(DayTime),
147 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
148 ]),
149 ]
150 };
151
152 let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
153 .into_iter()
154 .map(base_sig)
155 .collect::<Vec<_>>()
156 .concat();
157
158 Self {
159 signature: Signature::one_of(full_sig, Volatility::Immutable),
160 }
161 }
162}
163
164impl ScalarUDFImpl for DateBinFunc {
165 fn as_any(&self) -> &dyn Any {
166 self
167 }
168
169 fn name(&self) -> &str {
170 "date_bin"
171 }
172
173 fn signature(&self) -> &Signature {
174 &self.signature
175 }
176
177 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
178 match &arg_types[1] {
179 Timestamp(Nanosecond, None) | Utf8 | Null => Ok(Timestamp(Nanosecond, None)),
180 Timestamp(Nanosecond, tz_opt) => Ok(Timestamp(Nanosecond, tz_opt.clone())),
181 Timestamp(Microsecond, tz_opt) => Ok(Timestamp(Microsecond, tz_opt.clone())),
182 Timestamp(Millisecond, tz_opt) => Ok(Timestamp(Millisecond, tz_opt.clone())),
183 Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
184 _ => plan_err!(
185 "The date_bin function can only accept timestamp as the second arg."
186 ),
187 }
188 }
189
190 fn invoke_with_args(
191 &self,
192 args: datafusion_expr::ScalarFunctionArgs,
193 ) -> Result<ColumnarValue> {
194 let args = &args.args;
195 if args.len() == 2 {
196 let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
198 Some(0),
199 Some("+00:00".into()),
200 ));
201 date_bin_impl(&args[0], &args[1], &origin)
202 } else if args.len() == 3 {
203 date_bin_impl(&args[0], &args[1], &args[2])
204 } else {
205 exec_err!("DATE_BIN expected two or three arguments")
206 }
207 }
208
209 fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
210 let step = &input[0];
212 let date_value = &input[1];
213 let reference = input.get(2);
214
215 if step.sort_properties.eq(&SortProperties::Singleton)
216 && reference
217 .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
218 .unwrap_or(true)
219 {
220 Ok(date_value.sort_properties)
221 } else {
222 Ok(SortProperties::Unordered)
223 }
224 }
225 fn documentation(&self) -> Option<&Documentation> {
226 self.doc()
227 }
228}
229
/// Stride of a `date_bin` call, normalized to a single unit.
enum Interval {
    /// Fixed-length stride expressed in nanoseconds.
    Nanoseconds(i64),
    /// Calendar stride expressed in whole months.
    Months(i64),
}
234
235impl Interval {
236 fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
245 match self {
246 Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
247 Interval::Months(months) => (*months, date_bin_months_interval),
248 }
249 }
250}
251
252fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
254 let time_diff = source - origin;
255
256 let time_delta = compute_distance(time_diff, stride_nanos);
258
259 origin + time_delta
260}
261
/// Rounds `time_diff` down to a multiple of `stride`.
///
/// Truncating division rounds negative values toward zero, so for a negative
/// `time_diff` that is not already a multiple (and a stride greater than one)
/// one extra stride is subtracted to land on the bin start at or before it.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let truncated = time_diff - remainder;

    let needs_step_back = time_diff < 0 && stride > 1 && remainder != 0;
    if needs_step_back {
        truncated - stride
    } else {
        truncated
    }
}
273
274fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
276 let source_date = to_utc_date_time(source);
278 let origin_date = to_utc_date_time(origin);
279
280 let month_diff = (source_date.year() - origin_date.year()) * 12
282 + source_date.month() as i32
283 - origin_date.month() as i32;
284
285 let month_delta = compute_distance(month_diff as i64, stride_months);
287
288 let mut bin_time = if month_delta < 0 {
289 origin_date - Months::new(month_delta.unsigned_abs() as u32)
290 } else {
291 origin_date + Months::new(month_delta as u32)
292 };
293
294 if bin_time > source_date {
297 let month_delta = month_delta - stride_months;
298 bin_time = if month_delta < 0 {
299 origin_date - Months::new(month_delta.unsigned_abs() as u32)
300 } else {
301 origin_date + Months::new(month_delta as u32)
302 };
303 }
304
305 bin_time.timestamp_nanos_opt().unwrap()
306}
307
308fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
309 let secs = nanos / 1_000_000_000;
310 let nsec = (nanos % 1_000_000_000) as u32;
311 DateTime::from_timestamp(secs, nsec).unwrap()
312}
313
314fn date_bin_impl(
321 stride: &ColumnarValue,
322 array: &ColumnarValue,
323 origin: &ColumnarValue,
324) -> Result<ColumnarValue> {
325 let stride = match stride {
326 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
327 let (days, ms) = IntervalDayTimeType::to_parts(*v);
328 let nanos = (TimeDelta::try_days(days as i64).unwrap()
329 + TimeDelta::try_milliseconds(ms as i64).unwrap())
330 .num_nanoseconds();
331
332 match nanos {
333 Some(v) => Interval::Nanoseconds(v),
334 _ => return exec_err!("DATE_BIN stride argument is too large"),
335 }
336 }
337 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
338 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
339
340 if months != 0 {
342 if days != 0 || nanos != 0 {
344 return not_impl_err!(
345 "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
346 );
347 } else {
348 Interval::Months(months as i64)
349 }
350 } else {
351 let nanos = (TimeDelta::try_days(days as i64).unwrap()
352 + Duration::nanoseconds(nanos))
353 .num_nanoseconds();
354 match nanos {
355 Some(v) => Interval::Nanoseconds(v),
356 _ => return exec_err!("DATE_BIN stride argument is too large"),
357 }
358 }
359 }
360 ColumnarValue::Scalar(v) => {
361 return exec_err!(
362 "DATE_BIN expects stride argument to be an INTERVAL but got {}",
363 v.data_type()
364 );
365 }
366 ColumnarValue::Array(_) => {
367 return not_impl_err!(
368 "DATE_BIN only supports literal values for the stride argument, not arrays"
369 );
370 }
371 };
372
373 let origin = match origin {
374 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
375 ColumnarValue::Scalar(v) => {
376 return exec_err!(
377 "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got {}",
378 v.data_type()
379 );
380 }
381 ColumnarValue::Array(_) => {
382 return not_impl_err!(
383 "DATE_BIN only supports literal values for the origin argument, not arrays"
384 );
385 }
386 };
387
388 let (stride, stride_fn) = stride.bin_fn();
389
390 if stride == 0 {
392 return exec_err!("DATE_BIN stride must be non-zero");
393 }
394
395 fn stride_map_fn<T: ArrowTimestampType>(
396 origin: i64,
397 stride: i64,
398 stride_fn: fn(i64, i64, i64) -> i64,
399 ) -> impl Fn(i64) -> i64 {
400 let scale = match T::UNIT {
401 Nanosecond => 1,
402 Microsecond => NANOSECONDS / 1_000_000,
403 Millisecond => NANOSECONDS / 1_000,
404 Second => NANOSECONDS,
405 };
406 move |x: i64| stride_fn(stride, x * scale, origin) / scale
407 }
408
409 Ok(match array {
410 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
411 let apply_stride_fn =
412 stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
413 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
414 v.map(apply_stride_fn),
415 tz_opt.clone(),
416 ))
417 }
418 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
419 let apply_stride_fn =
420 stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
421 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
422 v.map(apply_stride_fn),
423 tz_opt.clone(),
424 ))
425 }
426 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
427 let apply_stride_fn =
428 stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
429 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
430 v.map(apply_stride_fn),
431 tz_opt.clone(),
432 ))
433 }
434 ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
435 let apply_stride_fn =
436 stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
437 ColumnarValue::Scalar(ScalarValue::TimestampSecond(
438 v.map(apply_stride_fn),
439 tz_opt.clone(),
440 ))
441 }
442
443 ColumnarValue::Array(array) => {
444 fn transform_array_with_stride<T>(
445 origin: i64,
446 stride: i64,
447 stride_fn: fn(i64, i64, i64) -> i64,
448 array: &ArrayRef,
449 tz_opt: &Option<Arc<str>>,
450 ) -> Result<ColumnarValue>
451 where
452 T: ArrowTimestampType,
453 {
454 let array = as_primitive_array::<T>(array)?;
455 let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
456 let array: PrimitiveArray<T> = array
457 .unary(apply_stride_fn)
458 .with_timezone_opt(tz_opt.clone());
459
460 Ok(ColumnarValue::Array(Arc::new(array)))
461 }
462
463 match array.data_type() {
464 Timestamp(Nanosecond, tz_opt) => {
465 transform_array_with_stride::<TimestampNanosecondType>(
466 origin, stride, stride_fn, array, tz_opt,
467 )?
468 }
469 Timestamp(Microsecond, tz_opt) => {
470 transform_array_with_stride::<TimestampMicrosecondType>(
471 origin, stride, stride_fn, array, tz_opt,
472 )?
473 }
474 Timestamp(Millisecond, tz_opt) => {
475 transform_array_with_stride::<TimestampMillisecondType>(
476 origin, stride, stride_fn, array, tz_opt,
477 )?
478 }
479 Timestamp(Second, tz_opt) => {
480 transform_array_with_stride::<TimestampSecondType>(
481 origin, stride, stride_fn, array, tz_opt,
482 )?
483 }
484 _ => {
485 return exec_err!(
486 "DATE_BIN expects source argument to be a TIMESTAMP but got {}",
487 array.data_type()
488 );
489 }
490 }
491 }
492 _ => {
493 return exec_err!(
494 "DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
495 );
496 }
497 })
498}
499
500#[cfg(test)]
501mod tests {
502 use std::sync::Arc;
503
504 use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
505 use arrow::array::types::TimestampNanosecondType;
506 use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
507 use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
508 use arrow::datatypes::{DataType, TimeUnit};
509
510 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
511 use datafusion_common::ScalarValue;
512 use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
513
514 use chrono::TimeDelta;
515
516 #[test]
517 fn test_date_bin() {
518 let mut args = datafusion_expr::ScalarFunctionArgs {
519 args: vec![
520 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
521 IntervalDayTime {
522 days: 0,
523 milliseconds: 1,
524 },
525 ))),
526 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
527 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
528 ],
529 number_rows: 1,
530 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
531 };
532 let res = DateBinFunc::new().invoke_with_args(args);
533 assert!(res.is_ok());
534
535 let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
536 let batch_len = timestamps.len();
537 args = datafusion_expr::ScalarFunctionArgs {
538 args: vec![
539 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
540 IntervalDayTime {
541 days: 0,
542 milliseconds: 1,
543 },
544 ))),
545 ColumnarValue::Array(timestamps),
546 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
547 ],
548 number_rows: batch_len,
549 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
550 };
551 let res = DateBinFunc::new().invoke_with_args(args);
552 assert!(res.is_ok());
553
554 args = datafusion_expr::ScalarFunctionArgs {
555 args: vec![
556 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
557 IntervalDayTime {
558 days: 0,
559 milliseconds: 1,
560 },
561 ))),
562 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
563 ],
564 number_rows: 1,
565 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
566 };
567 let res = DateBinFunc::new().invoke_with_args(args);
568 assert!(res.is_ok());
569
570 args = datafusion_expr::ScalarFunctionArgs {
572 args: vec![
573 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
574 IntervalMonthDayNano {
575 months: 0,
576 days: 0,
577 nanoseconds: 1,
578 },
579 ))),
580 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
581 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
582 ],
583 number_rows: 1,
584 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
585 };
586 let res = DateBinFunc::new().invoke_with_args(args);
587 assert!(res.is_ok());
588
589 args = datafusion_expr::ScalarFunctionArgs {
595 args: vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
596 IntervalDayTime {
597 days: 0,
598 milliseconds: 1,
599 },
600 )))],
601 number_rows: 1,
602 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
603 };
604 let res = DateBinFunc::new().invoke_with_args(args);
605 assert_eq!(
606 res.err().unwrap().strip_backtrace(),
607 "Execution error: DATE_BIN expected two or three arguments"
608 );
609
610 args = datafusion_expr::ScalarFunctionArgs {
612 args: vec![
613 ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
614 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
615 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
616 ],
617 number_rows: 1,
618 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
619 };
620 let res = DateBinFunc::new().invoke_with_args(args);
621 assert_eq!(
622 res.err().unwrap().strip_backtrace(),
623 "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
624 );
625
626 args = datafusion_expr::ScalarFunctionArgs {
629 args: vec![
630 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
631 IntervalDayTime {
632 days: 0,
633 milliseconds: 0,
634 },
635 ))),
636 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
637 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
638 ],
639 number_rows: 1,
640 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
641 };
642
643 let res = DateBinFunc::new().invoke_with_args(args);
644 assert_eq!(
645 res.err().unwrap().strip_backtrace(),
646 "Execution error: DATE_BIN stride must be non-zero"
647 );
648
649 args = datafusion_expr::ScalarFunctionArgs {
651 args: vec![
652 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
653 IntervalDayTime::MAX,
654 ))),
655 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
656 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
657 ],
658 number_rows: 1,
659 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
660 };
661 let res = DateBinFunc::new().invoke_with_args(args);
662 assert_eq!(
663 res.err().unwrap().strip_backtrace(),
664 "Execution error: DATE_BIN stride argument is too large"
665 );
666
667 args = datafusion_expr::ScalarFunctionArgs {
669 args: vec![
670 ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
671 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
672 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
673 ],
674 number_rows: 1,
675 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
676 };
677 let res = DateBinFunc::new().invoke_with_args(args);
678 assert_eq!(
679 res.err().unwrap().strip_backtrace(),
680 "Execution error: DATE_BIN stride argument is too large"
681 );
682
683 args = datafusion_expr::ScalarFunctionArgs {
685 args: vec![
686 ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
687 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
688 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
689 ],
690 number_rows: 1,
691 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
692 };
693 let res = DateBinFunc::new().invoke_with_args(args);
694 assert_eq!(
695 res.err().unwrap().strip_backtrace(),
696 "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
697 );
698
699 args = datafusion_expr::ScalarFunctionArgs {
701 args: vec![
702 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
703 IntervalDayTime {
704 days: 0,
705 milliseconds: 1,
706 },
707 ))),
708 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
709 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
710 ],
711 number_rows: 1,
712 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
713 };
714 let res = DateBinFunc::new().invoke_with_args(args);
715 assert_eq!(
716 res.err().unwrap().strip_backtrace(),
717 "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(Microsecond, None)"
718 );
719
720 args = datafusion_expr::ScalarFunctionArgs {
721 args: vec![
722 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
723 IntervalDayTime {
724 days: 0,
725 milliseconds: 1,
726 },
727 ))),
728 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
729 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
730 ],
731 number_rows: 1,
732 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
733 };
734 let res = DateBinFunc::new().invoke_with_args(args);
735 assert!(res.is_ok());
736
737 let intervals = Arc::new(
739 (1..6)
740 .map(|x| {
741 Some(IntervalDayTime {
742 days: 0,
743 milliseconds: x,
744 })
745 })
746 .collect::<IntervalDayTimeArray>(),
747 );
748 args = datafusion_expr::ScalarFunctionArgs {
749 args: vec![
750 ColumnarValue::Array(intervals),
751 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
752 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
753 ],
754 number_rows: 1,
755 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
756 };
757 let res = DateBinFunc::new().invoke_with_args(args);
758 assert_eq!(
759 res.err().unwrap().strip_backtrace(),
760 "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
761 );
762
763 let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
765 let batch_len = timestamps.len();
766 args = datafusion_expr::ScalarFunctionArgs {
767 args: vec![
768 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
769 IntervalDayTime {
770 days: 0,
771 milliseconds: 1,
772 },
773 ))),
774 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
775 ColumnarValue::Array(timestamps),
776 ],
777 number_rows: batch_len,
778 return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
779 };
780 let res = DateBinFunc::new().invoke_with_args(args);
781 assert_eq!(
782 res.err().unwrap().strip_backtrace(),
783 "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
784 );
785 }
786
787 #[test]
788 fn test_date_bin_timezones() {
789 let cases = vec![
790 (
791 vec![
792 "2020-09-08T00:00:00Z",
793 "2020-09-08T01:00:00Z",
794 "2020-09-08T02:00:00Z",
795 "2020-09-08T03:00:00Z",
796 "2020-09-08T04:00:00Z",
797 ],
798 Some("+00".into()),
799 "1970-01-01T00:00:00Z",
800 vec![
801 "2020-09-08T00:00:00Z",
802 "2020-09-08T00:00:00Z",
803 "2020-09-08T00:00:00Z",
804 "2020-09-08T00:00:00Z",
805 "2020-09-08T00:00:00Z",
806 ],
807 ),
808 (
809 vec![
810 "2020-09-08T00:00:00Z",
811 "2020-09-08T01:00:00Z",
812 "2020-09-08T02:00:00Z",
813 "2020-09-08T03:00:00Z",
814 "2020-09-08T04:00:00Z",
815 ],
816 None,
817 "1970-01-01T00:00:00Z",
818 vec![
819 "2020-09-08T00:00:00Z",
820 "2020-09-08T00:00:00Z",
821 "2020-09-08T00:00:00Z",
822 "2020-09-08T00:00:00Z",
823 "2020-09-08T00:00:00Z",
824 ],
825 ),
826 (
827 vec![
828 "2020-09-08T00:00:00Z",
829 "2020-09-08T01:00:00Z",
830 "2020-09-08T02:00:00Z",
831 "2020-09-08T03:00:00Z",
832 "2020-09-08T04:00:00Z",
833 ],
834 Some("-02".into()),
835 "1970-01-01T00:00:00Z",
836 vec![
837 "2020-09-08T00:00:00Z",
838 "2020-09-08T00:00:00Z",
839 "2020-09-08T00:00:00Z",
840 "2020-09-08T00:00:00Z",
841 "2020-09-08T00:00:00Z",
842 ],
843 ),
844 (
845 vec![
846 "2020-09-08T00:00:00+05",
847 "2020-09-08T01:00:00+05",
848 "2020-09-08T02:00:00+05",
849 "2020-09-08T03:00:00+05",
850 "2020-09-08T04:00:00+05",
851 ],
852 Some("+05".into()),
853 "1970-01-01T00:00:00+05",
854 vec![
855 "2020-09-08T00:00:00+05",
856 "2020-09-08T00:00:00+05",
857 "2020-09-08T00:00:00+05",
858 "2020-09-08T00:00:00+05",
859 "2020-09-08T00:00:00+05",
860 ],
861 ),
862 (
863 vec![
864 "2020-09-08T00:00:00+08",
865 "2020-09-08T01:00:00+08",
866 "2020-09-08T02:00:00+08",
867 "2020-09-08T03:00:00+08",
868 "2020-09-08T04:00:00+08",
869 ],
870 Some("+08".into()),
871 "1970-01-01T00:00:00+08",
872 vec![
873 "2020-09-08T00:00:00+08",
874 "2020-09-08T00:00:00+08",
875 "2020-09-08T00:00:00+08",
876 "2020-09-08T00:00:00+08",
877 "2020-09-08T00:00:00+08",
878 ],
879 ),
880 ];
881
882 cases
883 .iter()
884 .for_each(|(original, tz_opt, origin, expected)| {
885 let input = original
886 .iter()
887 .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
888 .collect::<TimestampNanosecondArray>()
889 .with_timezone_opt(tz_opt.clone());
890 let right = expected
891 .iter()
892 .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
893 .collect::<TimestampNanosecondArray>()
894 .with_timezone_opt(tz_opt.clone());
895 let batch_len = input.len();
896 let args = datafusion_expr::ScalarFunctionArgs {
897 args: vec![
898 ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
899 ColumnarValue::Array(Arc::new(input)),
900 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
901 Some(string_to_timestamp_nanos(origin).unwrap()),
902 tz_opt.clone(),
903 )),
904 ],
905 number_rows: batch_len,
906 return_type: &DataType::Timestamp(
907 TimeUnit::Nanosecond,
908 tz_opt.clone(),
909 ),
910 };
911 let result = DateBinFunc::new().invoke_with_args(args).unwrap();
912 if let ColumnarValue::Array(result) = result {
913 assert_eq!(
914 result.data_type(),
915 &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
916 );
917 let left = arrow::array::cast::as_primitive_array::<
918 TimestampNanosecondType,
919 >(&result);
920 assert_eq!(left, &right);
921 } else {
922 panic!("unexpected column type");
923 }
924 });
925 }
926
927 #[test]
928 fn test_date_bin_single() {
929 let cases = vec![
930 (
931 (
932 TimeDelta::try_minutes(15),
933 "2004-04-09T02:03:04.123456789Z",
934 "2001-01-01T00:00:00",
935 ),
936 "2004-04-09T02:00:00Z",
937 ),
938 (
939 (
940 TimeDelta::try_minutes(15),
941 "2004-04-09T02:03:04.123456789Z",
942 "2001-01-01T00:02:30",
943 ),
944 "2004-04-09T02:02:30Z",
945 ),
946 (
947 (
948 TimeDelta::try_minutes(15),
949 "2004-04-09T02:03:04.123456789Z",
950 "2005-01-01T00:02:30",
951 ),
952 "2004-04-09T02:02:30Z",
953 ),
954 (
955 (
956 TimeDelta::try_hours(1),
957 "2004-04-09T02:03:04.123456789Z",
958 "2001-01-01T00:00:00",
959 ),
960 "2004-04-09T02:00:00Z",
961 ),
962 (
963 (
964 TimeDelta::try_seconds(10),
965 "2004-04-09T02:03:11.123456789Z",
966 "2001-01-01T00:00:00",
967 ),
968 "2004-04-09T02:03:10Z",
969 ),
970 ];
971
972 cases
973 .iter()
974 .for_each(|((stride, source, origin), expected)| {
975 let stride = stride.unwrap();
976 let stride1 = stride.num_nanoseconds().unwrap();
977 let source1 = string_to_timestamp_nanos(source).unwrap();
978 let origin1 = string_to_timestamp_nanos(origin).unwrap();
979
980 let expected1 = string_to_timestamp_nanos(expected).unwrap();
981 let result = date_bin_nanos_interval(stride1, source1, origin1);
982 assert_eq!(result, expected1, "{source} = {expected}");
983 })
984 }
985
986 #[test]
987 fn test_date_bin_before_epoch() {
988 let cases = [
989 (
990 (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
991 "1969-12-31T23:30:00",
992 ),
993 (
994 (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
995 "1969-12-31T23:45:00",
996 ),
997 (
998 (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
999 "1969-12-31T23:45:00",
1000 ),
1001 ];
1002
1003 cases.iter().for_each(|((stride, source), expected)| {
1004 let stride = stride.unwrap();
1005 let stride1 = stride.num_nanoseconds().unwrap();
1006 let source1 = string_to_timestamp_nanos(source).unwrap();
1007
1008 let expected1 = string_to_timestamp_nanos(expected).unwrap();
1009 let result = date_bin_nanos_interval(stride1, source1, 0);
1010 assert_eq!(result, expected1, "{source} = {expected}");
1011 })
1012 }
1013}