1use std::any::Any;
19use std::ops::Add;
20use std::sync::Arc;
21
22use arrow::array::timezone::Tz;
23use arrow::array::{Array, ArrayRef, PrimitiveBuilder};
24use arrow::datatypes::DataType::Timestamp;
25use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
26use arrow::datatypes::{
27 ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
28 TimestampNanosecondType, TimestampSecondType,
29};
30use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
31
32use datafusion_common::cast::as_primitive_array;
33use datafusion_common::{
34 exec_err, plan_err, utils::take_function_args, DataFusionError, Result, ScalarValue,
35};
36use datafusion_expr::{
37 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
38};
39use datafusion_macros::user_doc;
40
41#[user_doc(
45 doc_section(label = "Time and Date Functions"),
46 description = "Converts a timestamp with a timezone to a timestamp without a timezone (with no offset or timezone information). This function handles daylight saving time changes.",
47 syntax_example = "to_local_time(expression)",
48 sql_example = r#"```sql
49> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp);
50+---------------------------------------------+
51| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
52+---------------------------------------------+
53| 2024-04-01T00:00:20 |
54+---------------------------------------------+
55
56> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels');
57+---------------------------------------------+
58| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
59+---------------------------------------------+
60| 2024-04-01T00:00:20 |
61+---------------------------------------------+
62
63> SELECT
64 time,
65 arrow_typeof(time) as type,
66 to_local_time(time) as to_local_time,
67 arrow_typeof(to_local_time(time)) as to_local_time_type
68FROM (
69 SELECT '2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels' AS time
70);
71+---------------------------+------------------------------------------------+---------------------+-----------------------------+
72| time | type | to_local_time | to_local_time_type |
73+---------------------------+------------------------------------------------+---------------------+-----------------------------+
74| 2024-04-01T00:00:20+02:00 | Timestamp(Nanosecond, Some("Europe/Brussels")) | 2024-04-01T00:00:20 | Timestamp(Nanosecond, None) |
75+---------------------------+------------------------------------------------+---------------------+-----------------------------+
76
77# combine `to_local_time()` with `date_bin()` to bin on boundaries in the timezone rather
78# than UTC boundaries
79
80> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AS date_bin;
81+---------------------+
82| date_bin |
83+---------------------+
84| 2024-04-01T00:00:00 |
85+---------------------+
86
87> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AT TIME ZONE 'Europe/Brussels' AS date_bin_with_timezone;
88+---------------------------+
89| date_bin_with_timezone |
90+---------------------------+
91| 2024-04-01T00:00:00+02:00 |
92+---------------------------+
93```"#,
94 argument(
95 name = "expression",
96 description = "Time expression to operate on. Can be a constant, column, or function."
97 )
98)]
99#[derive(Debug)]
100pub struct ToLocalTimeFunc {
101 signature: Signature,
102}
103
104impl Default for ToLocalTimeFunc {
105 fn default() -> Self {
106 Self::new()
107 }
108}
109
110impl ToLocalTimeFunc {
111 pub fn new() -> Self {
112 Self {
113 signature: Signature::user_defined(Volatility::Immutable),
114 }
115 }
116
117 fn to_local_time(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
118 let [time_value] = take_function_args(self.name(), args)?;
119
120 let arg_type = time_value.data_type();
121 match arg_type {
122 Timestamp(_, None) => {
123 Ok(time_value.clone())
125 }
126 Timestamp(_, Some(timezone)) => {
133 let tz: Tz = timezone.parse()?;
134
135 match time_value {
136 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
137 Some(ts),
138 Some(_),
139 )) => {
140 let adjusted_ts =
141 adjust_to_local_time::<TimestampNanosecondType>(*ts, tz)?;
142 Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
143 Some(adjusted_ts),
144 None,
145 )))
146 }
147 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
148 Some(ts),
149 Some(_),
150 )) => {
151 let adjusted_ts =
152 adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz)?;
153 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
154 Some(adjusted_ts),
155 None,
156 )))
157 }
158 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
159 Some(ts),
160 Some(_),
161 )) => {
162 let adjusted_ts =
163 adjust_to_local_time::<TimestampMillisecondType>(*ts, tz)?;
164 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
165 Some(adjusted_ts),
166 None,
167 )))
168 }
169 ColumnarValue::Scalar(ScalarValue::TimestampSecond(
170 Some(ts),
171 Some(_),
172 )) => {
173 let adjusted_ts =
174 adjust_to_local_time::<TimestampSecondType>(*ts, tz)?;
175 Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
176 Some(adjusted_ts),
177 None,
178 )))
179 }
180 ColumnarValue::Array(array) => {
181 fn transform_array<T: ArrowTimestampType>(
182 array: &ArrayRef,
183 tz: Tz,
184 ) -> Result<ColumnarValue> {
185 let mut builder = PrimitiveBuilder::<T>::new();
186
187 let primitive_array = as_primitive_array::<T>(array)?;
188 for ts_opt in primitive_array.iter() {
189 match ts_opt {
190 None => builder.append_null(),
191 Some(ts) => {
192 let adjusted_ts: i64 =
193 adjust_to_local_time::<T>(ts, tz)?;
194 builder.append_value(adjusted_ts)
195 }
196 }
197 }
198
199 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
200 }
201
202 match array.data_type() {
203 Timestamp(_, None) => {
204 Ok(time_value.clone())
206 }
207 Timestamp(Nanosecond, Some(_)) => {
208 transform_array::<TimestampNanosecondType>(array, tz)
209 }
210 Timestamp(Microsecond, Some(_)) => {
211 transform_array::<TimestampMicrosecondType>(array, tz)
212 }
213 Timestamp(Millisecond, Some(_)) => {
214 transform_array::<TimestampMillisecondType>(array, tz)
215 }
216 Timestamp(Second, Some(_)) => {
217 transform_array::<TimestampSecondType>(array, tz)
218 }
219 _ => {
220 exec_err!("to_local_time function requires timestamp argument in array, got {:?}", array.data_type())
221 }
222 }
223 }
224 _ => {
225 exec_err!(
226 "to_local_time function requires timestamp argument, got {:?}",
227 time_value.data_type()
228 )
229 }
230 }
231 }
232 _ => {
233 exec_err!(
234 "to_local_time function requires timestamp argument, got {:?}",
235 arg_type
236 )
237 }
238 }
239 }
240}
241
242fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
296 fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
297 where
298 F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
299 {
300 match converter(ts) {
301 MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
302 "Ambiguous timestamp. Do you mean {:?} or {:?}",
303 earliest,
304 latest
305 ),
306 MappedLocalTime::None => exec_err!(
307 "The local time does not exist because there is a gap in the local time."
308 ),
309 MappedLocalTime::Single(date_time) => Ok(date_time),
310 }
311 }
312
313 let date_time = match T::UNIT {
314 Nanosecond => Utc.timestamp_nanos(ts),
315 Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
316 Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
317 Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
318 };
319
320 let offset_seconds: i64 = tz
321 .offset_from_utc_datetime(&date_time.naive_utc())
322 .fix()
323 .local_minus_utc() as i64;
324
325 let adjusted_date_time = date_time.add(
326 TimeDelta::try_seconds(offset_seconds)
329 .ok_or(DataFusionError::Internal("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000".to_string()))?,
330 );
331
332 match T::UNIT {
334 Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or(
335 DataFusionError::Internal(
336 "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807".to_string(),
337 ),
338 ),
339 Microsecond => Ok(adjusted_date_time.timestamp_micros()),
340 Millisecond => Ok(adjusted_date_time.timestamp_millis()),
341 Second => Ok(adjusted_date_time.timestamp()),
342 }
343}
344
345impl ScalarUDFImpl for ToLocalTimeFunc {
346 fn as_any(&self) -> &dyn Any {
347 self
348 }
349
350 fn name(&self) -> &str {
351 "to_local_time"
352 }
353
354 fn signature(&self) -> &Signature {
355 &self.signature
356 }
357
358 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
359 let [time_value] = take_function_args(self.name(), arg_types)?;
360
361 match time_value {
362 Timestamp(timeunit, _) => Ok(Timestamp(*timeunit, None)),
363 _ => exec_err!(
364 "The to_local_time function can only accept timestamp as the arg, got {:?}", time_value
365 )
366 }
367 }
368
369 fn invoke_with_args(
370 &self,
371 args: datafusion_expr::ScalarFunctionArgs,
372 ) -> Result<ColumnarValue> {
373 let [time_value] = take_function_args(self.name(), args.args)?;
374
375 self.to_local_time(&[time_value.clone()])
376 }
377
378 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
379 if arg_types.len() != 1 {
380 return plan_err!(
381 "to_local_time function requires 1 argument, got {:?}",
382 arg_types.len()
383 );
384 }
385
386 let first_arg = arg_types[0].clone();
387 match &first_arg {
388 Timestamp(Nanosecond, timezone) => {
389 Ok(vec![Timestamp(Nanosecond, timezone.clone())])
390 }
391 Timestamp(Microsecond, timezone) => {
392 Ok(vec![Timestamp(Microsecond, timezone.clone())])
393 }
394 Timestamp(Millisecond, timezone) => {
395 Ok(vec![Timestamp(Millisecond, timezone.clone())])
396 }
397 Timestamp(Second, timezone) => Ok(vec![Timestamp(Second, timezone.clone())]),
398 _ => plan_err!("The to_local_time function can only accept Timestamp as the arg got {first_arg}"),
399 }
400 }
401 fn documentation(&self) -> Option<&Documentation> {
402 self.doc()
403 }
404}
405
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{types::TimestampNanosecondType, TimestampNanosecondArray};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, TimeUnit};
    use chrono::NaiveDateTime;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};

    use super::{adjust_to_local_time, ToLocalTimeFunc};

    /// The instant at which New York clocks read `timestamp_str` must map to
    /// the timezone-less value of that same wall-clock reading.
    #[test]
    fn test_adjust_to_local_time() {
        let timestamp_str = "2020-03-31T13:40:00";
        let tz: arrow::array::timezone::Tz =
            "America/New_York".parse().expect("Invalid timezone");

        // The UTC instant at which New York wall clocks show `timestamp_str`.
        let timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_local_timezone(tz)
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();

        // The same wall-clock reading without any timezone (stored as UTC).
        let expected_timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();

        let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz).unwrap();
        assert_eq!(res, expected_timestamp);
    }

    /// Every timestamp unit is adjusted by Brussels' offset (+2h here) and loses
    /// its timezone.
    #[test]
    fn test_to_local_time_scalar() {
        let timezone = Some("Europe/Brussels".into());
        let timestamps_with_timezone = vec![
            (
                ScalarValue::TimestampNanosecond(
                    Some(1_123_123_000_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampNanosecond(Some(1_123_130_200_000_000_000), None),
            ),
            (
                ScalarValue::TimestampMicrosecond(
                    Some(1_123_123_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMicrosecond(Some(1_123_130_200_000_000), None),
            ),
            (
                ScalarValue::TimestampMillisecond(
                    Some(1_123_123_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMillisecond(Some(1_123_130_200_000), None),
            ),
            (
                ScalarValue::TimestampSecond(Some(1_123_123_000), timezone),
                ScalarValue::TimestampSecond(Some(1_123_130_200), None),
            ),
        ];

        for (input, expected) in timestamps_with_timezone {
            test_to_local_time_helper(input, expected);
        }
    }

    /// The offset applied must be the one in effect at each value's instant,
    /// so daylight saving time is honoured.
    #[test]
    fn test_timezone_with_daylight_savings() {
        let timezone_str = "America/New_York";
        let tz: arrow::array::timezone::Tz =
            timezone_str.parse().expect("Invalid timezone");

        // (New York wall-clock time, its UTC instant in ns,
        //  the same wall-clock time as a timezone-less value in ns)
        let test_cases = vec![
            (
                // 4 hours behind UTC (DST in effect).
                "2020-03-31T13:40:00",
                1_585_676_400_000_000_000,
                1_585_662_000_000_000_000,
            ),
            (
                // 5 hours behind UTC (DST not in effect).
                "2020-11-04T14:06:40",
                1_604_516_800_000_000_000,
                1_604_498_800_000_000_000,
            ),
        ];

        for (
            input_timestamp_str,
            expected_input_timestamp,
            expected_adjusted_timestamp,
        ) in test_cases
        {
            let input_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_local_timezone(tz)
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(input_timestamp, expected_input_timestamp);

            let expected_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_utc()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(expected_timestamp, expected_adjusted_timestamp);

            let input = ScalarValue::TimestampNanosecond(
                Some(input_timestamp),
                Some(timezone_str.into()),
            );
            let expected =
                ScalarValue::TimestampNanosecond(Some(expected_timestamp), None);
            test_to_local_time_helper(input, expected)
        }
    }

    /// Invokes the UDF on a single scalar and asserts the scalar result.
    fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
        let res = ToLocalTimeFunc::new()
            .invoke_with_args(ScalarFunctionArgs {
                args: vec![ColumnarValue::Scalar(input)],
                number_rows: 1,
                return_type: &expected.data_type(),
            })
            .unwrap();
        match res {
            ColumnarValue::Scalar(res) => {
                assert_eq!(res, expected);
            }
            _ => panic!("unexpected return type"),
        }
    }

    /// Array inputs: without a timezone the values pass through unchanged.
    /// With a `+01:00` timezone they are shifted forward by one hour. In both
    /// cases the result carries no timezone.
    #[test]
    fn test_to_local_time_timezones_array() {
        let cases = [
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                None::<Arc<str>>,
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                Some("+01:00".into()),
                vec![
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                    "2020-09-08T05:00:00",
                ],
            ),
        ];

        cases.iter().for_each(|(source, tz_opt, expected)| {
            // Attach the case's timezone so the timezone-aware path is exercised.
            let input = source
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let right = expected
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>();
            let batch_size = input.len();
            let args = ScalarFunctionArgs {
                args: vec![ColumnarValue::Array(Arc::new(input))],
                number_rows: batch_size,
                return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
            };
            let result = ToLocalTimeFunc::new().invoke_with_args(args).unwrap();
            if let ColumnarValue::Array(result) = result {
                assert_eq!(
                    result.data_type(),
                    &DataType::Timestamp(TimeUnit::Nanosecond, None)
                );
                let left = arrow::array::cast::as_primitive_array::<
                    TimestampNanosecondType,
                >(&result);
                assert_eq!(left, &right);
            } else {
                panic!("unexpected column type");
            }
        });
    }
}