// datafusion_functions/crypto/basic.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! "crypto" DataFusion functions
19
20use arrow::array::{Array, ArrayRef, BinaryArray, OffsetSizeTrait};
21use arrow::array::{AsArray, GenericStringArray, StringArray, StringViewArray};
22use arrow::datatypes::DataType;
23use blake2::{Blake2b512, Blake2s256, Digest};
24use blake3::Hasher as Blake3;
25use datafusion_common::cast::as_binary_array;
26
27use arrow::compute::StringArrayType;
28use datafusion_common::{
29    cast::as_generic_binary_array, exec_err, internal_err, plan_err,
30    utils::take_function_args, DataFusionError, Result, ScalarValue,
31};
32use datafusion_expr::ColumnarValue;
33use md5::Md5;
34use sha2::{Sha224, Sha256, Sha384, Sha512};
35use std::fmt::{self, Write};
36use std::str::FromStr;
37use std::sync::Arc;
38
/// Generates a public one-argument digest function named `$NAME` that hashes
/// its input with the fixed algorithm variant `DigestAlgorithm::$METHOD`.
/// `$DOC` becomes the generated function's doc comment.
macro_rules! define_digest_function {
    ($NAME: ident, $METHOD: ident, $DOC: expr) => {
        #[doc = $DOC]
        pub fn $NAME(args: &[ColumnarValue]) -> Result<ColumnarValue> {
            // The algorithm's lowercase Display name doubles as the function
            // name reported in arity-mismatch errors from take_function_args.
            let [data] = take_function_args(&DigestAlgorithm::$METHOD.to_string(), args)?;
            digest_process(data, DigestAlgorithm::$METHOD)
        }
    };
}
// One public entry point per fixed-algorithm digest function; each forwards
// its single argument to `digest_process` with the matching variant.
define_digest_function!(
    sha224,
    Sha224,
    "computes sha224 hash digest of the given input"
);
define_digest_function!(
    sha256,
    Sha256,
    "computes sha256 hash digest of the given input"
);
define_digest_function!(
    sha384,
    Sha384,
    "computes sha384 hash digest of the given input"
);
define_digest_function!(
    sha512,
    Sha512,
    "computes sha512 hash digest of the given input"
);
define_digest_function!(
    blake2b,
    Blake2b,
    "computes blake2b hash digest of the given input"
);
define_digest_function!(
    blake2s,
    Blake2s,
    "computes blake2s hash digest of the given input"
);
define_digest_function!(
    blake3,
    Blake3,
    "computes blake3 hash digest of the given input"
);
83
/// Hashes an optional byte slice with digest type `$METHOD` (an implementor
/// of the `Digest` trait) and wraps the result in `ScalarValue::Binary`.
/// A `None` input stays `None` (SQL NULL in, NULL out).
macro_rules! digest_to_scalar {
    ($METHOD: ident, $INPUT:expr) => {{
        ScalarValue::Binary($INPUT.as_ref().map(|v| {
            let mut digest = $METHOD::default();
            digest.update(v);
            digest.finalize().as_slice().to_vec()
        }))
    }};
}
93
/// Hash algorithm selector shared by [`digest`] and the fixed-algorithm
/// functions. The lowercase `Debug` name is the user-facing name: `Display`
/// lowercases it and `FromStr` parses those same lowercase names.
#[derive(Debug, Copy, Clone)]
pub enum DigestAlgorithm {
    Md5,
    Sha224,
    Sha256,
    Sha384,
    Sha512,
    Blake2s,
    Blake2b,
    Blake3,
}
105
106/// Digest computes a binary hash of the given data, accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`].
107/// Second argument is the algorithm to use.
108/// Standard algorithms are md5, sha1, sha224, sha256, sha384 and sha512.
109pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
110    let [data, digest_algorithm] = take_function_args("digest", args)?;
111    let digest_algorithm = match digest_algorithm {
112        ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
113            Some(Some(method)) => method.parse::<DigestAlgorithm>(),
114            _ => exec_err!("Unsupported data type {scalar:?} for function digest"),
115        },
116        ColumnarValue::Array(_) => {
117            internal_err!("Digest using dynamically decided method is not yet supported")
118        }
119    }?;
120    digest_process(data, digest_algorithm)
121}
122
123impl FromStr for DigestAlgorithm {
124    type Err = DataFusionError;
125    fn from_str(name: &str) -> Result<DigestAlgorithm> {
126        Ok(match name {
127            "md5" => Self::Md5,
128            "sha224" => Self::Sha224,
129            "sha256" => Self::Sha256,
130            "sha384" => Self::Sha384,
131            "sha512" => Self::Sha512,
132            "blake2b" => Self::Blake2b,
133            "blake2s" => Self::Blake2s,
134            "blake3" => Self::Blake3,
135            _ => {
136                let options = [
137                    Self::Md5,
138                    Self::Sha224,
139                    Self::Sha256,
140                    Self::Sha384,
141                    Self::Sha512,
142                    Self::Blake2s,
143                    Self::Blake2b,
144                    Self::Blake3,
145                ]
146                .iter()
147                .map(|i| i.to_string())
148                .collect::<Vec<_>>()
149                .join(", ");
150                return plan_err!(
151                    "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
152                );
153            }
154        })
155    }
156}
157
158impl fmt::Display for DigestAlgorithm {
159    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160        write!(f, "{}", format!("{self:?}").to_lowercase())
161    }
162}
163
164/// computes md5 hash digest of the given input
165pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
166    let [data] = take_function_args("md5", args)?;
167    let value = digest_process(data, DigestAlgorithm::Md5)?;
168
169    // md5 requires special handling because of its unique utf8 return type
170    Ok(match value {
171        ColumnarValue::Array(array) => {
172            let binary_array = as_binary_array(&array)?;
173            let string_array: StringArray = binary_array
174                .iter()
175                .map(|opt| opt.map(hex_encode::<_>))
176                .collect();
177            ColumnarValue::Array(Arc::new(string_array))
178        }
179        ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
180            ColumnarValue::Scalar(ScalarValue::Utf8(opt.map(hex_encode::<_>)))
181        }
182        _ => return exec_err!("Impossibly got invalid results from digest"),
183    })
184}
185
/// Hex-encodes a byte sequence as a lowercase string. This exists so that we
/// do not need to pull in the `hex` crate; it is only used by the [`md5`]
/// function.
#[inline]
fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
    let bytes = data.as_ref();
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // from_digit cannot fail here: each nibble is always < 16.
        encoded.push(char::from_digit(u32::from(byte >> 4), 16).unwrap());
        encoded.push(char::from_digit(u32::from(byte & 0x0f), 16).unwrap());
    }
    encoded
}
197pub fn utf8_or_binary_to_binary_type(
198    arg_type: &DataType,
199    name: &str,
200) -> Result<DataType> {
201    Ok(match arg_type {
202        DataType::Utf8View
203        | DataType::LargeUtf8
204        | DataType::Utf8
205        | DataType::Binary
206        | DataType::LargeBinary => DataType::Binary,
207        DataType::Null => DataType::Null,
208        _ => {
209            return plan_err!(
210                "The {name:?} function can only accept strings or binary arrays."
211            );
212        }
213    })
214}
/// Hashes every element of an iterable of optional byte-like values with
/// digest type `$METHOD` (an implementor of the `Digest` trait), collecting
/// the results into an `Arc<BinaryArray>`. Nulls are preserved as nulls.
macro_rules! digest_to_array {
    ($METHOD:ident, $INPUT:expr) => {{
        let binary_array: BinaryArray = $INPUT
            .iter()
            .map(|x| {
                x.map(|x| {
                    let mut digest = $METHOD::default();
                    digest.update(x);
                    digest.finalize()
                })
            })
            .collect();
        Arc::new(binary_array)
    }};
}
impl DigestAlgorithm {
    /// digest an optional string to its hash value, null values are returned as is
    pub fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
        ColumnarValue::Scalar(match self {
            Self::Md5 => digest_to_scalar!(Md5, value),
            Self::Sha224 => digest_to_scalar!(Sha224, value),
            Self::Sha256 => digest_to_scalar!(Sha256, value),
            Self::Sha384 => digest_to_scalar!(Sha384, value),
            Self::Sha512 => digest_to_scalar!(Sha512, value),
            Self::Blake2b => digest_to_scalar!(Blake2b512, value),
            Self::Blake2s => digest_to_scalar!(Blake2s256, value),
            // blake3's Hasher does not go through the `digest_to_scalar!`
            // macro (different finalize API), so it is hashed explicitly.
            Self::Blake3 => ScalarValue::Binary(value.map(|v| {
                let mut digest = Blake3::default();
                digest.update(v);
                Blake3::finalize(&digest).as_bytes().to_vec()
            })),
        })
    }

    /// digest a binary array to their hash values
    ///
    /// `T` selects the offset width of the input (`i32` for `Binary`,
    /// `i64` for `LargeBinary`). Nulls are preserved as nulls.
    pub fn digest_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
    where
        T: OffsetSizeTrait,
    {
        let input_value = as_generic_binary_array::<T>(value)?;
        let array: ArrayRef = match self {
            Self::Md5 => digest_to_array!(Md5, input_value),
            Self::Sha224 => digest_to_array!(Sha224, input_value),
            Self::Sha256 => digest_to_array!(Sha256, input_value),
            Self::Sha384 => digest_to_array!(Sha384, input_value),
            Self::Sha512 => digest_to_array!(Sha512, input_value),
            Self::Blake2b => digest_to_array!(Blake2b512, input_value),
            Self::Blake2s => digest_to_array!(Blake2s256, input_value),
            // blake3 is handled inline for the same reason as in
            // `digest_scalar`: its API differs from the `Digest` trait.
            Self::Blake3 => {
                let binary_array: BinaryArray = input_value
                    .iter()
                    .map(|opt| {
                        opt.map(|x| {
                            let mut digest = Blake3::default();
                            digest.update(x);
                            Blake3::finalize(&digest).as_bytes().to_vec()
                        })
                    })
                    .collect();
                Arc::new(binary_array)
            }
        };
        Ok(ColumnarValue::Array(array))
    }

    /// digest a string array to their hash values
    ///
    /// `T` selects the offset width for Utf8/LargeUtf8 inputs; the Utf8View
    /// branch ignores it (views carry no offset type parameter).
    pub fn digest_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
    where
        T: OffsetSizeTrait,
    {
        let array = match value.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => {
                let v = value.as_string::<T>();
                self.digest_utf8_array_impl::<&GenericStringArray<T>>(v)
            }
            DataType::Utf8View => {
                let v = value.as_string_view();
                self.digest_utf8_array_impl::<&StringViewArray>(v)
            }
            other => {
                return exec_err!("unsupported type for digest_utf_array: {other:?}")
            }
        };
        Ok(ColumnarValue::Array(array))
    }

    /// Shared hashing over any string-array representation (offset-based or
    /// view-based) via the `StringArrayType` abstraction. Nulls map to nulls.
    pub fn digest_utf8_array_impl<'a, StringArrType>(
        self,
        input_value: StringArrType,
    ) -> ArrayRef
    where
        StringArrType: StringArrayType<'a>,
    {
        match self {
            Self::Md5 => digest_to_array!(Md5, input_value),
            Self::Sha224 => digest_to_array!(Sha224, input_value),
            Self::Sha256 => digest_to_array!(Sha256, input_value),
            Self::Sha384 => digest_to_array!(Sha384, input_value),
            Self::Sha512 => digest_to_array!(Sha512, input_value),
            Self::Blake2b => digest_to_array!(Blake2b512, input_value),
            Self::Blake2s => digest_to_array!(Blake2s256, input_value),
            // blake3's update takes &[u8], hence the explicit as_bytes().
            Self::Blake3 => {
                let binary_array: BinaryArray = input_value
                    .iter()
                    .map(|opt| {
                        opt.map(|x| {
                            let mut digest = Blake3::default();
                            digest.update(x.as_bytes());
                            Blake3::finalize(&digest).as_bytes().to_vec()
                        })
                    })
                    .collect();
                Arc::new(binary_array)
            }
        }
    }
}
332pub fn digest_process(
333    value: &ColumnarValue,
334    digest_algorithm: DigestAlgorithm,
335) -> Result<ColumnarValue> {
336    match value {
337        ColumnarValue::Array(a) => match a.data_type() {
338            DataType::Utf8View => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
339            DataType::Utf8 => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
340            DataType::LargeUtf8 => digest_algorithm.digest_utf8_array::<i64>(a.as_ref()),
341            DataType::Binary => digest_algorithm.digest_binary_array::<i32>(a.as_ref()),
342            DataType::LargeBinary => {
343                digest_algorithm.digest_binary_array::<i64>(a.as_ref())
344            }
345            other => exec_err!(
346                "Unsupported data type {other:?} for function {digest_algorithm}"
347            ),
348        },
349        ColumnarValue::Scalar(scalar) => match scalar {
350            ScalarValue::Utf8View(a)
351            | ScalarValue::Utf8(a)
352            | ScalarValue::LargeUtf8(a) => {
353                Ok(digest_algorithm
354                    .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
355            }
356            ScalarValue::Binary(a) | ScalarValue::LargeBinary(a) => Ok(digest_algorithm
357                .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
358            other => exec_err!(
359                "Unsupported data type {other:?} for function {digest_algorithm}"
360            ),
361        },
362    }
363}