datafusion_functions/datetime/
to_local_time.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::ops::Add;
20use std::sync::Arc;
21
22use arrow::array::timezone::Tz;
23use arrow::array::{Array, ArrayRef, PrimitiveBuilder};
24use arrow::datatypes::DataType::Timestamp;
25use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
26use arrow::datatypes::{
27    ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
28    TimestampNanosecondType, TimestampSecondType,
29};
30use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
31
32use datafusion_common::cast::as_primitive_array;
33use datafusion_common::{
34    exec_err, plan_err, utils::take_function_args, DataFusionError, Result, ScalarValue,
35};
36use datafusion_expr::{
37    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
38};
39use datafusion_macros::user_doc;
40
/// A UDF function that converts a timezone-aware timestamp to local time (with no offset or
/// timezone information). In other words, this function strips off the timezone from the
/// timestamp, while keeping the display value of the timestamp the same.
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = "Converts a timestamp with a timezone to a timestamp without a timezone (with no offset or timezone information). This function handles daylight saving time changes.",
    syntax_example = "to_local_time(expression)",
    sql_example = r#"```sql
> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp);
+---------------------------------------------+
| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
+---------------------------------------------+
| 2024-04-01T00:00:20                         |
+---------------------------------------------+

> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels');
+---------------------------------------------+
| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
+---------------------------------------------+
| 2024-04-01T00:00:20                         |
+---------------------------------------------+

> SELECT
  time,
  arrow_typeof(time) as type,
  to_local_time(time) as to_local_time,
  arrow_typeof(to_local_time(time)) as to_local_time_type
FROM (
  SELECT '2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels' AS time
);
+---------------------------+------------------------------------------------+---------------------+-----------------------------+
| time                      | type                                           | to_local_time       | to_local_time_type          |
+---------------------------+------------------------------------------------+---------------------+-----------------------------+
| 2024-04-01T00:00:20+02:00 | Timestamp(Nanosecond, Some("Europe/Brussels")) | 2024-04-01T00:00:20 | Timestamp(Nanosecond, None) |
+---------------------------+------------------------------------------------+---------------------+-----------------------------+

# combine `to_local_time()` with `date_bin()` to bin on boundaries in the timezone rather
# than UTC boundaries

> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AS date_bin;
+---------------------+
| date_bin            |
+---------------------+
| 2024-04-01T00:00:00 |
+---------------------+

> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AT TIME ZONE 'Europe/Brussels' AS date_bin_with_timezone;
+---------------------------+
| date_bin_with_timezone    |
+---------------------------+
| 2024-04-01T00:00:00+02:00 |
+---------------------------+
```"#,
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    )
)]
#[derive(Debug)]
pub struct ToLocalTimeFunc {
    /// Argument signature of the function. It is created as a user-defined signature
    /// in `new()`, so the accepted argument types are decided by `coerce_types`.
    signature: Signature,
}
103
104impl Default for ToLocalTimeFunc {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110impl ToLocalTimeFunc {
111    pub fn new() -> Self {
112        Self {
113            signature: Signature::user_defined(Volatility::Immutable),
114        }
115    }
116
117    fn to_local_time(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
118        let [time_value] = take_function_args(self.name(), args)?;
119
120        let arg_type = time_value.data_type();
121        match arg_type {
122            Timestamp(_, None) => {
123                // if no timezone specified, just return the input
124                Ok(time_value.clone())
125            }
126            // If has timezone, adjust the underlying time value. The current time value
127            // is stored as i64 in UTC, even though the timezone may not be in UTC. Therefore,
128            // we need to adjust the time value to the local time. See [`adjust_to_local_time`]
129            // for more details.
130            //
131            // Then remove the timezone in return type, i.e. return None
132            Timestamp(_, Some(timezone)) => {
133                let tz: Tz = timezone.parse()?;
134
135                match time_value {
136                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
137                        Some(ts),
138                        Some(_),
139                    )) => {
140                        let adjusted_ts =
141                            adjust_to_local_time::<TimestampNanosecondType>(*ts, tz)?;
142                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
143                            Some(adjusted_ts),
144                            None,
145                        )))
146                    }
147                    ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
148                        Some(ts),
149                        Some(_),
150                    )) => {
151                        let adjusted_ts =
152                            adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz)?;
153                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
154                            Some(adjusted_ts),
155                            None,
156                        )))
157                    }
158                    ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
159                        Some(ts),
160                        Some(_),
161                    )) => {
162                        let adjusted_ts =
163                            adjust_to_local_time::<TimestampMillisecondType>(*ts, tz)?;
164                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
165                            Some(adjusted_ts),
166                            None,
167                        )))
168                    }
169                    ColumnarValue::Scalar(ScalarValue::TimestampSecond(
170                        Some(ts),
171                        Some(_),
172                    )) => {
173                        let adjusted_ts =
174                            adjust_to_local_time::<TimestampSecondType>(*ts, tz)?;
175                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
176                            Some(adjusted_ts),
177                            None,
178                        )))
179                    }
180                    ColumnarValue::Array(array) => {
181                        fn transform_array<T: ArrowTimestampType>(
182                            array: &ArrayRef,
183                            tz: Tz,
184                        ) -> Result<ColumnarValue> {
185                            let mut builder = PrimitiveBuilder::<T>::new();
186
187                            let primitive_array = as_primitive_array::<T>(array)?;
188                            for ts_opt in primitive_array.iter() {
189                                match ts_opt {
190                                    None => builder.append_null(),
191                                    Some(ts) => {
192                                        let adjusted_ts: i64 =
193                                            adjust_to_local_time::<T>(ts, tz)?;
194                                        builder.append_value(adjusted_ts)
195                                    }
196                                }
197                            }
198
199                            Ok(ColumnarValue::Array(Arc::new(builder.finish())))
200                        }
201
202                        match array.data_type() {
203                            Timestamp(_, None) => {
204                                // if no timezone specified, just return the input
205                                Ok(time_value.clone())
206                            }
207                            Timestamp(Nanosecond, Some(_)) => {
208                                transform_array::<TimestampNanosecondType>(array, tz)
209                            }
210                            Timestamp(Microsecond, Some(_)) => {
211                                transform_array::<TimestampMicrosecondType>(array, tz)
212                            }
213                            Timestamp(Millisecond, Some(_)) => {
214                                transform_array::<TimestampMillisecondType>(array, tz)
215                            }
216                            Timestamp(Second, Some(_)) => {
217                                transform_array::<TimestampSecondType>(array, tz)
218                            }
219                            _ => {
220                                exec_err!("to_local_time function requires timestamp argument in array, got {:?}", array.data_type())
221                            }
222                        }
223                    }
224                    _ => {
225                        exec_err!(
226                        "to_local_time function requires timestamp argument, got {:?}",
227                        time_value.data_type()
228                    )
229                    }
230                }
231            }
232            _ => {
233                exec_err!(
234                    "to_local_time function requires timestamp argument, got {:?}",
235                    arg_type
236                )
237            }
238        }
239    }
240}
241
242/// This function converts a timestamp with a timezone to a timestamp without a timezone.
243/// The display value of the adjusted timestamp remain the same, but the underlying timestamp
244/// representation is adjusted according to the relative timezone offset to UTC.
245///
246/// This function uses chrono to handle daylight saving time changes.
247///
248/// For example,
249///
250/// ```text
251/// '2019-03-31T01:00:00Z'::timestamp at time zone 'Europe/Brussels'
252/// ```
253///
254/// is displayed as follows in datafusion-cli:
255///
256/// ```text
257/// 2019-03-31T01:00:00+01:00
258/// ```
259///
260/// and is represented in DataFusion as:
261///
262/// ```text
263/// TimestampNanosecond(Some(1_553_990_400_000_000_000), Some("Europe/Brussels"))
264/// ```
265///
266/// To strip off the timezone while keeping the display value the same, we need to
267/// adjust the underlying timestamp with the timezone offset value using `adjust_to_local_time()`
268///
269/// ```text
270/// adjust_to_local_time(1_553_990_400_000_000_000, "Europe/Brussels") --> 1_553_994_000_000_000_000
271/// ```
272///
273/// The difference between `1_553_990_400_000_000_000` and `1_553_994_000_000_000_000` is
274/// `3600_000_000_000` ns, which corresponds to 1 hour. This matches with the timezone
275/// offset for "Europe/Brussels" for this date.
276///
277/// Note that the offset varies with daylight savings time (DST), which makes this tricky! For
278/// example, timezone "Europe/Brussels" has a 2-hour offset during DST and a 1-hour offset
279/// when DST ends.
280///
281/// Consequently, DataFusion can represent the timestamp in local time (with no offset or
282/// timezone information) as
283///
284/// ```text
285/// TimestampNanosecond(Some(1_553_994_000_000_000_000), None)
286/// ```
287///
288/// which is displayed as follows in datafusion-cli:
289///
290/// ```text
291/// 2019-03-31T01:00:00
292/// ```
293///
294/// See `test_adjust_to_local_time()` for example
295fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
296    fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
297    where
298        F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
299    {
300        match converter(ts) {
301            MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
302                "Ambiguous timestamp. Do you mean {:?} or {:?}",
303                earliest,
304                latest
305            ),
306            MappedLocalTime::None => exec_err!(
307                "The local time does not exist because there is a gap in the local time."
308            ),
309            MappedLocalTime::Single(date_time) => Ok(date_time),
310        }
311    }
312
313    let date_time = match T::UNIT {
314        Nanosecond => Utc.timestamp_nanos(ts),
315        Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
316        Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
317        Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
318    };
319
320    let offset_seconds: i64 = tz
321        .offset_from_utc_datetime(&date_time.naive_utc())
322        .fix()
323        .local_minus_utc() as i64;
324
325    let adjusted_date_time = date_time.add(
326        // This should not fail under normal circumstances as the
327        // maximum possible offset is 26 hours (93,600 seconds)
328        TimeDelta::try_seconds(offset_seconds)
329            .ok_or(DataFusionError::Internal("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000".to_string()))?,
330    );
331
332    // convert the naive datetime back to i64
333    match T::UNIT {
334        Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or(
335            DataFusionError::Internal(
336                "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807".to_string(),
337            ),
338        ),
339        Microsecond => Ok(adjusted_date_time.timestamp_micros()),
340        Millisecond => Ok(adjusted_date_time.timestamp_millis()),
341        Second => Ok(adjusted_date_time.timestamp()),
342    }
343}
344
345impl ScalarUDFImpl for ToLocalTimeFunc {
346    fn as_any(&self) -> &dyn Any {
347        self
348    }
349
350    fn name(&self) -> &str {
351        "to_local_time"
352    }
353
354    fn signature(&self) -> &Signature {
355        &self.signature
356    }
357
358    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
359        let [time_value] = take_function_args(self.name(), arg_types)?;
360
361        match time_value {
362            Timestamp(timeunit, _) => Ok(Timestamp(*timeunit, None)),
363            _ => exec_err!(
364                "The to_local_time function can only accept timestamp as the arg, got {:?}", time_value
365            )
366        }
367    }
368
369    fn invoke_with_args(
370        &self,
371        args: datafusion_expr::ScalarFunctionArgs,
372    ) -> Result<ColumnarValue> {
373        let [time_value] = take_function_args(self.name(), args.args)?;
374
375        self.to_local_time(&[time_value.clone()])
376    }
377
378    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
379        if arg_types.len() != 1 {
380            return plan_err!(
381                "to_local_time function requires 1 argument, got {:?}",
382                arg_types.len()
383            );
384        }
385
386        let first_arg = arg_types[0].clone();
387        match &first_arg {
388            Timestamp(Nanosecond, timezone) => {
389                Ok(vec![Timestamp(Nanosecond, timezone.clone())])
390            }
391            Timestamp(Microsecond, timezone) => {
392                Ok(vec![Timestamp(Microsecond, timezone.clone())])
393            }
394            Timestamp(Millisecond, timezone) => {
395                Ok(vec![Timestamp(Millisecond, timezone.clone())])
396            }
397            Timestamp(Second, timezone) => Ok(vec![Timestamp(Second, timezone.clone())]),
398            _ => plan_err!("The to_local_time function can only accept Timestamp as the arg got {first_arg}"),
399        }
400    }
401    fn documentation(&self) -> Option<&Documentation> {
402        self.doc()
403    }
404}
405
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{types::TimestampNanosecondType, TimestampNanosecondArray};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, TimeUnit};
    use chrono::NaiveDateTime;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};

    use super::{adjust_to_local_time, ToLocalTimeFunc};

    /// A wall-clock time in New York must map to the same wall-clock time read as UTC.
    #[test]
    fn test_adjust_to_local_time() {
        let timestamp_str = "2020-03-31T13:40:00";
        let tz: arrow::array::timezone::Tz =
            "America/New_York".parse().expect("Invalid timezone");

        let timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_local_timezone(tz) // this is in a local timezone
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();

        let expected_timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_utc() // this is in UTC
            .timestamp_nanos_opt()
            .unwrap();

        let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz).unwrap();
        assert_eq!(res, expected_timestamp);
    }

    /// Scalars of every time unit are shifted by the Brussels offset and lose their timezone.
    #[test]
    fn test_to_local_time_scalar() {
        let timezone = Some("Europe/Brussels".into());
        let timestamps_with_timezone = vec![
            (
                ScalarValue::TimestampNanosecond(
                    Some(1_123_123_000_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampNanosecond(Some(1_123_130_200_000_000_000), None),
            ),
            (
                ScalarValue::TimestampMicrosecond(
                    Some(1_123_123_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMicrosecond(Some(1_123_130_200_000_000), None),
            ),
            (
                ScalarValue::TimestampMillisecond(
                    Some(1_123_123_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMillisecond(Some(1_123_130_200_000), None),
            ),
            (
                ScalarValue::TimestampSecond(Some(1_123_123_000), timezone),
                ScalarValue::TimestampSecond(Some(1_123_130_200), None),
            ),
        ];

        for (input, expected) in timestamps_with_timezone {
            test_to_local_time_helper(input, expected);
        }
    }

    #[test]
    fn test_timezone_with_daylight_savings() {
        let timezone_str = "America/New_York";
        let tz: arrow::array::timezone::Tz =
            timezone_str.parse().expect("Invalid timezone");

        // Test data:
        // (
        //    the string display of the input timestamp,
        //    the i64 representation of the timestamp before adjustment in nanosecond,
        //    the i64 representation of the timestamp after adjustment in nanosecond,
        // )
        let test_cases = vec![
            (
                // DST time
                "2020-03-31T13:40:00",
                1_585_676_400_000_000_000,
                1_585_662_000_000_000_000,
            ),
            (
                // End of DST
                "2020-11-04T14:06:40",
                1_604_516_800_000_000_000,
                1_604_498_800_000_000_000,
            ),
        ];

        for (
            input_timestamp_str,
            expected_input_timestamp,
            expected_adjusted_timestamp,
        ) in test_cases
        {
            let input_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_local_timezone(tz) // this is in a local timezone
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(input_timestamp, expected_input_timestamp);

            let expected_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_utc() // this is in UTC
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(expected_timestamp, expected_adjusted_timestamp);

            let input = ScalarValue::TimestampNanosecond(
                Some(input_timestamp),
                Some(timezone_str.into()),
            );
            let expected =
                ScalarValue::TimestampNanosecond(Some(expected_timestamp), None);
            test_to_local_time_helper(input, expected)
        }
    }

    /// Invokes the UDF on a single scalar and checks the scalar result.
    fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
        let res = ToLocalTimeFunc::new()
            .invoke_with_args(ScalarFunctionArgs {
                args: vec![ColumnarValue::Scalar(input)],
                number_rows: 1,
                return_type: &expected.data_type(),
            })
            .unwrap();
        match res {
            ColumnarValue::Scalar(res) => {
                assert_eq!(res, expected);
            }
            _ => panic!("unexpected return type"),
        }
    }

    #[test]
    fn test_to_local_time_timezones_array() {
        // Test data:
        // (
        //    input instants, parsed as UTC,
        //    the timezone attached to the input array,
        //    the expected wall-clock values after stripping the timezone,
        // )
        let cases = [
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                None::<Arc<str>>,
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                Some("+01:00".into()),
                // The same instants read on a +01:00 wall clock are one hour later.
                vec![
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                    "2020-09-08T05:00:00",
                ],
            ),
        ];

        cases.iter().for_each(|(source, tz_opt, expected)| {
            // Attach the timezone so that the array conversion path is really exercised.
            let input = source
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let right = expected
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>();
            let batch_size = input.len();
            let args = ScalarFunctionArgs {
                args: vec![ColumnarValue::Array(Arc::new(input))],
                number_rows: batch_size,
                return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
            };
            let result = ToLocalTimeFunc::new().invoke_with_args(args).unwrap();
            if let ColumnarValue::Array(result) = result {
                assert_eq!(
                    result.data_type(),
                    &DataType::Timestamp(TimeUnit::Nanosecond, None)
                );
                let left = arrow::array::cast::as_primitive_array::<
                    TimestampNanosecondType,
                >(&result);
                assert_eq!(left, &right);
            } else {
                panic!("unexpected column type");
            }
        });
    }
}