datafusion_functions/crypto/
basic.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! "crypto" DataFusion functions
19
20use arrow::array::{
21    Array, ArrayRef, BinaryArray, BinaryArrayType, BinaryViewArray, GenericBinaryArray,
22    OffsetSizeTrait,
23};
24use arrow::array::{AsArray, GenericStringArray, StringArray, StringViewArray};
25use arrow::datatypes::DataType;
26use blake2::{Blake2b512, Blake2s256, Digest};
27use blake3::Hasher as Blake3;
28use datafusion_common::cast::as_binary_array;
29
30use arrow::compute::StringArrayType;
31use datafusion_common::{
32    exec_err, internal_err, plan_err, utils::take_function_args, DataFusionError, Result,
33    ScalarValue,
34};
35use datafusion_expr::ColumnarValue;
36use md5::Md5;
37use sha2::{Sha224, Sha256, Sha384, Sha512};
38use std::fmt::{self, Write};
39use std::str::FromStr;
40use std::sync::Arc;
41
// Generates a public function `$NAME` that hashes its single argument with
// `DigestAlgorithm::$METHOD`; `$DOC` becomes the rustdoc text of the generated function.
macro_rules! define_digest_function {
    ($NAME: ident, $METHOD: ident, $DOC: expr) => {
        #[doc = $DOC]
        pub fn $NAME(args: &[ColumnarValue]) -> Result<ColumnarValue> {
            let algorithm = DigestAlgorithm::$METHOD;
            // The algorithm's display name is used as the function name when reporting
            // a wrong number of arguments.
            let [data] = take_function_args(&algorithm.to_string(), args)?;
            digest_process(data, algorithm)
        }
    };
}
// Instantiate one public single-argument function per algorithm. Each invocation expands
// to `pub fn <name>(args: &[ColumnarValue]) -> Result<ColumnarValue>`, which forwards its
// argument to `digest_process` with the matching `DigestAlgorithm` variant.
// `md5` is not generated here; it is written by hand further down in this file.
define_digest_function!(
    sha224,
    Sha224,
    "computes sha224 hash digest of the given input"
);
define_digest_function!(
    sha256,
    Sha256,
    "computes sha256 hash digest of the given input"
);
define_digest_function!(
    sha384,
    Sha384,
    "computes sha384 hash digest of the given input"
);
define_digest_function!(
    sha512,
    Sha512,
    "computes sha512 hash digest of the given input"
);
define_digest_function!(
    blake2b,
    Blake2b,
    "computes blake2b hash digest of the given input"
);
define_digest_function!(
    blake2s,
    Blake2s,
    "computes blake2s hash digest of the given input"
);
define_digest_function!(
    blake3,
    Blake3,
    "computes blake3 hash digest of the given input"
);
86
// Hashes an optional byte-like `$INPUT` with the `Digest` implementation `$METHOD` and
// wraps the resulting bytes in `ScalarValue::Binary`; a `None` input stays `None`.
macro_rules! digest_to_scalar {
    ($METHOD: ident, $INPUT:expr) => {{
        ScalarValue::Binary(
            $INPUT
                .as_ref()
                .map(|bytes| $METHOD::digest(bytes).as_slice().to_vec()),
        )
    }};
}
96
/// Hash algorithms supported by the digest functions in this module.
///
/// The lowercase form of each variant name (e.g. `sha256`) is the name accepted by the
/// [`FromStr`] implementation and produced by the `Display` implementation.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum DigestAlgorithm {
    /// MD5
    Md5,
    /// SHA-224
    Sha224,
    /// SHA-256
    Sha256,
    /// SHA-384
    Sha384,
    /// SHA-512
    Sha512,
    /// BLAKE2s (computed with `Blake2s256`)
    Blake2s,
    /// BLAKE2b (computed with `Blake2b512`)
    Blake2b,
    /// BLAKE3
    Blake3,
}
108
109/// Digest computes a binary hash of the given data, accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`].
110/// Second argument is the algorithm to use.
111/// Standard algorithms are md5, sha1, sha224, sha256, sha384 and sha512.
112pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
113    let [data, digest_algorithm] = take_function_args("digest", args)?;
114    let digest_algorithm = match digest_algorithm {
115        ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
116            Some(Some(method)) => method.parse::<DigestAlgorithm>(),
117            _ => exec_err!("Unsupported data type {scalar:?} for function digest"),
118        },
119        ColumnarValue::Array(_) => {
120            internal_err!("Digest using dynamically decided method is not yet supported")
121        }
122    }?;
123    digest_process(data, digest_algorithm)
124}
125
126impl FromStr for DigestAlgorithm {
127    type Err = DataFusionError;
128    fn from_str(name: &str) -> Result<DigestAlgorithm> {
129        Ok(match name {
130            "md5" => Self::Md5,
131            "sha224" => Self::Sha224,
132            "sha256" => Self::Sha256,
133            "sha384" => Self::Sha384,
134            "sha512" => Self::Sha512,
135            "blake2b" => Self::Blake2b,
136            "blake2s" => Self::Blake2s,
137            "blake3" => Self::Blake3,
138            _ => {
139                let options = [
140                    Self::Md5,
141                    Self::Sha224,
142                    Self::Sha256,
143                    Self::Sha384,
144                    Self::Sha512,
145                    Self::Blake2s,
146                    Self::Blake2b,
147                    Self::Blake3,
148                ]
149                .iter()
150                .map(|i| i.to_string())
151                .collect::<Vec<_>>()
152                .join(", ");
153                return plan_err!(
154                    "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
155                );
156            }
157        })
158    }
159}
160
161impl fmt::Display for DigestAlgorithm {
162    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
163        write!(f, "{}", format!("{self:?}").to_lowercase())
164    }
165}
166
167/// computes md5 hash digest of the given input
168pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
169    let [data] = take_function_args("md5", args)?;
170    let value = digest_process(data, DigestAlgorithm::Md5)?;
171
172    // md5 requires special handling because of its unique utf8 return type
173    Ok(match value {
174        ColumnarValue::Array(array) => {
175            let binary_array = as_binary_array(&array)?;
176            let string_array: StringArray = binary_array
177                .iter()
178                .map(|opt| opt.map(hex_encode::<_>))
179                .collect();
180            ColumnarValue::Array(Arc::new(string_array))
181        }
182        ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
183            ColumnarValue::Scalar(ScalarValue::Utf8(opt.map(hex_encode::<_>)))
184        }
185        _ => return exec_err!("Impossibly got invalid results from digest"),
186    })
187}
188
/// Encodes `data` as lowercase hexadecimal, two characters per input byte.
///
/// This helper exists so that we do not need to pull in the `hex` crate; it is only used
/// by the `md5` function in this module.
#[inline]
fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
    let bytes = data.as_ref();
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut hex, byte| {
            // Formatting into a `String` never fails, so this unwrap cannot panic.
            write!(hex, "{byte:02x}").unwrap();
            hex
        })
}
200pub fn utf8_or_binary_to_binary_type(
201    arg_type: &DataType,
202    name: &str,
203) -> Result<DataType> {
204    Ok(match arg_type {
205        DataType::Utf8View
206        | DataType::LargeUtf8
207        | DataType::Utf8
208        | DataType::Binary
209        | DataType::BinaryView
210        | DataType::LargeBinary => DataType::Binary,
211        DataType::Null => DataType::Null,
212        _ => {
213            return plan_err!(
214                "The {name:?} function can only accept strings or binary arrays."
215            );
216        }
217    })
218}
// Hashes every non-null element of the array-like `$INPUT` with the `Digest`
// implementation `$METHOD` and collects the results into an `Arc<BinaryArray>`;
// null elements stay null.
macro_rules! digest_to_array {
    ($METHOD:ident, $INPUT:expr) => {{
        let hashed: BinaryArray = $INPUT
            .iter()
            .map(|element| element.map(|bytes| $METHOD::digest(bytes)))
            .collect();
        Arc::new(hashed)
    }};
}
234impl DigestAlgorithm {
235    /// digest an optional string to its hash value, null values are returned as is
236    pub fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
237        ColumnarValue::Scalar(match self {
238            Self::Md5 => digest_to_scalar!(Md5, value),
239            Self::Sha224 => digest_to_scalar!(Sha224, value),
240            Self::Sha256 => digest_to_scalar!(Sha256, value),
241            Self::Sha384 => digest_to_scalar!(Sha384, value),
242            Self::Sha512 => digest_to_scalar!(Sha512, value),
243            Self::Blake2b => digest_to_scalar!(Blake2b512, value),
244            Self::Blake2s => digest_to_scalar!(Blake2s256, value),
245            Self::Blake3 => ScalarValue::Binary(value.map(|v| {
246                let mut digest = Blake3::default();
247                digest.update(v);
248                Blake3::finalize(&digest).as_bytes().to_vec()
249            })),
250        })
251    }
252
253    /// digest a binary array to their hash values
254    pub fn digest_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
255    where
256        T: OffsetSizeTrait,
257    {
258        let array = match value.data_type() {
259            DataType::Binary | DataType::LargeBinary => {
260                let v = value.as_binary::<T>();
261                self.digest_binary_array_impl::<&GenericBinaryArray<T>>(v)
262            }
263            DataType::BinaryView => {
264                let v = value.as_binary_view();
265                self.digest_binary_array_impl::<&BinaryViewArray>(v)
266            }
267            other => {
268                return exec_err!("unsupported type for digest_utf_array: {other:?}")
269            }
270        };
271        Ok(ColumnarValue::Array(array))
272    }
273
274    /// digest a string array to their hash values
275    pub fn digest_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
276    where
277        T: OffsetSizeTrait,
278    {
279        let array = match value.data_type() {
280            DataType::Utf8 | DataType::LargeUtf8 => {
281                let v = value.as_string::<T>();
282                self.digest_utf8_array_impl::<&GenericStringArray<T>>(v)
283            }
284            DataType::Utf8View => {
285                let v = value.as_string_view();
286                self.digest_utf8_array_impl::<&StringViewArray>(v)
287            }
288            other => {
289                return exec_err!("unsupported type for digest_utf_array: {other:?}")
290            }
291        };
292        Ok(ColumnarValue::Array(array))
293    }
294
295    pub fn digest_utf8_array_impl<'a, StringArrType>(
296        self,
297        input_value: StringArrType,
298    ) -> ArrayRef
299    where
300        StringArrType: StringArrayType<'a>,
301    {
302        match self {
303            Self::Md5 => digest_to_array!(Md5, input_value),
304            Self::Sha224 => digest_to_array!(Sha224, input_value),
305            Self::Sha256 => digest_to_array!(Sha256, input_value),
306            Self::Sha384 => digest_to_array!(Sha384, input_value),
307            Self::Sha512 => digest_to_array!(Sha512, input_value),
308            Self::Blake2b => digest_to_array!(Blake2b512, input_value),
309            Self::Blake2s => digest_to_array!(Blake2s256, input_value),
310            Self::Blake3 => {
311                let binary_array: BinaryArray = input_value
312                    .iter()
313                    .map(|opt| {
314                        opt.map(|x| {
315                            let mut digest = Blake3::default();
316                            digest.update(x.as_bytes());
317                            Blake3::finalize(&digest).as_bytes().to_vec()
318                        })
319                    })
320                    .collect();
321                Arc::new(binary_array)
322            }
323        }
324    }
325
326    pub fn digest_binary_array_impl<'a, BinaryArrType>(
327        self,
328        input_value: BinaryArrType,
329    ) -> ArrayRef
330    where
331        BinaryArrType: BinaryArrayType<'a>,
332    {
333        match self {
334            Self::Md5 => digest_to_array!(Md5, input_value),
335            Self::Sha224 => digest_to_array!(Sha224, input_value),
336            Self::Sha256 => digest_to_array!(Sha256, input_value),
337            Self::Sha384 => digest_to_array!(Sha384, input_value),
338            Self::Sha512 => digest_to_array!(Sha512, input_value),
339            Self::Blake2b => digest_to_array!(Blake2b512, input_value),
340            Self::Blake2s => digest_to_array!(Blake2s256, input_value),
341            Self::Blake3 => {
342                let binary_array: BinaryArray = input_value
343                    .iter()
344                    .map(|opt| {
345                        opt.map(|x| {
346                            let mut digest = Blake3::default();
347                            digest.update(x);
348                            Blake3::finalize(&digest).as_bytes().to_vec()
349                        })
350                    })
351                    .collect();
352                Arc::new(binary_array)
353            }
354        }
355    }
356}
357pub fn digest_process(
358    value: &ColumnarValue,
359    digest_algorithm: DigestAlgorithm,
360) -> Result<ColumnarValue> {
361    match value {
362        ColumnarValue::Array(a) => match a.data_type() {
363            DataType::Utf8View => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
364            DataType::Utf8 => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
365            DataType::LargeUtf8 => digest_algorithm.digest_utf8_array::<i64>(a.as_ref()),
366            DataType::Binary => digest_algorithm.digest_binary_array::<i32>(a.as_ref()),
367            DataType::LargeBinary => {
368                digest_algorithm.digest_binary_array::<i64>(a.as_ref())
369            }
370            DataType::BinaryView => {
371                digest_algorithm.digest_binary_array::<i32>(a.as_ref())
372            }
373            other => exec_err!(
374                "Unsupported data type {other:?} for function {digest_algorithm}"
375            ),
376        },
377        ColumnarValue::Scalar(scalar) => {
378            match scalar {
379                ScalarValue::Utf8View(a)
380                | ScalarValue::Utf8(a)
381                | ScalarValue::LargeUtf8(a) => Ok(digest_algorithm
382                    .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
383                ScalarValue::Binary(a)
384                | ScalarValue::LargeBinary(a)
385                | ScalarValue::BinaryView(a) => Ok(digest_algorithm
386                    .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
387                other => exec_err!(
388                    "Unsupported data type {other:?} for function {digest_algorithm}"
389                ),
390            }
391        }
392    }
393}