datafusion_functions/core/
coalesce.rs1use arrow::array::{new_null_array, BooleanArray};
19use arrow::compute::kernels::zip::zip;
20use arrow::compute::{and, is_not_null, is_null};
21use arrow::datatypes::DataType;
22use datafusion_common::{exec_err, internal_err, Result};
23use datafusion_expr::binary::try_type_union_resolution;
24use datafusion_expr::{
25 ColumnarValue, Documentation, ReturnInfo, ReturnTypeArgs, ScalarFunctionArgs,
26};
27use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
28use datafusion_macros::user_doc;
29use itertools::Itertools;
30use std::any::Any;
31
32#[user_doc(
33 doc_section(label = "Conditional Functions"),
34 description = "Returns the first of its arguments that is not _null_. Returns _null_ if all arguments are _null_. This function is often used to substitute a default value for _null_ values.",
35 syntax_example = "coalesce(expression1[, ..., expression_n])",
36 sql_example = r#"```sql
37> select coalesce(null, null, 'datafusion');
38+----------------------------------------+
39| coalesce(NULL,NULL,Utf8("datafusion")) |
40+----------------------------------------+
41| datafusion |
42+----------------------------------------+
43```"#,
44 argument(
45 name = "expression1, expression_n",
46 description = "Expression to use if previous expressions are _null_. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary."
47 )
48)]
/// Scalar UDF implementing SQL `coalesce`: evaluates to the first argument
/// that is not null, or null when every argument is null.
#[derive(Debug)]
pub struct CoalesceFunc {
    /// Signature returned by `ScalarUDFImpl::signature`; constructed in
    /// `new` as `Signature::user_defined(Volatility::Immutable)`.
    signature: Signature,
}
53
54impl Default for CoalesceFunc {
55 fn default() -> Self {
56 CoalesceFunc::new()
57 }
58}
59
60impl CoalesceFunc {
61 pub fn new() -> Self {
62 Self {
63 signature: Signature::user_defined(Volatility::Immutable),
64 }
65 }
66}
67
68impl ScalarUDFImpl for CoalesceFunc {
69 fn as_any(&self) -> &dyn Any {
70 self
71 }
72
73 fn name(&self) -> &str {
74 "coalesce"
75 }
76
77 fn signature(&self) -> &Signature {
78 &self.signature
79 }
80
81 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
82 internal_err!("return_type_from_args should be called instead")
83 }
84
85 fn return_type_from_args(&self, args: ReturnTypeArgs) -> Result<ReturnInfo> {
86 let nullable = args.nullables.iter().all(|&nullable| nullable);
88 let return_type = args
89 .arg_types
90 .iter()
91 .find_or_first(|d| !d.is_null())
92 .unwrap()
93 .clone();
94 Ok(ReturnInfo::new(return_type, nullable))
95 }
96
97 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
99 let args = args.args;
100 if args.is_empty() {
102 return exec_err!(
103 "coalesce was called with {} arguments. It requires at least 1.",
104 args.len()
105 );
106 }
107
108 let return_type = args[0].data_type();
109 let mut return_array = args.iter().filter_map(|x| match x {
110 ColumnarValue::Array(array) => Some(array.len()),
111 _ => None,
112 });
113
114 if let Some(size) = return_array.next() {
115 let mut current_value = new_null_array(&return_type, size);
117 let mut remainder = BooleanArray::from(vec![true; size]);
118
119 for arg in args {
120 match arg {
121 ColumnarValue::Array(ref array) => {
122 let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
123 current_value = zip(&to_apply, array, ¤t_value)?;
124 remainder = and(&remainder, &is_null(array)?)?;
125 }
126 ColumnarValue::Scalar(value) => {
127 if value.is_null() {
128 continue;
129 } else {
130 let last_value = value.to_scalar()?;
131 current_value = zip(&remainder, &last_value, ¤t_value)?;
132 break;
133 }
134 }
135 }
136 if remainder.iter().all(|x| x == Some(false)) {
137 break;
138 }
139 }
140 Ok(ColumnarValue::Array(current_value))
141 } else {
142 let result = args
143 .iter()
144 .filter_map(|x| match x {
145 ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()),
146 _ => None,
147 })
148 .next()
149 .unwrap_or_else(|| args[0].clone());
150 Ok(result)
151 }
152 }
153
154 fn short_circuits(&self) -> bool {
155 true
156 }
157
158 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
159 if arg_types.is_empty() {
160 return exec_err!("coalesce must have at least one argument");
161 }
162
163 try_type_union_resolution(arg_types)
164 }
165
166 fn documentation(&self) -> Option<&Documentation> {
167 self.doc()
168 }
169}