datafusion_functions/crypto/
basic.rs1use arrow::array::{
21 Array, ArrayRef, BinaryArray, BinaryArrayType, BinaryViewArray, GenericBinaryArray,
22 OffsetSizeTrait,
23};
24use arrow::array::{AsArray, GenericStringArray, StringArray, StringViewArray};
25use arrow::datatypes::DataType;
26use blake2::{Blake2b512, Blake2s256, Digest};
27use blake3::Hasher as Blake3;
28use datafusion_common::cast::as_binary_array;
29
30use arrow::compute::StringArrayType;
31use datafusion_common::{
32 exec_err, internal_err, plan_err, utils::take_function_args, DataFusionError, Result,
33 ScalarValue,
34};
35use datafusion_expr::ColumnarValue;
36use md5::Md5;
37use sha2::{Sha224, Sha256, Sha384, Sha512};
38use std::fmt::{self, Write};
39use std::str::FromStr;
40use std::sync::Arc;
41
42macro_rules! define_digest_function {
43 ($NAME: ident, $METHOD: ident, $DOC: expr) => {
44 #[doc = $DOC]
45 pub fn $NAME(args: &[ColumnarValue]) -> Result<ColumnarValue> {
46 let [data] = take_function_args(&DigestAlgorithm::$METHOD.to_string(), args)?;
47 digest_process(data, DigestAlgorithm::$METHOD)
48 }
49 };
50}
51define_digest_function!(
52 sha224,
53 Sha224,
54 "computes sha224 hash digest of the given input"
55);
56define_digest_function!(
57 sha256,
58 Sha256,
59 "computes sha256 hash digest of the given input"
60);
61define_digest_function!(
62 sha384,
63 Sha384,
64 "computes sha384 hash digest of the given input"
65);
66define_digest_function!(
67 sha512,
68 Sha512,
69 "computes sha512 hash digest of the given input"
70);
71define_digest_function!(
72 blake2b,
73 Blake2b,
74 "computes blake2b hash digest of the given input"
75);
76define_digest_function!(
77 blake2s,
78 Blake2s,
79 "computes blake2s hash digest of the given input"
80);
81define_digest_function!(
82 blake3,
83 Blake3,
84 "computes blake3 hash digest of the given input"
85);
86
// Expands to a `ScalarValue::Binary` holding the `$METHOD` digest of `$INPUT`.
// `$INPUT` is an `Option` of bytes; a `None` input produces `Binary(None)`.
macro_rules! digest_to_scalar {
    ($METHOD: ident, $INPUT:expr) => {{
        let hashed = $INPUT
            .as_ref()
            .map(|bytes| $METHOD::digest(bytes).to_vec());
        ScalarValue::Binary(hashed)
    }};
}
96
/// Hash algorithms that the digest functions in this file can apply.
///
/// The lowercase variant name (for example `"sha256"`) is what the `Display`
/// implementation prints and what the `FromStr` implementation accepts.
#[derive(Debug, Copy, Clone)]
pub enum DigestAlgorithm {
    /// MD5, hashed with `md5::Md5`.
    Md5,
    /// SHA-224, hashed with `sha2::Sha224`.
    Sha224,
    /// SHA-256, hashed with `sha2::Sha256`.
    Sha256,
    /// SHA-384, hashed with `sha2::Sha384`.
    Sha384,
    /// SHA-512, hashed with `sha2::Sha512`.
    Sha512,
    /// BLAKE2s, hashed with `blake2::Blake2s256`.
    Blake2s,
    /// BLAKE2b, hashed with `blake2::Blake2b512`.
    Blake2b,
    /// BLAKE3, hashed with `blake3::Hasher`.
    Blake3,
}
108
109pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
113 let [data, digest_algorithm] = take_function_args("digest", args)?;
114 let digest_algorithm = match digest_algorithm {
115 ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
116 Some(Some(method)) => method.parse::<DigestAlgorithm>(),
117 _ => exec_err!("Unsupported data type {scalar:?} for function digest"),
118 },
119 ColumnarValue::Array(_) => {
120 internal_err!("Digest using dynamically decided method is not yet supported")
121 }
122 }?;
123 digest_process(data, digest_algorithm)
124}
125
126impl FromStr for DigestAlgorithm {
127 type Err = DataFusionError;
128 fn from_str(name: &str) -> Result<DigestAlgorithm> {
129 Ok(match name {
130 "md5" => Self::Md5,
131 "sha224" => Self::Sha224,
132 "sha256" => Self::Sha256,
133 "sha384" => Self::Sha384,
134 "sha512" => Self::Sha512,
135 "blake2b" => Self::Blake2b,
136 "blake2s" => Self::Blake2s,
137 "blake3" => Self::Blake3,
138 _ => {
139 let options = [
140 Self::Md5,
141 Self::Sha224,
142 Self::Sha256,
143 Self::Sha384,
144 Self::Sha512,
145 Self::Blake2s,
146 Self::Blake2b,
147 Self::Blake3,
148 ]
149 .iter()
150 .map(|i| i.to_string())
151 .collect::<Vec<_>>()
152 .join(", ");
153 return plan_err!(
154 "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
155 );
156 }
157 })
158 }
159}
160
161impl fmt::Display for DigestAlgorithm {
162 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
163 write!(f, "{}", format!("{self:?}").to_lowercase())
164 }
165}
166
167pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
169 let [data] = take_function_args("md5", args)?;
170 let value = digest_process(data, DigestAlgorithm::Md5)?;
171
172 Ok(match value {
174 ColumnarValue::Array(array) => {
175 let binary_array = as_binary_array(&array)?;
176 let string_array: StringArray = binary_array
177 .iter()
178 .map(|opt| opt.map(hex_encode::<_>))
179 .collect();
180 ColumnarValue::Array(Arc::new(string_array))
181 }
182 ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
183 ColumnarValue::Scalar(ScalarValue::Utf8(opt.map(hex_encode::<_>)))
184 }
185 _ => return exec_err!("Impossibly got invalid results from digest"),
186 })
187}
188
/// Encodes `data` as lowercase hexadecimal, two characters per input byte.
#[inline]
fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let bytes = data.as_ref();
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
200pub fn utf8_or_binary_to_binary_type(
201 arg_type: &DataType,
202 name: &str,
203) -> Result<DataType> {
204 Ok(match arg_type {
205 DataType::Utf8View
206 | DataType::LargeUtf8
207 | DataType::Utf8
208 | DataType::Binary
209 | DataType::BinaryView
210 | DataType::LargeBinary => DataType::Binary,
211 DataType::Null => DataType::Null,
212 _ => {
213 return plan_err!(
214 "The {name:?} function can only accept strings or binary arrays."
215 );
216 }
217 })
218}
// Expands to an `Arc<BinaryArray>` holding the `$METHOD` digest of every element of
// `$INPUT`. `$INPUT` must offer `iter()` over optional values; nulls stay null.
macro_rules! digest_to_array {
    ($METHOD:ident, $INPUT:expr) => {{
        let hashed: BinaryArray = $INPUT
            .iter()
            .map(|item| item.map(|bytes| $METHOD::digest(bytes)))
            .collect();
        Arc::new(hashed)
    }};
}
234impl DigestAlgorithm {
235 pub fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
237 ColumnarValue::Scalar(match self {
238 Self::Md5 => digest_to_scalar!(Md5, value),
239 Self::Sha224 => digest_to_scalar!(Sha224, value),
240 Self::Sha256 => digest_to_scalar!(Sha256, value),
241 Self::Sha384 => digest_to_scalar!(Sha384, value),
242 Self::Sha512 => digest_to_scalar!(Sha512, value),
243 Self::Blake2b => digest_to_scalar!(Blake2b512, value),
244 Self::Blake2s => digest_to_scalar!(Blake2s256, value),
245 Self::Blake3 => ScalarValue::Binary(value.map(|v| {
246 let mut digest = Blake3::default();
247 digest.update(v);
248 Blake3::finalize(&digest).as_bytes().to_vec()
249 })),
250 })
251 }
252
253 pub fn digest_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
255 where
256 T: OffsetSizeTrait,
257 {
258 let array = match value.data_type() {
259 DataType::Binary | DataType::LargeBinary => {
260 let v = value.as_binary::<T>();
261 self.digest_binary_array_impl::<&GenericBinaryArray<T>>(v)
262 }
263 DataType::BinaryView => {
264 let v = value.as_binary_view();
265 self.digest_binary_array_impl::<&BinaryViewArray>(v)
266 }
267 other => {
268 return exec_err!("unsupported type for digest_utf_array: {other:?}")
269 }
270 };
271 Ok(ColumnarValue::Array(array))
272 }
273
274 pub fn digest_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
276 where
277 T: OffsetSizeTrait,
278 {
279 let array = match value.data_type() {
280 DataType::Utf8 | DataType::LargeUtf8 => {
281 let v = value.as_string::<T>();
282 self.digest_utf8_array_impl::<&GenericStringArray<T>>(v)
283 }
284 DataType::Utf8View => {
285 let v = value.as_string_view();
286 self.digest_utf8_array_impl::<&StringViewArray>(v)
287 }
288 other => {
289 return exec_err!("unsupported type for digest_utf_array: {other:?}")
290 }
291 };
292 Ok(ColumnarValue::Array(array))
293 }
294
295 pub fn digest_utf8_array_impl<'a, StringArrType>(
296 self,
297 input_value: StringArrType,
298 ) -> ArrayRef
299 where
300 StringArrType: StringArrayType<'a>,
301 {
302 match self {
303 Self::Md5 => digest_to_array!(Md5, input_value),
304 Self::Sha224 => digest_to_array!(Sha224, input_value),
305 Self::Sha256 => digest_to_array!(Sha256, input_value),
306 Self::Sha384 => digest_to_array!(Sha384, input_value),
307 Self::Sha512 => digest_to_array!(Sha512, input_value),
308 Self::Blake2b => digest_to_array!(Blake2b512, input_value),
309 Self::Blake2s => digest_to_array!(Blake2s256, input_value),
310 Self::Blake3 => {
311 let binary_array: BinaryArray = input_value
312 .iter()
313 .map(|opt| {
314 opt.map(|x| {
315 let mut digest = Blake3::default();
316 digest.update(x.as_bytes());
317 Blake3::finalize(&digest).as_bytes().to_vec()
318 })
319 })
320 .collect();
321 Arc::new(binary_array)
322 }
323 }
324 }
325
326 pub fn digest_binary_array_impl<'a, BinaryArrType>(
327 self,
328 input_value: BinaryArrType,
329 ) -> ArrayRef
330 where
331 BinaryArrType: BinaryArrayType<'a>,
332 {
333 match self {
334 Self::Md5 => digest_to_array!(Md5, input_value),
335 Self::Sha224 => digest_to_array!(Sha224, input_value),
336 Self::Sha256 => digest_to_array!(Sha256, input_value),
337 Self::Sha384 => digest_to_array!(Sha384, input_value),
338 Self::Sha512 => digest_to_array!(Sha512, input_value),
339 Self::Blake2b => digest_to_array!(Blake2b512, input_value),
340 Self::Blake2s => digest_to_array!(Blake2s256, input_value),
341 Self::Blake3 => {
342 let binary_array: BinaryArray = input_value
343 .iter()
344 .map(|opt| {
345 opt.map(|x| {
346 let mut digest = Blake3::default();
347 digest.update(x);
348 Blake3::finalize(&digest).as_bytes().to_vec()
349 })
350 })
351 .collect();
352 Arc::new(binary_array)
353 }
354 }
355 }
356}
357pub fn digest_process(
358 value: &ColumnarValue,
359 digest_algorithm: DigestAlgorithm,
360) -> Result<ColumnarValue> {
361 match value {
362 ColumnarValue::Array(a) => match a.data_type() {
363 DataType::Utf8View => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
364 DataType::Utf8 => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
365 DataType::LargeUtf8 => digest_algorithm.digest_utf8_array::<i64>(a.as_ref()),
366 DataType::Binary => digest_algorithm.digest_binary_array::<i32>(a.as_ref()),
367 DataType::LargeBinary => {
368 digest_algorithm.digest_binary_array::<i64>(a.as_ref())
369 }
370 DataType::BinaryView => {
371 digest_algorithm.digest_binary_array::<i32>(a.as_ref())
372 }
373 other => exec_err!(
374 "Unsupported data type {other:?} for function {digest_algorithm}"
375 ),
376 },
377 ColumnarValue::Scalar(scalar) => {
378 match scalar {
379 ScalarValue::Utf8View(a)
380 | ScalarValue::Utf8(a)
381 | ScalarValue::LargeUtf8(a) => Ok(digest_algorithm
382 .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
383 ScalarValue::Binary(a)
384 | ScalarValue::LargeBinary(a)
385 | ScalarValue::BinaryView(a) => Ok(digest_algorithm
386 .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
387 other => exec_err!(
388 "Unsupported data type {other:?} for function {digest_algorithm}"
389 ),
390 }
391 }
392 }
393}