1use std::sync::Arc;
19
20use arrow::array::{
21 Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
22 StringArrayType, StringViewArray,
23};
24use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
25use arrow::datatypes::DataType;
26use chrono::format::{parse, Parsed, StrftimeItems};
27use chrono::LocalResult::Single;
28use chrono::{DateTime, TimeZone, Utc};
29
30use datafusion_common::cast::as_generic_string_array;
31use datafusion_common::{
32 exec_err, unwrap_or_internal_err, DataFusionError, Result, ScalarType, ScalarValue,
33};
34use datafusion_expr::ColumnarValue;
35
/// Error message returned when a parsed date/time cannot be represented as
/// an `i64` count of nanoseconds (see `string_to_timestamp_nanos_formatted`).
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = concat!(
    "The dates that can be represented as nanoseconds have to be between ",
    "1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804"
);
38
39pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
41 string_to_timestamp_nanos(s).map_err(|e| e.into())
42}
43
44pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<()> {
50 for (idx, a) in args.iter().skip(1).enumerate() {
51 match a.data_type() {
52 DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
53 }
55 _ => {
56 return exec_err!(
57 "{name} function unsupported data type at index {}: {}",
58 idx + 1,
59 a.data_type()
60 );
61 }
62 }
63 }
64
65 Ok(())
66}
67
68pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
81 timezone: &T,
82 s: &str,
83 format: &str,
84) -> Result<DateTime<T>, DataFusionError> {
85 let err = |err_ctx: &str| {
86 DataFusionError::Execution(format!(
87 "Error parsing timestamp from '{s}' using format '{format}': {err_ctx}"
88 ))
89 };
90
91 let mut parsed = Parsed::new();
92 parse(&mut parsed, s, StrftimeItems::new(format)).map_err(|e| err(&e.to_string()))?;
93
94 let dt = parsed.to_datetime();
96
97 if let Err(e) = &dt {
98 let ndt = parsed
100 .to_naive_datetime_with_offset(0)
101 .or_else(|_| parsed.to_naive_date().map(|nd| nd.into()));
102 if let Err(e) = &ndt {
103 return Err(err(&e.to_string()));
104 }
105
106 if let Single(e) = &timezone.from_local_datetime(&ndt.unwrap()) {
107 Ok(e.to_owned())
108 } else {
109 Err(err(&e.to_string()))
110 }
111 } else {
112 Ok(dt.unwrap().with_timezone(timezone))
113 }
114}
115
116#[inline]
144pub(crate) fn string_to_timestamp_nanos_formatted(
145 s: &str,
146 format: &str,
147) -> Result<i64, DataFusionError> {
148 string_to_datetime_formatted(&Utc, s, format)?
149 .naive_utc()
150 .and_utc()
151 .timestamp_nanos_opt()
152 .ok_or_else(|| {
153 DataFusionError::Execution(ERR_NANOSECONDS_NOT_SUPPORTED.to_string())
154 })
155}
156
157#[inline]
175pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Result<i64> {
176 Ok(string_to_datetime_formatted(&Utc, s, format)?
177 .naive_utc()
178 .and_utc()
179 .timestamp_millis())
180}
181
182pub(crate) fn handle<O, F, S>(
183 args: &[ColumnarValue],
184 op: F,
185 name: &str,
186) -> Result<ColumnarValue>
187where
188 O: ArrowPrimitiveType,
189 S: ScalarType<O::Native>,
190 F: Fn(&str) -> Result<O::Native>,
191{
192 match &args[0] {
193 ColumnarValue::Array(a) => match a.data_type() {
194 DataType::Utf8View => Ok(ColumnarValue::Array(Arc::new(
195 unary_string_to_primitive_function::<&StringViewArray, O, _>(
196 a.as_ref().as_string_view(),
197 op,
198 )?,
199 ))),
200 DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
201 unary_string_to_primitive_function::<&GenericStringArray<i64>, O, _>(
202 a.as_ref().as_string::<i64>(),
203 op,
204 )?,
205 ))),
206 DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
207 unary_string_to_primitive_function::<&GenericStringArray<i32>, O, _>(
208 a.as_ref().as_string::<i32>(),
209 op,
210 )?,
211 ))),
212 other => exec_err!("Unsupported data type {other:?} for function {name}"),
213 },
214 ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
215 Some(a) => {
216 let result = a.as_ref().map(|x| op(x)).transpose()?;
217 Ok(ColumnarValue::Scalar(S::scalar(result)))
218 }
219 _ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
220 },
221 }
222}
223
224pub(crate) fn handle_multiple<O, F, S, M>(
228 args: &[ColumnarValue],
229 op: F,
230 op2: M,
231 name: &str,
232) -> Result<ColumnarValue>
233where
234 O: ArrowPrimitiveType,
235 S: ScalarType<O::Native>,
236 F: Fn(&str, &str) -> Result<O::Native>,
237 M: Fn(O::Native) -> O::Native,
238{
239 match &args[0] {
240 ColumnarValue::Array(a) => match a.data_type() {
241 DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
242 for (pos, arg) in args.iter().enumerate() {
244 match arg {
245 ColumnarValue::Array(arg) => match arg.data_type() {
246 DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
247 }
249 other => return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"),
250 },
251 ColumnarValue::Scalar(arg) => {
252 match arg.data_type() {
253 DataType::Utf8View| DataType::LargeUtf8 | DataType::Utf8 => {
254 }
256 other => return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"),
257 }
258 }
259 }
260 }
261
262 Ok(ColumnarValue::Array(Arc::new(
263 strings_to_primitive_function::<O, _, _>(args, op, op2, name)?,
264 )))
265 }
266 other => {
267 exec_err!("Unsupported data type {other:?} for function {name}")
268 }
269 },
270 ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
272 Some(a) => {
273 let a = a.as_ref();
274 let a = unwrap_or_internal_err!(a);
276
277 let mut ret = None;
278
279 for (pos, v) in args.iter().enumerate().skip(1) {
280 let ColumnarValue::Scalar(
281 ScalarValue::Utf8View(x)
282 | ScalarValue::LargeUtf8(x)
283 | ScalarValue::Utf8(x),
284 ) = v
285 else {
286 return exec_err!("Unsupported data type {v:?} for function {name}, arg # {pos}");
287 };
288
289 if let Some(s) = x {
290 match op(a, s.as_str()) {
291 Ok(r) => {
292 ret = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(
293 op2(r),
294 )))));
295 break;
296 }
297 Err(e) => ret = Some(Err(e)),
298 }
299 }
300 }
301
302 unwrap_or_internal_err!(ret)
303 }
304 other => {
305 exec_err!("Unsupported data type {other:?} for function {name}")
306 }
307 },
308 }
309}
310
311pub(crate) fn strings_to_primitive_function<O, F, F2>(
322 args: &[ColumnarValue],
323 op: F,
324 op2: F2,
325 name: &str,
326) -> Result<PrimitiveArray<O>>
327where
328 O: ArrowPrimitiveType,
329 F: Fn(&str, &str) -> Result<O::Native>,
330 F2: Fn(O::Native) -> O::Native,
331{
332 if args.len() < 2 {
333 return exec_err!(
334 "{:?} args were supplied but {} takes 2 or more arguments",
335 args.len(),
336 name
337 );
338 }
339
340 match &args[0] {
341 ColumnarValue::Array(a) => match a.data_type() {
342 DataType::Utf8View => {
343 let string_array = a.as_string_view();
344 handle_array_op::<O, &StringViewArray, F, F2>(
345 &string_array,
346 &args[1..],
347 op,
348 op2,
349 )
350 }
351 DataType::LargeUtf8 => {
352 let string_array = as_generic_string_array::<i64>(&a)?;
353 handle_array_op::<O, &GenericStringArray<i64>, F, F2>(
354 &string_array,
355 &args[1..],
356 op,
357 op2,
358 )
359 }
360 DataType::Utf8 => {
361 let string_array = as_generic_string_array::<i32>(&a)?;
362 handle_array_op::<O, &GenericStringArray<i32>, F, F2>(
363 &string_array,
364 &args[1..],
365 op,
366 op2,
367 )
368 }
369 other => exec_err!(
370 "Unsupported data type {other:?} for function substr,\
371 expected Utf8View, Utf8 or LargeUtf8."
372 ),
373 },
374 other => exec_err!(
375 "Received {} data type, expected only array",
376 other.data_type()
377 ),
378 }
379}
380
381fn handle_array_op<'a, O, V, F, F2>(
382 first: &V,
383 args: &[ColumnarValue],
384 op: F,
385 op2: F2,
386) -> Result<PrimitiveArray<O>>
387where
388 V: StringArrayType<'a>,
389 O: ArrowPrimitiveType,
390 F: Fn(&str, &str) -> Result<O::Native>,
391 F2: Fn(O::Native) -> O::Native,
392{
393 first
394 .iter()
395 .enumerate()
396 .map(|(pos, x)| {
397 let mut val = None;
398 if let Some(x) = x {
399 for arg in args {
400 let v = match arg {
401 ColumnarValue::Array(a) => match a.data_type() {
402 DataType::Utf8View => Ok(a.as_string_view().value(pos)),
403 DataType::LargeUtf8 => Ok(a.as_string::<i64>().value(pos)),
404 DataType::Utf8 => Ok(a.as_string::<i32>().value(pos)),
405 other => exec_err!("Unexpected type encountered '{other}'"),
406 },
407 ColumnarValue::Scalar(s) => match s.try_as_str() {
408 Some(Some(v)) => Ok(v),
409 Some(None) => continue, None => exec_err!("Unexpected scalar type encountered '{s}'"),
411 },
412 }?;
413
414 let r = op(x, v);
415 if r.is_ok() {
416 val = Some(Ok(op2(r.unwrap())));
417 break;
418 } else {
419 val = Some(r);
420 }
421 }
422 };
423
424 val.transpose()
425 })
426 .collect()
427}
428
429fn unary_string_to_primitive_function<'a, StringArrType, O, F>(
437 array: StringArrType,
438 op: F,
439) -> Result<PrimitiveArray<O>>
440where
441 StringArrType: StringArrayType<'a>,
442 O: ArrowPrimitiveType,
443 F: Fn(&'a str) -> Result<O::Native>,
444{
445 array.iter().map(|x| x.map(&op).transpose()).collect()
447}