datafusion_functions/core/
coalesce.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{new_null_array, BooleanArray};
19use arrow::compute::kernels::zip::zip;
20use arrow::compute::{and, is_not_null, is_null};
21use arrow::datatypes::DataType;
22use datafusion_common::{exec_err, internal_err, Result};
23use datafusion_expr::binary::try_type_union_resolution;
24use datafusion_expr::{
25    ColumnarValue, Documentation, ReturnInfo, ReturnTypeArgs, ScalarFunctionArgs,
26};
27use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
28use datafusion_macros::user_doc;
29use itertools::Itertools;
30use std::any::Any;
31
/// Scalar UDF registered under the name `coalesce`.
///
/// The `user_doc` attribute below carries the user-facing documentation
/// (section, description, syntax, SQL example and argument description).
#[user_doc(
    doc_section(label = "Conditional Functions"),
    description = "Returns the first of its arguments that is not _null_. Returns _null_ if all arguments are _null_. This function is often used to substitute a default value for _null_ values.",
    syntax_example = "coalesce(expression1[, ..., expression_n])",
    sql_example = r#"```sql
> select coalesce(null, null, 'datafusion');
+----------------------------------------+
| coalesce(NULL,NULL,Utf8("datafusion")) |
+----------------------------------------+
| datafusion                             |
+----------------------------------------+
```"#,
    argument(
        name = "expression1, expression_n",
        description = "Expression to use if previous expressions are _null_. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary."
    )
)]
#[derive(Debug)]
pub struct CoalesceFunc {
    /// Signature handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
}
53
54impl Default for CoalesceFunc {
55    fn default() -> Self {
56        CoalesceFunc::new()
57    }
58}
59
60impl CoalesceFunc {
61    pub fn new() -> Self {
62        Self {
63            signature: Signature::user_defined(Volatility::Immutable),
64        }
65    }
66}
67
68impl ScalarUDFImpl for CoalesceFunc {
69    fn as_any(&self) -> &dyn Any {
70        self
71    }
72
73    fn name(&self) -> &str {
74        "coalesce"
75    }
76
77    fn signature(&self) -> &Signature {
78        &self.signature
79    }
80
81    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
82        internal_err!("return_type_from_args should be called instead")
83    }
84
85    fn return_type_from_args(&self, args: ReturnTypeArgs) -> Result<ReturnInfo> {
86        // If any the arguments in coalesce is non-null, the result is non-null
87        let nullable = args.nullables.iter().all(|&nullable| nullable);
88        let return_type = args
89            .arg_types
90            .iter()
91            .find_or_first(|d| !d.is_null())
92            .unwrap()
93            .clone();
94        Ok(ReturnInfo::new(return_type, nullable))
95    }
96
97    /// coalesce evaluates to the first value which is not NULL
98    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
99        let args = args.args;
100        // do not accept 0 arguments.
101        if args.is_empty() {
102            return exec_err!(
103                "coalesce was called with {} arguments. It requires at least 1.",
104                args.len()
105            );
106        }
107
108        let return_type = args[0].data_type();
109        let mut return_array = args.iter().filter_map(|x| match x {
110            ColumnarValue::Array(array) => Some(array.len()),
111            _ => None,
112        });
113
114        if let Some(size) = return_array.next() {
115            // start with nulls as default output
116            let mut current_value = new_null_array(&return_type, size);
117            let mut remainder = BooleanArray::from(vec![true; size]);
118
119            for arg in args {
120                match arg {
121                    ColumnarValue::Array(ref array) => {
122                        let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
123                        current_value = zip(&to_apply, array, &current_value)?;
124                        remainder = and(&remainder, &is_null(array)?)?;
125                    }
126                    ColumnarValue::Scalar(value) => {
127                        if value.is_null() {
128                            continue;
129                        } else {
130                            let last_value = value.to_scalar()?;
131                            current_value = zip(&remainder, &last_value, &current_value)?;
132                            break;
133                        }
134                    }
135                }
136                if remainder.iter().all(|x| x == Some(false)) {
137                    break;
138                }
139            }
140            Ok(ColumnarValue::Array(current_value))
141        } else {
142            let result = args
143                .iter()
144                .filter_map(|x| match x {
145                    ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()),
146                    _ => None,
147                })
148                .next()
149                .unwrap_or_else(|| args[0].clone());
150            Ok(result)
151        }
152    }
153
154    fn short_circuits(&self) -> bool {
155        true
156    }
157
158    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
159        if arg_types.is_empty() {
160            return exec_err!("coalesce must have at least one argument");
161        }
162
163        try_type_union_resolution(arg_types)
164    }
165
166    fn documentation(&self) -> Option<&Documentation> {
167        self.doc()
168    }
169}