datafusion_functions/datetime/
common.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::{
21    Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
22    StringArrayType, StringViewArray,
23};
24use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
25use arrow::datatypes::DataType;
26use chrono::format::{parse, Parsed, StrftimeItems};
27use chrono::LocalResult::Single;
28use chrono::{DateTime, TimeZone, Utc};
29
30use datafusion_common::cast::as_generic_string_array;
31use datafusion_common::{
32    exec_err, unwrap_or_internal_err, DataFusionError, Result, ScalarType, ScalarValue,
33};
34use datafusion_expr::ColumnarValue;
35
/// Message used when a parsed date cannot be expressed as a nanosecond
/// timestamp because it lies outside the representable interval.
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds \
    have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";
38
39/// Calls string_to_timestamp_nanos and converts the error type
40pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
41    string_to_timestamp_nanos(s).map_err(|e| e.into())
42}
43
44/// Checks that all the arguments from the second are of type [Utf8], [LargeUtf8] or [Utf8View]
45///
46/// [Utf8]: DataType::Utf8
47/// [LargeUtf8]: DataType::LargeUtf8
48/// [Utf8View]: DataType::Utf8View
49pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<()> {
50    for (idx, a) in args.iter().skip(1).enumerate() {
51        match a.data_type() {
52            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
53                // all good
54            }
55            _ => {
56                return exec_err!(
57                    "{name} function unsupported data type at index {}: {}",
58                    idx + 1,
59                    a.data_type()
60                );
61            }
62        }
63    }
64
65    Ok(())
66}
67
68/// Accepts a string and parses it using the [`chrono::format::strftime`] specifiers
69/// relative to the provided `timezone`
70///
71/// [IANA timezones] are only supported if the `arrow-array/chrono-tz` feature is enabled
72///
73/// * `2023-01-01 040506 America/Los_Angeles`
74///
75/// If a timestamp is ambiguous, for example as a result of daylight-savings time, an error
76/// will be returned
77///
78/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
79/// [IANA timezones]: https://www.iana.org/time-zones
80pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
81    timezone: &T,
82    s: &str,
83    format: &str,
84) -> Result<DateTime<T>, DataFusionError> {
85    let err = |err_ctx: &str| {
86        DataFusionError::Execution(format!(
87            "Error parsing timestamp from '{s}' using format '{format}': {err_ctx}"
88        ))
89    };
90
91    let mut parsed = Parsed::new();
92    parse(&mut parsed, s, StrftimeItems::new(format)).map_err(|e| err(&e.to_string()))?;
93
94    // attempt to parse the string assuming it has a timezone
95    let dt = parsed.to_datetime();
96
97    if let Err(e) = &dt {
98        // no timezone or other failure, try without a timezone
99        let ndt = parsed
100            .to_naive_datetime_with_offset(0)
101            .or_else(|_| parsed.to_naive_date().map(|nd| nd.into()));
102        if let Err(e) = &ndt {
103            return Err(err(&e.to_string()));
104        }
105
106        if let Single(e) = &timezone.from_local_datetime(&ndt.unwrap()) {
107            Ok(e.to_owned())
108        } else {
109            Err(err(&e.to_string()))
110        }
111    } else {
112        Ok(dt.unwrap().with_timezone(timezone))
113    }
114}
115
116/// Accepts a string with a `chrono` format and converts it to a
117/// nanosecond precision timestamp.
118///
119/// See [`chrono::format::strftime`] for the full set of supported formats.
120///
121/// Implements the `to_timestamp` function to convert a string to a
122/// timestamp, following the model of spark SQL’s to_`timestamp`.
123///
124/// Internally, this function uses the `chrono` library for the
125/// datetime parsing
126///
127/// ## Timestamp Precision
128///
129/// Function uses the maximum precision timestamps supported by
130/// Arrow (nanoseconds stored as a 64-bit integer) timestamps. This
131/// means the range of dates that timestamps can represent is ~1677 AD
132/// to 2262 AM
133///
134/// ## Timezone / Offset Handling
135///
136/// Numerical values of timestamps are stored compared to offset UTC.
137///
138/// Any timestamp in the formatting string is handled according to the rules
139/// defined by `chrono`.
140///
141/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
142///
143#[inline]
144pub(crate) fn string_to_timestamp_nanos_formatted(
145    s: &str,
146    format: &str,
147) -> Result<i64, DataFusionError> {
148    string_to_datetime_formatted(&Utc, s, format)?
149        .naive_utc()
150        .and_utc()
151        .timestamp_nanos_opt()
152        .ok_or_else(|| {
153            DataFusionError::Execution(ERR_NANOSECONDS_NOT_SUPPORTED.to_string())
154        })
155}
156
157/// Accepts a string with a `chrono` format and converts it to a
158/// millisecond precision timestamp.
159///
160/// See [`chrono::format::strftime`] for the full set of supported formats.
161///
162/// Internally, this function uses the `chrono` library for the
163/// datetime parsing
164///
165/// ## Timezone / Offset Handling
166///
167/// Numerical values of timestamps are stored compared to offset UTC.
168///
169/// Any timestamp in the formatting string is handled according to the rules
170/// defined by `chrono`.
171///
172/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
173///
174#[inline]
175pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Result<i64> {
176    Ok(string_to_datetime_formatted(&Utc, s, format)?
177        .naive_utc()
178        .and_utc()
179        .timestamp_millis())
180}
181
182pub(crate) fn handle<O, F, S>(
183    args: &[ColumnarValue],
184    op: F,
185    name: &str,
186) -> Result<ColumnarValue>
187where
188    O: ArrowPrimitiveType,
189    S: ScalarType<O::Native>,
190    F: Fn(&str) -> Result<O::Native>,
191{
192    match &args[0] {
193        ColumnarValue::Array(a) => match a.data_type() {
194            DataType::Utf8View => Ok(ColumnarValue::Array(Arc::new(
195                unary_string_to_primitive_function::<&StringViewArray, O, _>(
196                    a.as_ref().as_string_view(),
197                    op,
198                )?,
199            ))),
200            DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
201                unary_string_to_primitive_function::<&GenericStringArray<i64>, O, _>(
202                    a.as_ref().as_string::<i64>(),
203                    op,
204                )?,
205            ))),
206            DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
207                unary_string_to_primitive_function::<&GenericStringArray<i32>, O, _>(
208                    a.as_ref().as_string::<i32>(),
209                    op,
210                )?,
211            ))),
212            other => exec_err!("Unsupported data type {other:?} for function {name}"),
213        },
214        ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
215            Some(a) => {
216                let result = a.as_ref().map(|x| op(x)).transpose()?;
217                Ok(ColumnarValue::Scalar(S::scalar(result)))
218            }
219            _ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
220        },
221    }
222}
223
224// Given a function that maps a `&str`, `&str` to an arrow native type,
225// returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue`
226// depending on the `args`'s variant.
227pub(crate) fn handle_multiple<O, F, S, M>(
228    args: &[ColumnarValue],
229    op: F,
230    op2: M,
231    name: &str,
232) -> Result<ColumnarValue>
233where
234    O: ArrowPrimitiveType,
235    S: ScalarType<O::Native>,
236    F: Fn(&str, &str) -> Result<O::Native>,
237    M: Fn(O::Native) -> O::Native,
238{
239    match &args[0] {
240        ColumnarValue::Array(a) => match a.data_type() {
241            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
242                // validate the column types
243                for (pos, arg) in args.iter().enumerate() {
244                    match arg {
245                        ColumnarValue::Array(arg) => match arg.data_type() {
246                            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
247                                // all good
248                            }
249                            other => return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"),
250                        },
251                        ColumnarValue::Scalar(arg) => {
252                            match arg.data_type() {
253                                DataType::Utf8View| DataType::LargeUtf8 | DataType::Utf8 => {
254                                    // all good
255                                }
256                                other => return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"),
257                            }
258                        }
259                    }
260                }
261
262                Ok(ColumnarValue::Array(Arc::new(
263                    strings_to_primitive_function::<O, _, _>(args, op, op2, name)?,
264                )))
265            }
266            other => {
267                exec_err!("Unsupported data type {other:?} for function {name}")
268            }
269        },
270        // if the first argument is a scalar utf8 all arguments are expected to be scalar utf8
271        ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
272            Some(a) => {
273                let a = a.as_ref();
274                // ASK: Why do we trust `a` to be non-null at this point?
275                let a = unwrap_or_internal_err!(a);
276
277                let mut ret = None;
278
279                for (pos, v) in args.iter().enumerate().skip(1) {
280                    let ColumnarValue::Scalar(
281                        ScalarValue::Utf8View(x)
282                        | ScalarValue::LargeUtf8(x)
283                        | ScalarValue::Utf8(x),
284                    ) = v
285                    else {
286                        return exec_err!("Unsupported data type {v:?} for function {name}, arg # {pos}");
287                    };
288
289                    if let Some(s) = x {
290                        match op(a, s.as_str()) {
291                            Ok(r) => {
292                                ret = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(
293                                    op2(r),
294                                )))));
295                                break;
296                            }
297                            Err(e) => ret = Some(Err(e)),
298                        }
299                    }
300                }
301
302                unwrap_or_internal_err!(ret)
303            }
304            other => {
305                exec_err!("Unsupported data type {other:?} for function {name}")
306            }
307        },
308    }
309}
310
311/// given a function `op` that maps `&str`, `&str` to the first successful Result
312/// of an arrow native type, returns a `PrimitiveArray` after the application of the
313/// function to `args` and the subsequence application of the `op2` function to any
314/// successful result. This function calls the `op` function with the first and second
315/// argument and if not successful continues with first and third, first and fourth,
316/// etc until the result was successful or no more arguments are present.
317/// # Errors
318/// This function errors iff:
319/// * the number of arguments is not > 1 or
320/// * the function `op` errors for all input
321pub(crate) fn strings_to_primitive_function<O, F, F2>(
322    args: &[ColumnarValue],
323    op: F,
324    op2: F2,
325    name: &str,
326) -> Result<PrimitiveArray<O>>
327where
328    O: ArrowPrimitiveType,
329    F: Fn(&str, &str) -> Result<O::Native>,
330    F2: Fn(O::Native) -> O::Native,
331{
332    if args.len() < 2 {
333        return exec_err!(
334            "{:?} args were supplied but {} takes 2 or more arguments",
335            args.len(),
336            name
337        );
338    }
339
340    match &args[0] {
341        ColumnarValue::Array(a) => match a.data_type() {
342            DataType::Utf8View => {
343                let string_array = a.as_string_view();
344                handle_array_op::<O, &StringViewArray, F, F2>(
345                    &string_array,
346                    &args[1..],
347                    op,
348                    op2,
349                )
350            }
351            DataType::LargeUtf8 => {
352                let string_array = as_generic_string_array::<i64>(&a)?;
353                handle_array_op::<O, &GenericStringArray<i64>, F, F2>(
354                    &string_array,
355                    &args[1..],
356                    op,
357                    op2,
358                )
359            }
360            DataType::Utf8 => {
361                let string_array = as_generic_string_array::<i32>(&a)?;
362                handle_array_op::<O, &GenericStringArray<i32>, F, F2>(
363                    &string_array,
364                    &args[1..],
365                    op,
366                    op2,
367                )
368            }
369            other => exec_err!(
370                "Unsupported data type {other:?} for function substr,\
371                    expected Utf8View, Utf8 or LargeUtf8."
372            ),
373        },
374        other => exec_err!(
375            "Received {} data type, expected only array",
376            other.data_type()
377        ),
378    }
379}
380
381fn handle_array_op<'a, O, V, F, F2>(
382    first: &V,
383    args: &[ColumnarValue],
384    op: F,
385    op2: F2,
386) -> Result<PrimitiveArray<O>>
387where
388    V: StringArrayType<'a>,
389    O: ArrowPrimitiveType,
390    F: Fn(&str, &str) -> Result<O::Native>,
391    F2: Fn(O::Native) -> O::Native,
392{
393    first
394        .iter()
395        .enumerate()
396        .map(|(pos, x)| {
397            let mut val = None;
398            if let Some(x) = x {
399                for arg in args {
400                    let v = match arg {
401                        ColumnarValue::Array(a) => match a.data_type() {
402                            DataType::Utf8View => Ok(a.as_string_view().value(pos)),
403                            DataType::LargeUtf8 => Ok(a.as_string::<i64>().value(pos)),
404                            DataType::Utf8 => Ok(a.as_string::<i32>().value(pos)),
405                            other => exec_err!("Unexpected type encountered '{other}'"),
406                        },
407                        ColumnarValue::Scalar(s) => match s.try_as_str() {
408                            Some(Some(v)) => Ok(v),
409                            Some(None) => continue, // null string
410                            None => exec_err!("Unexpected scalar type encountered '{s}'"),
411                        },
412                    }?;
413
414                    let r = op(x, v);
415                    if r.is_ok() {
416                        val = Some(Ok(op2(r.unwrap())));
417                        break;
418                    } else {
419                        val = Some(r);
420                    }
421                }
422            };
423
424            val.transpose()
425        })
426        .collect()
427}
428
429/// given a function `op` that maps a `&str` to a Result of an arrow native type,
430/// returns a `PrimitiveArray` after the application
431/// of the function to `args[0]`.
432/// # Errors
433/// This function errors iff:
434/// * the number of arguments is not 1 or
435/// * the function `op` errors
436fn unary_string_to_primitive_function<'a, StringArrType, O, F>(
437    array: StringArrType,
438    op: F,
439) -> Result<PrimitiveArray<O>>
440where
441    StringArrType: StringArrayType<'a>,
442    O: ArrowPrimitiveType,
443    F: Fn(&'a str) -> Result<O::Native>,
444{
445    // first map is the iterator, second is for the `Option<_>`
446    array.iter().map(|x| x.map(&op).transpose()).collect()
447}