datafusion_functions/datetime/
date_bin.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23    ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25    TimestampSecondType,
26};
27use arrow::array::{ArrayRef, PrimitiveArray};
28use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{DataType, TimeUnit};
32
33use datafusion_common::cast::as_primitive_array;
34use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
35use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
36use datafusion_expr::TypeSignature::Exact;
37use datafusion_expr::{
38    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
39};
40use datafusion_macros::user_doc;
41
42use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
43
44#[user_doc(
45    doc_section(label = "Time and Date Functions"),
46    description = r#"
47Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
48
49For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
50"#,
51    syntax_example = "date_bin(interval, expression, origin-timestamp)",
52    sql_example = r#"```sql
53-- Bin the timestamp into 1 day intervals
54> SELECT date_bin(interval '1 day', time) as bin
55FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
56+---------------------+
57| bin                 |
58+---------------------+
59| 2023-01-01T00:00:00 |
60| 2023-01-03T00:00:00 |
61+---------------------+
622 row(s) fetched.
63
64-- Bin the timestamp into 1 day intervals starting at 3AM on  2023-01-01
65> SELECT date_bin(interval '1 day', time,  '2023-01-01T03:00:00') as bin
66FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
67+---------------------+
68| bin                 |
69+---------------------+
70| 2023-01-01T03:00:00 |
71| 2023-01-03T03:00:00 |
72+---------------------+
732 row(s) fetched.
74```"#,
75    argument(name = "interval", description = "Bin interval."),
76    argument(
77        name = "expression",
78        description = "Time expression to operate on. Can be a constant, column, or function."
79    ),
80    argument(
81        name = "origin-timestamp",
82        description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
83
84    - nanoseconds
85    - microseconds
86    - milliseconds
87    - seconds
88    - minutes
89    - hours
90    - days
91    - weeks
92    - months
93    - years
94    - century
95"#
96    )
97)]
98#[derive(Debug)]
99pub struct DateBinFunc {
100    signature: Signature,
101}
102
103impl Default for DateBinFunc {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109impl DateBinFunc {
110    pub fn new() -> Self {
111        let base_sig = |array_type: TimeUnit| {
112            vec![
113                Exact(vec![
114                    DataType::Interval(MonthDayNano),
115                    Timestamp(array_type, None),
116                    Timestamp(Nanosecond, None),
117                ]),
118                Exact(vec![
119                    DataType::Interval(MonthDayNano),
120                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
121                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
122                ]),
123                Exact(vec![
124                    DataType::Interval(DayTime),
125                    Timestamp(array_type, None),
126                    Timestamp(Nanosecond, None),
127                ]),
128                Exact(vec![
129                    DataType::Interval(DayTime),
130                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
131                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
132                ]),
133                Exact(vec![
134                    DataType::Interval(MonthDayNano),
135                    Timestamp(array_type, None),
136                ]),
137                Exact(vec![
138                    DataType::Interval(MonthDayNano),
139                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
140                ]),
141                Exact(vec![
142                    DataType::Interval(DayTime),
143                    Timestamp(array_type, None),
144                ]),
145                Exact(vec![
146                    DataType::Interval(DayTime),
147                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
148                ]),
149            ]
150        };
151
152        let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
153            .into_iter()
154            .map(base_sig)
155            .collect::<Vec<_>>()
156            .concat();
157
158        Self {
159            signature: Signature::one_of(full_sig, Volatility::Immutable),
160        }
161    }
162}
163
164impl ScalarUDFImpl for DateBinFunc {
165    fn as_any(&self) -> &dyn Any {
166        self
167    }
168
169    fn name(&self) -> &str {
170        "date_bin"
171    }
172
173    fn signature(&self) -> &Signature {
174        &self.signature
175    }
176
177    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
178        match &arg_types[1] {
179            Timestamp(Nanosecond, None) | Utf8 | Null => Ok(Timestamp(Nanosecond, None)),
180            Timestamp(Nanosecond, tz_opt) => Ok(Timestamp(Nanosecond, tz_opt.clone())),
181            Timestamp(Microsecond, tz_opt) => Ok(Timestamp(Microsecond, tz_opt.clone())),
182            Timestamp(Millisecond, tz_opt) => Ok(Timestamp(Millisecond, tz_opt.clone())),
183            Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
184            _ => plan_err!(
185                "The date_bin function can only accept timestamp as the second arg."
186            ),
187        }
188    }
189
190    fn invoke_with_args(
191        &self,
192        args: datafusion_expr::ScalarFunctionArgs,
193    ) -> Result<ColumnarValue> {
194        let args = &args.args;
195        if args.len() == 2 {
196            // Default to unix EPOCH
197            let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
198                Some(0),
199                Some("+00:00".into()),
200            ));
201            date_bin_impl(&args[0], &args[1], &origin)
202        } else if args.len() == 3 {
203            date_bin_impl(&args[0], &args[1], &args[2])
204        } else {
205            exec_err!("DATE_BIN expected two or three arguments")
206        }
207    }
208
209    fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
210        // The DATE_BIN function preserves the order of its second argument.
211        let step = &input[0];
212        let date_value = &input[1];
213        let reference = input.get(2);
214
215        if step.sort_properties.eq(&SortProperties::Singleton)
216            && reference
217                .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
218                .unwrap_or(true)
219        {
220            Ok(date_value.sort_properties)
221        } else {
222            Ok(SortProperties::Unordered)
223        }
224    }
225    fn documentation(&self) -> Option<&Documentation> {
226        self.doc()
227    }
228}
229
230enum Interval {
231    Nanoseconds(i64),
232    Months(i64),
233}
234
235impl Interval {
236    /// Returns (`stride_nanos`, `fn`) where
237    ///
238    /// 1. `stride_nanos` is a width, in nanoseconds
239    /// 2. `fn` is a function that takes (stride_nanos, source, origin)
240    ///
241    /// `source` is the timestamp being binned
242    ///
243    /// `origin`  is the time, in nanoseconds, where windows are measured from
244    fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
245        match self {
246            Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
247            Interval::Months(months) => (*months, date_bin_months_interval),
248        }
249    }
250}
251
252// return time in nanoseconds that the source timestamp falls into based on the stride and origin
253fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
254    let time_diff = source - origin;
255
256    // distance from origin to bin
257    let time_delta = compute_distance(time_diff, stride_nanos);
258
259    origin + time_delta
260}
261
/// Distance from the origin to the start of the bin containing an offset of
/// `time_diff` from that origin.
///
/// For `stride > 1` this rounds `time_diff` down (towards negative infinity)
/// to a multiple of `stride`. Otherwise it rounds towards zero.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let toward_zero = time_diff - remainder;

    // `%` truncates towards zero, so a negative offset (source before the
    // origin) that is not an exact multiple lands one bin too late; step back.
    if remainder != 0 && time_diff < 0 && stride > 1 {
        toward_zero - stride
    } else {
        toward_zero
    }
}
273
274// return time in nanoseconds that the source timestamp falls into based on the stride and origin
275fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
276    // convert source and origin to DateTime<Utc>
277    let source_date = to_utc_date_time(source);
278    let origin_date = to_utc_date_time(origin);
279
280    // calculate the number of months between the source and origin
281    let month_diff = (source_date.year() - origin_date.year()) * 12
282        + source_date.month() as i32
283        - origin_date.month() as i32;
284
285    // distance from origin to bin
286    let month_delta = compute_distance(month_diff as i64, stride_months);
287
288    let mut bin_time = if month_delta < 0 {
289        origin_date - Months::new(month_delta.unsigned_abs() as u32)
290    } else {
291        origin_date + Months::new(month_delta as u32)
292    };
293
294    // If origin is not midnight of first date of the month, the bin_time may be larger than the source
295    // In this case, we need to move back to previous bin
296    if bin_time > source_date {
297        let month_delta = month_delta - stride_months;
298        bin_time = if month_delta < 0 {
299            origin_date - Months::new(month_delta.unsigned_abs() as u32)
300        } else {
301            origin_date + Months::new(month_delta as u32)
302        };
303    }
304
305    bin_time.timestamp_nanos_opt().unwrap()
306}
307
308fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
309    let secs = nanos / 1_000_000_000;
310    let nsec = (nanos % 1_000_000_000) as u32;
311    DateTime::from_timestamp(secs, nsec).unwrap()
312}
313
314// Supported intervals:
315//  1. IntervalDayTime: this means that the stride is in days, hours, minutes, seconds and milliseconds
316//     We will assume month interval won't be converted into this type
317//     TODO (my next PR): without `INTERVAL` keyword, the stride was converted into ScalarValue::IntervalDayTime somewhere
318//             for month interval. I need to find that and make it ScalarValue::IntervalMonthDayNano instead
319// 2. IntervalMonthDayNano
320fn date_bin_impl(
321    stride: &ColumnarValue,
322    array: &ColumnarValue,
323    origin: &ColumnarValue,
324) -> Result<ColumnarValue> {
325    let stride = match stride {
326        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
327            let (days, ms) = IntervalDayTimeType::to_parts(*v);
328            let nanos = (TimeDelta::try_days(days as i64).unwrap()
329                + TimeDelta::try_milliseconds(ms as i64).unwrap())
330            .num_nanoseconds();
331
332            match nanos {
333                Some(v) => Interval::Nanoseconds(v),
334                _ => return exec_err!("DATE_BIN stride argument is too large"),
335            }
336        }
337        ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
338            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
339
340            // If interval is months, its origin must be midnight of first date of the month
341            if months != 0 {
342                // Return error if days or nanos is not zero
343                if days != 0 || nanos != 0 {
344                    return not_impl_err!(
345                        "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
346                    );
347                } else {
348                    Interval::Months(months as i64)
349                }
350            } else {
351                let nanos = (TimeDelta::try_days(days as i64).unwrap()
352                    + Duration::nanoseconds(nanos))
353                .num_nanoseconds();
354                match nanos {
355                    Some(v) => Interval::Nanoseconds(v),
356                    _ => return exec_err!("DATE_BIN stride argument is too large"),
357                }
358            }
359        }
360        ColumnarValue::Scalar(v) => {
361            return exec_err!(
362                "DATE_BIN expects stride argument to be an INTERVAL but got {}",
363                v.data_type()
364            );
365        }
366        ColumnarValue::Array(_) => {
367            return not_impl_err!(
368            "DATE_BIN only supports literal values for the stride argument, not arrays"
369        );
370        }
371    };
372
373    let origin = match origin {
374        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
375        ColumnarValue::Scalar(v) => {
376            return exec_err!(
377                "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got {}",
378                v.data_type()
379            );
380        }
381        ColumnarValue::Array(_) => {
382            return not_impl_err!(
383            "DATE_BIN only supports literal values for the origin argument, not arrays"
384        );
385        }
386    };
387
388    let (stride, stride_fn) = stride.bin_fn();
389
390    // Return error if stride is 0
391    if stride == 0 {
392        return exec_err!("DATE_BIN stride must be non-zero");
393    }
394
395    fn stride_map_fn<T: ArrowTimestampType>(
396        origin: i64,
397        stride: i64,
398        stride_fn: fn(i64, i64, i64) -> i64,
399    ) -> impl Fn(i64) -> i64 {
400        let scale = match T::UNIT {
401            Nanosecond => 1,
402            Microsecond => NANOSECONDS / 1_000_000,
403            Millisecond => NANOSECONDS / 1_000,
404            Second => NANOSECONDS,
405        };
406        move |x: i64| stride_fn(stride, x * scale, origin) / scale
407    }
408
409    Ok(match array {
410        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
411            let apply_stride_fn =
412                stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
413            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
414                v.map(apply_stride_fn),
415                tz_opt.clone(),
416            ))
417        }
418        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
419            let apply_stride_fn =
420                stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
421            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
422                v.map(apply_stride_fn),
423                tz_opt.clone(),
424            ))
425        }
426        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
427            let apply_stride_fn =
428                stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
429            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
430                v.map(apply_stride_fn),
431                tz_opt.clone(),
432            ))
433        }
434        ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
435            let apply_stride_fn =
436                stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
437            ColumnarValue::Scalar(ScalarValue::TimestampSecond(
438                v.map(apply_stride_fn),
439                tz_opt.clone(),
440            ))
441        }
442
443        ColumnarValue::Array(array) => {
444            fn transform_array_with_stride<T>(
445                origin: i64,
446                stride: i64,
447                stride_fn: fn(i64, i64, i64) -> i64,
448                array: &ArrayRef,
449                tz_opt: &Option<Arc<str>>,
450            ) -> Result<ColumnarValue>
451            where
452                T: ArrowTimestampType,
453            {
454                let array = as_primitive_array::<T>(array)?;
455                let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
456                let array: PrimitiveArray<T> = array
457                    .unary(apply_stride_fn)
458                    .with_timezone_opt(tz_opt.clone());
459
460                Ok(ColumnarValue::Array(Arc::new(array)))
461            }
462
463            match array.data_type() {
464                Timestamp(Nanosecond, tz_opt) => {
465                    transform_array_with_stride::<TimestampNanosecondType>(
466                        origin, stride, stride_fn, array, tz_opt,
467                    )?
468                }
469                Timestamp(Microsecond, tz_opt) => {
470                    transform_array_with_stride::<TimestampMicrosecondType>(
471                        origin, stride, stride_fn, array, tz_opt,
472                    )?
473                }
474                Timestamp(Millisecond, tz_opt) => {
475                    transform_array_with_stride::<TimestampMillisecondType>(
476                        origin, stride, stride_fn, array, tz_opt,
477                    )?
478                }
479                Timestamp(Second, tz_opt) => {
480                    transform_array_with_stride::<TimestampSecondType>(
481                        origin, stride, stride_fn, array, tz_opt,
482                    )?
483                }
484                _ => {
485                    return exec_err!(
486                        "DATE_BIN expects source argument to be a TIMESTAMP but got {}",
487                        array.data_type()
488                    );
489                }
490            }
491        }
492        _ => {
493            return exec_err!(
494                "DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
495            );
496        }
497    })
498}
499
500#[cfg(test)]
501mod tests {
502    use std::sync::Arc;
503
504    use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
505    use arrow::array::types::TimestampNanosecondType;
506    use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
507    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
508    use arrow::datatypes::{DataType, TimeUnit};
509
510    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
511    use datafusion_common::ScalarValue;
512    use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
513
514    use chrono::TimeDelta;
515
516    #[test]
517    fn test_date_bin() {
518        let mut args = datafusion_expr::ScalarFunctionArgs {
519            args: vec![
520                ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
521                    IntervalDayTime {
522                        days: 0,
523                        milliseconds: 1,
524                    },
525                ))),
526                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
527                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
528            ],
529            number_rows: 1,
530            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
531        };
532        let res = DateBinFunc::new().invoke_with_args(args);
533        assert!(res.is_ok());
534
535        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
536        let batch_len = timestamps.len();
537        args = datafusion_expr::ScalarFunctionArgs {
538            args: vec![
539                ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
540                    IntervalDayTime {
541                        days: 0,
542                        milliseconds: 1,
543                    },
544                ))),
545                ColumnarValue::Array(timestamps),
546                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
547            ],
548            number_rows: batch_len,
549            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
550        };
551        let res = DateBinFunc::new().invoke_with_args(args);
552        assert!(res.is_ok());
553
554        args = datafusion_expr::ScalarFunctionArgs {
555            args: vec![
556                ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
557                    IntervalDayTime {
558                        days: 0,
559                        milliseconds: 1,
560                    },
561                ))),
562                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
563            ],
564            number_rows: 1,
565            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
566        };
567        let res = DateBinFunc::new().invoke_with_args(args);
568        assert!(res.is_ok());
569
570        // stride supports month-day-nano
571        args = datafusion_expr::ScalarFunctionArgs {
572            args: vec![
573                ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
574                    IntervalMonthDayNano {
575                        months: 0,
576                        days: 0,
577                        nanoseconds: 1,
578                    },
579                ))),
580                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
581                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
582            ],
583            number_rows: 1,
584            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
585        };
586        let res = DateBinFunc::new().invoke_with_args(args);
587        assert!(res.is_ok());
588
589        //
590        // Fallible test cases
591        //
592
593        // invalid number of arguments
594        args = datafusion_expr::ScalarFunctionArgs {
595            args: vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
596                IntervalDayTime {
597                    days: 0,
598                    milliseconds: 1,
599                },
600            )))],
601            number_rows: 1,
602            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
603        };
604        let res = DateBinFunc::new().invoke_with_args(args);
605        assert_eq!(
606            res.err().unwrap().strip_backtrace(),
607            "Execution error: DATE_BIN expected two or three arguments"
608        );
609
610        // stride: invalid type
611        args = datafusion_expr::ScalarFunctionArgs {
612            args: vec![
613                ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
614                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
615                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
616            ],
617            number_rows: 1,
618            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
619        };
620        let res = DateBinFunc::new().invoke_with_args(args);
621        assert_eq!(
622            res.err().unwrap().strip_backtrace(),
623            "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
624        );
625
626        // stride: invalid value
627
628        args = datafusion_expr::ScalarFunctionArgs {
629            args: vec![
630                ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
631                    IntervalDayTime {
632                        days: 0,
633                        milliseconds: 0,
634                    },
635                ))),
636                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
637                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
638            ],
639            number_rows: 1,
640            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
641        };
642
643        let res = DateBinFunc::new().invoke_with_args(args);
644        assert_eq!(
645            res.err().unwrap().strip_backtrace(),
646            "Execution error: DATE_BIN stride must be non-zero"
647        );
648
649        // stride: overflow of day-time interval
650        args = datafusion_expr::ScalarFunctionArgs {
651            args: vec![
652                ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
653                    IntervalDayTime::MAX,
654                ))),
655                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
656                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
657            ],
658            number_rows: 1,
659            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
660        };
661        let res = DateBinFunc::new().invoke_with_args(args);
662        assert_eq!(
663            res.err().unwrap().strip_backtrace(),
664            "Execution error: DATE_BIN stride argument is too large"
665        );
666
667        // stride: overflow of month-day-nano interval
668        args = datafusion_expr::ScalarFunctionArgs {
669            args: vec![
670                ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
671                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
672                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
673            ],
674            number_rows: 1,
675            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
676        };
677        let res = DateBinFunc::new().invoke_with_args(args);
678        assert_eq!(
679            res.err().unwrap().strip_backtrace(),
680            "Execution error: DATE_BIN stride argument is too large"
681        );
682
683        // stride: month intervals
684        args = datafusion_expr::ScalarFunctionArgs {
685            args: vec![
686                ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
687                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
688                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
689            ],
690            number_rows: 1,
691            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
692        };
693        let res = DateBinFunc::new().invoke_with_args(args);
694        assert_eq!(
695            res.err().unwrap().strip_backtrace(),
696            "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
697        );
698
699        // origin: invalid type
700        args = datafusion_expr::ScalarFunctionArgs {
701            args: vec![
702                ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
703                    IntervalDayTime {
704                        days: 0,
705                        milliseconds: 1,
706                    },
707                ))),
708                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
709                ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
710            ],
711            number_rows: 1,
712            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
713        };
714        let res = DateBinFunc::new().invoke_with_args(args);
715        assert_eq!(
716            res.err().unwrap().strip_backtrace(),
717            "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(Microsecond, None)"
718        );
719
720        args = datafusion_expr::ScalarFunctionArgs {
721            args: vec![
722                ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
723                    IntervalDayTime {
724                        days: 0,
725                        milliseconds: 1,
726                    },
727                ))),
728                ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
729                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
730            ],
731            number_rows: 1,
732            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
733        };
734        let res = DateBinFunc::new().invoke_with_args(args);
735        assert!(res.is_ok());
736
737        // unsupported array type for stride
738        let intervals = Arc::new(
739            (1..6)
740                .map(|x| {
741                    Some(IntervalDayTime {
742                        days: 0,
743                        milliseconds: x,
744                    })
745                })
746                .collect::<IntervalDayTimeArray>(),
747        );
748        args = datafusion_expr::ScalarFunctionArgs {
749            args: vec![
750                ColumnarValue::Array(intervals),
751                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
752                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
753            ],
754            number_rows: 1,
755            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
756        };
757        let res = DateBinFunc::new().invoke_with_args(args);
758        assert_eq!(
759            res.err().unwrap().strip_backtrace(),
760            "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
761        );
762
763        // unsupported array type for origin
764        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
765        let batch_len = timestamps.len();
766        args = datafusion_expr::ScalarFunctionArgs {
767            args: vec![
768                ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
769                    IntervalDayTime {
770                        days: 0,
771                        milliseconds: 1,
772                    },
773                ))),
774                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
775                ColumnarValue::Array(timestamps),
776            ],
777            number_rows: batch_len,
778            return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
779        };
780        let res = DateBinFunc::new().invoke_with_args(args);
781        assert_eq!(
782            res.err().unwrap().strip_backtrace(),
783            "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
784        );
785    }
786
787    #[test]
788    fn test_date_bin_timezones() {
789        let cases = vec![
790            (
791                vec![
792                    "2020-09-08T00:00:00Z",
793                    "2020-09-08T01:00:00Z",
794                    "2020-09-08T02:00:00Z",
795                    "2020-09-08T03:00:00Z",
796                    "2020-09-08T04:00:00Z",
797                ],
798                Some("+00".into()),
799                "1970-01-01T00:00:00Z",
800                vec![
801                    "2020-09-08T00:00:00Z",
802                    "2020-09-08T00:00:00Z",
803                    "2020-09-08T00:00:00Z",
804                    "2020-09-08T00:00:00Z",
805                    "2020-09-08T00:00:00Z",
806                ],
807            ),
808            (
809                vec![
810                    "2020-09-08T00:00:00Z",
811                    "2020-09-08T01:00:00Z",
812                    "2020-09-08T02:00:00Z",
813                    "2020-09-08T03:00:00Z",
814                    "2020-09-08T04:00:00Z",
815                ],
816                None,
817                "1970-01-01T00:00:00Z",
818                vec![
819                    "2020-09-08T00:00:00Z",
820                    "2020-09-08T00:00:00Z",
821                    "2020-09-08T00:00:00Z",
822                    "2020-09-08T00:00:00Z",
823                    "2020-09-08T00:00:00Z",
824                ],
825            ),
826            (
827                vec![
828                    "2020-09-08T00:00:00Z",
829                    "2020-09-08T01:00:00Z",
830                    "2020-09-08T02:00:00Z",
831                    "2020-09-08T03:00:00Z",
832                    "2020-09-08T04:00:00Z",
833                ],
834                Some("-02".into()),
835                "1970-01-01T00:00:00Z",
836                vec![
837                    "2020-09-08T00:00:00Z",
838                    "2020-09-08T00:00:00Z",
839                    "2020-09-08T00:00:00Z",
840                    "2020-09-08T00:00:00Z",
841                    "2020-09-08T00:00:00Z",
842                ],
843            ),
844            (
845                vec![
846                    "2020-09-08T00:00:00+05",
847                    "2020-09-08T01:00:00+05",
848                    "2020-09-08T02:00:00+05",
849                    "2020-09-08T03:00:00+05",
850                    "2020-09-08T04:00:00+05",
851                ],
852                Some("+05".into()),
853                "1970-01-01T00:00:00+05",
854                vec![
855                    "2020-09-08T00:00:00+05",
856                    "2020-09-08T00:00:00+05",
857                    "2020-09-08T00:00:00+05",
858                    "2020-09-08T00:00:00+05",
859                    "2020-09-08T00:00:00+05",
860                ],
861            ),
862            (
863                vec![
864                    "2020-09-08T00:00:00+08",
865                    "2020-09-08T01:00:00+08",
866                    "2020-09-08T02:00:00+08",
867                    "2020-09-08T03:00:00+08",
868                    "2020-09-08T04:00:00+08",
869                ],
870                Some("+08".into()),
871                "1970-01-01T00:00:00+08",
872                vec![
873                    "2020-09-08T00:00:00+08",
874                    "2020-09-08T00:00:00+08",
875                    "2020-09-08T00:00:00+08",
876                    "2020-09-08T00:00:00+08",
877                    "2020-09-08T00:00:00+08",
878                ],
879            ),
880        ];
881
882        cases
883            .iter()
884            .for_each(|(original, tz_opt, origin, expected)| {
885                let input = original
886                    .iter()
887                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
888                    .collect::<TimestampNanosecondArray>()
889                    .with_timezone_opt(tz_opt.clone());
890                let right = expected
891                    .iter()
892                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
893                    .collect::<TimestampNanosecondArray>()
894                    .with_timezone_opt(tz_opt.clone());
895                let batch_len = input.len();
896                let args = datafusion_expr::ScalarFunctionArgs {
897                    args: vec![
898                        ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
899                        ColumnarValue::Array(Arc::new(input)),
900                        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
901                            Some(string_to_timestamp_nanos(origin).unwrap()),
902                            tz_opt.clone(),
903                        )),
904                    ],
905                    number_rows: batch_len,
906                    return_type: &DataType::Timestamp(
907                        TimeUnit::Nanosecond,
908                        tz_opt.clone(),
909                    ),
910                };
911                let result = DateBinFunc::new().invoke_with_args(args).unwrap();
912                if let ColumnarValue::Array(result) = result {
913                    assert_eq!(
914                        result.data_type(),
915                        &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
916                    );
917                    let left = arrow::array::cast::as_primitive_array::<
918                        TimestampNanosecondType,
919                    >(&result);
920                    assert_eq!(left, &right);
921                } else {
922                    panic!("unexpected column type");
923                }
924            });
925    }
926
927    #[test]
928    fn test_date_bin_single() {
929        let cases = vec![
930            (
931                (
932                    TimeDelta::try_minutes(15),
933                    "2004-04-09T02:03:04.123456789Z",
934                    "2001-01-01T00:00:00",
935                ),
936                "2004-04-09T02:00:00Z",
937            ),
938            (
939                (
940                    TimeDelta::try_minutes(15),
941                    "2004-04-09T02:03:04.123456789Z",
942                    "2001-01-01T00:02:30",
943                ),
944                "2004-04-09T02:02:30Z",
945            ),
946            (
947                (
948                    TimeDelta::try_minutes(15),
949                    "2004-04-09T02:03:04.123456789Z",
950                    "2005-01-01T00:02:30",
951                ),
952                "2004-04-09T02:02:30Z",
953            ),
954            (
955                (
956                    TimeDelta::try_hours(1),
957                    "2004-04-09T02:03:04.123456789Z",
958                    "2001-01-01T00:00:00",
959                ),
960                "2004-04-09T02:00:00Z",
961            ),
962            (
963                (
964                    TimeDelta::try_seconds(10),
965                    "2004-04-09T02:03:11.123456789Z",
966                    "2001-01-01T00:00:00",
967                ),
968                "2004-04-09T02:03:10Z",
969            ),
970        ];
971
972        cases
973            .iter()
974            .for_each(|((stride, source, origin), expected)| {
975                let stride = stride.unwrap();
976                let stride1 = stride.num_nanoseconds().unwrap();
977                let source1 = string_to_timestamp_nanos(source).unwrap();
978                let origin1 = string_to_timestamp_nanos(origin).unwrap();
979
980                let expected1 = string_to_timestamp_nanos(expected).unwrap();
981                let result = date_bin_nanos_interval(stride1, source1, origin1);
982                assert_eq!(result, expected1, "{source} = {expected}");
983            })
984    }
985
986    #[test]
987    fn test_date_bin_before_epoch() {
988        let cases = [
989            (
990                (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
991                "1969-12-31T23:30:00",
992            ),
993            (
994                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
995                "1969-12-31T23:45:00",
996            ),
997            (
998                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
999                "1969-12-31T23:45:00",
1000            ),
1001        ];
1002
1003        cases.iter().for_each(|((stride, source), expected)| {
1004            let stride = stride.unwrap();
1005            let stride1 = stride.num_nanoseconds().unwrap();
1006            let source1 = string_to_timestamp_nanos(source).unwrap();
1007
1008            let expected1 = string_to_timestamp_nanos(expected).unwrap();
1009            let result = date_bin_nanos_interval(stride1, source1, 0);
1010            assert_eq!(result, expected1, "{source} = {expected}");
1011        })
1012    }
1013}