datafusion_expr/
registry.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
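//! The [`FunctionRegistry`] and [`SerializerRegistry`] traits, plus the
//! in-memory [`MemoryFunctionRegistry`] implementation.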
19
20use crate::expr_rewriter::FunctionRewrite;
21use crate::planner::ExprPlanner;
22use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
23use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result};
24use std::collections::HashSet;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28/// A registry knows how to build logical expressions out of user-defined function' names
29pub trait FunctionRegistry {
30    /// Set of all available udfs.
31    fn udfs(&self) -> HashSet<String>;
32
33    /// Returns a reference to the user defined scalar function (udf) named
34    /// `name`.
35    fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>>;
36
37    /// Returns a reference to the user defined aggregate function (udaf) named
38    /// `name`.
39    fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>>;
40
41    /// Returns a reference to the user defined window function (udwf) named
42    /// `name`.
43    fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>>;
44
45    /// Registers a new [`ScalarUDF`], returning any previously registered
46    /// implementation.
47    ///
48    /// Returns an error (the default) if the function can not be registered,
49    /// for example if the registry is read only.
50    fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
51        not_impl_err!("Registering ScalarUDF")
52    }
53    /// Registers a new [`AggregateUDF`], returning any previously registered
54    /// implementation.
55    ///
56    /// Returns an error (the default) if the function can not be registered,
57    /// for example if the registry is read only.
58    fn register_udaf(
59        &mut self,
60        _udaf: Arc<AggregateUDF>,
61    ) -> Result<Option<Arc<AggregateUDF>>> {
62        not_impl_err!("Registering AggregateUDF")
63    }
64    /// Registers a new [`WindowUDF`], returning any previously registered
65    /// implementation.
66    ///
67    /// Returns an error (the default) if the function can not be registered,
68    /// for example if the registry is read only.
69    fn register_udwf(&mut self, _udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
70        not_impl_err!("Registering WindowUDF")
71    }
72
73    /// Deregisters a [`ScalarUDF`], returning the implementation that was
74    /// deregistered.
75    ///
76    /// Returns an error (the default) if the function can not be deregistered,
77    /// for example if the registry is read only.
78    fn deregister_udf(&mut self, _name: &str) -> Result<Option<Arc<ScalarUDF>>> {
79        not_impl_err!("Deregistering ScalarUDF")
80    }
81
82    /// Deregisters a [`AggregateUDF`], returning the implementation that was
83    /// deregistered.
84    ///
85    /// Returns an error (the default) if the function can not be deregistered,
86    /// for example if the registry is read only.
87    fn deregister_udaf(&mut self, _name: &str) -> Result<Option<Arc<AggregateUDF>>> {
88        not_impl_err!("Deregistering AggregateUDF")
89    }
90
91    /// Deregisters a [`WindowUDF`], returning the implementation that was
92    /// deregistered.
93    ///
94    /// Returns an error (the default) if the function can not be deregistered,
95    /// for example if the registry is read only.
96    fn deregister_udwf(&mut self, _name: &str) -> Result<Option<Arc<WindowUDF>>> {
97        not_impl_err!("Deregistering WindowUDF")
98    }
99
100    /// Registers a new [`FunctionRewrite`] with the registry.
101    ///
102    /// `FunctionRewrite` rules are used to rewrite certain / operators in the
103    /// logical plan to function calls.  For example `a || b` might be written to
104    /// `array_concat(a, b)`.
105    ///
106    /// This allows the behavior of operators to be customized by the user.
107    fn register_function_rewrite(
108        &mut self,
109        _rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
110    ) -> Result<()> {
111        not_impl_err!("Registering FunctionRewrite")
112    }
113
114    /// Set of all registered [`ExprPlanner`]s
115    fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>>;
116
117    /// Registers a new [`ExprPlanner`] with the registry.
118    fn register_expr_planner(
119        &mut self,
120        _expr_planner: Arc<dyn ExprPlanner>,
121    ) -> Result<()> {
122        not_impl_err!("Registering ExprPlanner")
123    }
124}
125
126/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
127pub trait SerializerRegistry: Debug + Send + Sync {
128    /// Serialize this node to a byte array. This serialization should not include
129    /// input plans.
130    fn serialize_logical_plan(
131        &self,
132        node: &dyn UserDefinedLogicalNode,
133    ) -> Result<Vec<u8>>;
134
135    /// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from
136    /// bytes.
137    fn deserialize_logical_plan(
138        &self,
139        name: &str,
140        bytes: &[u8],
141    ) -> Result<Arc<dyn UserDefinedLogicalNode>>;
142}
143
144/// A  [`FunctionRegistry`] that uses in memory [`HashMap`]s
145#[derive(Default, Debug)]
146pub struct MemoryFunctionRegistry {
147    /// Scalar Functions
148    udfs: HashMap<String, Arc<ScalarUDF>>,
149    /// Aggregate Functions
150    udafs: HashMap<String, Arc<AggregateUDF>>,
151    /// Window Functions
152    udwfs: HashMap<String, Arc<WindowUDF>>,
153}
154
155impl MemoryFunctionRegistry {
156    pub fn new() -> Self {
157        Self::default()
158    }
159}
160
161impl FunctionRegistry for MemoryFunctionRegistry {
162    fn udfs(&self) -> HashSet<String> {
163        self.udfs.keys().cloned().collect()
164    }
165
166    fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
167        self.udfs
168            .get(name)
169            .cloned()
170            .ok_or_else(|| plan_datafusion_err!("Function {name} not found"))
171    }
172
173    fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
174        self.udafs
175            .get(name)
176            .cloned()
177            .ok_or_else(|| plan_datafusion_err!("Aggregate Function {name} not found"))
178    }
179
180    fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
181        self.udwfs
182            .get(name)
183            .cloned()
184            .ok_or_else(|| plan_datafusion_err!("Window Function {name} not found"))
185    }
186
187    fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
188        Ok(self.udfs.insert(udf.name().to_string(), udf))
189    }
190    fn register_udaf(
191        &mut self,
192        udaf: Arc<AggregateUDF>,
193    ) -> Result<Option<Arc<AggregateUDF>>> {
194        Ok(self.udafs.insert(udaf.name().into(), udaf))
195    }
196    fn register_udwf(&mut self, udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
197        Ok(self.udwfs.insert(udaf.name().into(), udaf))
198    }
199
200    fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
201        vec![]
202    }
203}