datafusion_expr/registry.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! FunctionRegistry trait
19
20use crate::expr_rewriter::FunctionRewrite;
21use crate::planner::ExprPlanner;
22use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
23use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result};
24use std::collections::HashSet;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28/// A registry knows how to build logical expressions out of user-defined function' names
29pub trait FunctionRegistry {
30 /// Set of all available udfs.
31 fn udfs(&self) -> HashSet<String>;
32
33 /// Returns a reference to the user defined scalar function (udf) named
34 /// `name`.
35 fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>>;
36
37 /// Returns a reference to the user defined aggregate function (udaf) named
38 /// `name`.
39 fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>>;
40
41 /// Returns a reference to the user defined window function (udwf) named
42 /// `name`.
43 fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>>;
44
45 /// Registers a new [`ScalarUDF`], returning any previously registered
46 /// implementation.
47 ///
48 /// Returns an error (the default) if the function can not be registered,
49 /// for example if the registry is read only.
50 fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
51 not_impl_err!("Registering ScalarUDF")
52 }
53 /// Registers a new [`AggregateUDF`], returning any previously registered
54 /// implementation.
55 ///
56 /// Returns an error (the default) if the function can not be registered,
57 /// for example if the registry is read only.
58 fn register_udaf(
59 &mut self,
60 _udaf: Arc<AggregateUDF>,
61 ) -> Result<Option<Arc<AggregateUDF>>> {
62 not_impl_err!("Registering AggregateUDF")
63 }
64 /// Registers a new [`WindowUDF`], returning any previously registered
65 /// implementation.
66 ///
67 /// Returns an error (the default) if the function can not be registered,
68 /// for example if the registry is read only.
69 fn register_udwf(&mut self, _udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
70 not_impl_err!("Registering WindowUDF")
71 }
72
73 /// Deregisters a [`ScalarUDF`], returning the implementation that was
74 /// deregistered.
75 ///
76 /// Returns an error (the default) if the function can not be deregistered,
77 /// for example if the registry is read only.
78 fn deregister_udf(&mut self, _name: &str) -> Result<Option<Arc<ScalarUDF>>> {
79 not_impl_err!("Deregistering ScalarUDF")
80 }
81
82 /// Deregisters a [`AggregateUDF`], returning the implementation that was
83 /// deregistered.
84 ///
85 /// Returns an error (the default) if the function can not be deregistered,
86 /// for example if the registry is read only.
87 fn deregister_udaf(&mut self, _name: &str) -> Result<Option<Arc<AggregateUDF>>> {
88 not_impl_err!("Deregistering AggregateUDF")
89 }
90
91 /// Deregisters a [`WindowUDF`], returning the implementation that was
92 /// deregistered.
93 ///
94 /// Returns an error (the default) if the function can not be deregistered,
95 /// for example if the registry is read only.
96 fn deregister_udwf(&mut self, _name: &str) -> Result<Option<Arc<WindowUDF>>> {
97 not_impl_err!("Deregistering WindowUDF")
98 }
99
100 /// Registers a new [`FunctionRewrite`] with the registry.
101 ///
102 /// `FunctionRewrite` rules are used to rewrite certain / operators in the
103 /// logical plan to function calls. For example `a || b` might be written to
104 /// `array_concat(a, b)`.
105 ///
106 /// This allows the behavior of operators to be customized by the user.
107 fn register_function_rewrite(
108 &mut self,
109 _rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
110 ) -> Result<()> {
111 not_impl_err!("Registering FunctionRewrite")
112 }
113
114 /// Set of all registered [`ExprPlanner`]s
115 fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>>;
116
117 /// Registers a new [`ExprPlanner`] with the registry.
118 fn register_expr_planner(
119 &mut self,
120 _expr_planner: Arc<dyn ExprPlanner>,
121 ) -> Result<()> {
122 not_impl_err!("Registering ExprPlanner")
123 }
124}
125
126/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
127pub trait SerializerRegistry: Debug + Send + Sync {
128 /// Serialize this node to a byte array. This serialization should not include
129 /// input plans.
130 fn serialize_logical_plan(
131 &self,
132 node: &dyn UserDefinedLogicalNode,
133 ) -> Result<Vec<u8>>;
134
135 /// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from
136 /// bytes.
137 fn deserialize_logical_plan(
138 &self,
139 name: &str,
140 bytes: &[u8],
141 ) -> Result<Arc<dyn UserDefinedLogicalNode>>;
142}
143
144/// A [`FunctionRegistry`] that uses in memory [`HashMap`]s
145#[derive(Default, Debug)]
146pub struct MemoryFunctionRegistry {
147 /// Scalar Functions
148 udfs: HashMap<String, Arc<ScalarUDF>>,
149 /// Aggregate Functions
150 udafs: HashMap<String, Arc<AggregateUDF>>,
151 /// Window Functions
152 udwfs: HashMap<String, Arc<WindowUDF>>,
153}
154
155impl MemoryFunctionRegistry {
156 pub fn new() -> Self {
157 Self::default()
158 }
159}
160
161impl FunctionRegistry for MemoryFunctionRegistry {
162 fn udfs(&self) -> HashSet<String> {
163 self.udfs.keys().cloned().collect()
164 }
165
166 fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
167 self.udfs
168 .get(name)
169 .cloned()
170 .ok_or_else(|| plan_datafusion_err!("Function {name} not found"))
171 }
172
173 fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
174 self.udafs
175 .get(name)
176 .cloned()
177 .ok_or_else(|| plan_datafusion_err!("Aggregate Function {name} not found"))
178 }
179
180 fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
181 self.udwfs
182 .get(name)
183 .cloned()
184 .ok_or_else(|| plan_datafusion_err!("Window Function {name} not found"))
185 }
186
187 fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
188 Ok(self.udfs.insert(udf.name().to_string(), udf))
189 }
190 fn register_udaf(
191 &mut self,
192 udaf: Arc<AggregateUDF>,
193 ) -> Result<Option<Arc<AggregateUDF>>> {
194 Ok(self.udafs.insert(udaf.name().into(), udaf))
195 }
196 fn register_udwf(&mut self, udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
197 Ok(self.udwfs.insert(udaf.name().into(), udaf))
198 }
199
200 fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
201 vec![]
202 }
203}