datafusion_functions/crypto/
basic.rs1use arrow::array::{Array, ArrayRef, BinaryArray, OffsetSizeTrait};
21use arrow::array::{AsArray, GenericStringArray, StringArray, StringViewArray};
22use arrow::datatypes::DataType;
23use blake2::{Blake2b512, Blake2s256, Digest};
24use blake3::Hasher as Blake3;
25use datafusion_common::cast::as_binary_array;
26
27use arrow::compute::StringArrayType;
28use datafusion_common::{
29 cast::as_generic_binary_array, exec_err, internal_err, plan_err,
30 utils::take_function_args, DataFusionError, Result, ScalarValue,
31};
32use datafusion_expr::ColumnarValue;
33use md5::Md5;
34use sha2::{Sha224, Sha256, Sha384, Sha512};
35use std::fmt::{self, Write};
36use std::str::FromStr;
37use std::sync::Arc;
38
/// Defines a public scalar function `$NAME` that hashes its single argument with
/// `DigestAlgorithm::$METHOD`, documented with `$DOC`.
macro_rules! define_digest_function {
    ($NAME: ident, $METHOD: ident, $DOC: expr) => {
        #[doc = $DOC]
        pub fn $NAME(args: &[ColumnarValue]) -> Result<ColumnarValue> {
            // The function's own name is reported in argument-count errors;
            // `stringify!` yields it as a `&'static str`, so no `String` is
            // allocated on every call.
            let [data] = take_function_args(stringify!($NAME), args)?;
            digest_process(data, DigestAlgorithm::$METHOD)
        }
    };
}
48define_digest_function!(
49 sha224,
50 Sha224,
51 "computes sha224 hash digest of the given input"
52);
53define_digest_function!(
54 sha256,
55 Sha256,
56 "computes sha256 hash digest of the given input"
57);
58define_digest_function!(
59 sha384,
60 Sha384,
61 "computes sha384 hash digest of the given input"
62);
63define_digest_function!(
64 sha512,
65 Sha512,
66 "computes sha512 hash digest of the given input"
67);
68define_digest_function!(
69 blake2b,
70 Blake2b,
71 "computes blake2b hash digest of the given input"
72);
73define_digest_function!(
74 blake2s,
75 Blake2s,
76 "computes blake2s hash digest of the given input"
77);
78define_digest_function!(
79 blake3,
80 Blake3,
81 "computes blake3 hash digest of the given input"
82);
83
/// Hashes an optional input with the `Digest` implementation `$METHOD` and
/// wraps the bytes in `ScalarValue::Binary`; a `None` input stays null.
/// (Callers in this file pass an `Option<&[u8]>`.)
macro_rules! digest_to_scalar {
    ($METHOD: ident, $INPUT:expr) => {{
        let hashed: Option<Vec<u8>> = $INPUT.as_ref().map(|bytes| {
            let mut hasher = $METHOD::default();
            hasher.update(bytes);
            hasher.finalize().as_slice().to_vec()
        });
        ScalarValue::Binary(hashed)
    }};
}

/// Hash algorithms supported by the digest functions in this module.
///
/// Equality and hashing are derived so algorithms can be compared directly
/// (e.g. in tests) or used as map/set keys.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum DigestAlgorithm {
    /// MD5.
    Md5,
    /// SHA-224.
    Sha224,
    /// SHA-256.
    Sha256,
    /// SHA-384.
    Sha384,
    /// SHA-512.
    Sha512,
    /// BLAKE2s with a 256-bit output.
    Blake2s,
    /// BLAKE2b with a 512-bit output.
    Blake2b,
    /// BLAKE3 (via the `blake3` crate's `Hasher`).
    Blake3,
}

106pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
110 let [data, digest_algorithm] = take_function_args("digest", args)?;
111 let digest_algorithm = match digest_algorithm {
112 ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
113 Some(Some(method)) => method.parse::<DigestAlgorithm>(),
114 _ => exec_err!("Unsupported data type {scalar:?} for function digest"),
115 },
116 ColumnarValue::Array(_) => {
117 internal_err!("Digest using dynamically decided method is not yet supported")
118 }
119 }?;
120 digest_process(data, digest_algorithm)
121}
122
123impl FromStr for DigestAlgorithm {
124 type Err = DataFusionError;
125 fn from_str(name: &str) -> Result<DigestAlgorithm> {
126 Ok(match name {
127 "md5" => Self::Md5,
128 "sha224" => Self::Sha224,
129 "sha256" => Self::Sha256,
130 "sha384" => Self::Sha384,
131 "sha512" => Self::Sha512,
132 "blake2b" => Self::Blake2b,
133 "blake2s" => Self::Blake2s,
134 "blake3" => Self::Blake3,
135 _ => {
136 let options = [
137 Self::Md5,
138 Self::Sha224,
139 Self::Sha256,
140 Self::Sha384,
141 Self::Sha512,
142 Self::Blake2s,
143 Self::Blake2b,
144 Self::Blake3,
145 ]
146 .iter()
147 .map(|i| i.to_string())
148 .collect::<Vec<_>>()
149 .join(", ");
150 return plan_err!(
151 "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
152 );
153 }
154 })
155 }
156}
157
158impl fmt::Display for DigestAlgorithm {
159 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160 write!(f, "{}", format!("{self:?}").to_lowercase())
161 }
162}
163
164pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
166 let [data] = take_function_args("md5", args)?;
167 let value = digest_process(data, DigestAlgorithm::Md5)?;
168
169 Ok(match value {
171 ColumnarValue::Array(array) => {
172 let binary_array = as_binary_array(&array)?;
173 let string_array: StringArray = binary_array
174 .iter()
175 .map(|opt| opt.map(hex_encode::<_>))
176 .collect();
177 ColumnarValue::Array(Arc::new(string_array))
178 }
179 ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
180 ColumnarValue::Scalar(ScalarValue::Utf8(opt.map(hex_encode::<_>)))
181 }
182 _ => return exec_err!("Impossibly got invalid results from digest"),
183 })
184}
185
/// Encodes `data` as lowercase hexadecimal, two characters per byte.
///
/// Used by `md5` to render its digest as text.
#[inline]
fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
    let bytes = data.as_ref();
    bytes
        .iter()
        .fold(String::with_capacity(2 * bytes.len()), |mut out, byte| {
            write!(out, "{byte:02x}").expect("formatting into a String cannot fail");
            out
        })
}
197pub fn utf8_or_binary_to_binary_type(
198 arg_type: &DataType,
199 name: &str,
200) -> Result<DataType> {
201 Ok(match arg_type {
202 DataType::Utf8View
203 | DataType::LargeUtf8
204 | DataType::Utf8
205 | DataType::Binary
206 | DataType::LargeBinary => DataType::Binary,
207 DataType::Null => DataType::Null,
208 _ => {
209 return plan_err!(
210 "The {name:?} function can only accept strings or binary arrays."
211 );
212 }
213 })
214}
/// Hashes every element of a string or binary array with the `Digest`
/// implementation `$METHOD`, collecting into an `Arc<BinaryArray>`.
/// Null elements stay null.
macro_rules! digest_to_array {
    ($METHOD:ident, $INPUT:expr) => {{
        let hashed = $INPUT.iter().map(|element| {
            element.map(|bytes| {
                let mut hasher = $METHOD::default();
                hasher.update(bytes);
                hasher.finalize()
            })
        });
        Arc::new(hashed.collect::<BinaryArray>())
    }};
}
230impl DigestAlgorithm {
231 pub fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
233 ColumnarValue::Scalar(match self {
234 Self::Md5 => digest_to_scalar!(Md5, value),
235 Self::Sha224 => digest_to_scalar!(Sha224, value),
236 Self::Sha256 => digest_to_scalar!(Sha256, value),
237 Self::Sha384 => digest_to_scalar!(Sha384, value),
238 Self::Sha512 => digest_to_scalar!(Sha512, value),
239 Self::Blake2b => digest_to_scalar!(Blake2b512, value),
240 Self::Blake2s => digest_to_scalar!(Blake2s256, value),
241 Self::Blake3 => ScalarValue::Binary(value.map(|v| {
242 let mut digest = Blake3::default();
243 digest.update(v);
244 Blake3::finalize(&digest).as_bytes().to_vec()
245 })),
246 })
247 }
248
249 pub fn digest_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
251 where
252 T: OffsetSizeTrait,
253 {
254 let input_value = as_generic_binary_array::<T>(value)?;
255 let array: ArrayRef = match self {
256 Self::Md5 => digest_to_array!(Md5, input_value),
257 Self::Sha224 => digest_to_array!(Sha224, input_value),
258 Self::Sha256 => digest_to_array!(Sha256, input_value),
259 Self::Sha384 => digest_to_array!(Sha384, input_value),
260 Self::Sha512 => digest_to_array!(Sha512, input_value),
261 Self::Blake2b => digest_to_array!(Blake2b512, input_value),
262 Self::Blake2s => digest_to_array!(Blake2s256, input_value),
263 Self::Blake3 => {
264 let binary_array: BinaryArray = input_value
265 .iter()
266 .map(|opt| {
267 opt.map(|x| {
268 let mut digest = Blake3::default();
269 digest.update(x);
270 Blake3::finalize(&digest).as_bytes().to_vec()
271 })
272 })
273 .collect();
274 Arc::new(binary_array)
275 }
276 };
277 Ok(ColumnarValue::Array(array))
278 }
279
280 pub fn digest_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
282 where
283 T: OffsetSizeTrait,
284 {
285 let array = match value.data_type() {
286 DataType::Utf8 | DataType::LargeUtf8 => {
287 let v = value.as_string::<T>();
288 self.digest_utf8_array_impl::<&GenericStringArray<T>>(v)
289 }
290 DataType::Utf8View => {
291 let v = value.as_string_view();
292 self.digest_utf8_array_impl::<&StringViewArray>(v)
293 }
294 other => {
295 return exec_err!("unsupported type for digest_utf_array: {other:?}")
296 }
297 };
298 Ok(ColumnarValue::Array(array))
299 }
300
301 pub fn digest_utf8_array_impl<'a, StringArrType>(
302 self,
303 input_value: StringArrType,
304 ) -> ArrayRef
305 where
306 StringArrType: StringArrayType<'a>,
307 {
308 match self {
309 Self::Md5 => digest_to_array!(Md5, input_value),
310 Self::Sha224 => digest_to_array!(Sha224, input_value),
311 Self::Sha256 => digest_to_array!(Sha256, input_value),
312 Self::Sha384 => digest_to_array!(Sha384, input_value),
313 Self::Sha512 => digest_to_array!(Sha512, input_value),
314 Self::Blake2b => digest_to_array!(Blake2b512, input_value),
315 Self::Blake2s => digest_to_array!(Blake2s256, input_value),
316 Self::Blake3 => {
317 let binary_array: BinaryArray = input_value
318 .iter()
319 .map(|opt| {
320 opt.map(|x| {
321 let mut digest = Blake3::default();
322 digest.update(x.as_bytes());
323 Blake3::finalize(&digest).as_bytes().to_vec()
324 })
325 })
326 .collect();
327 Arc::new(binary_array)
328 }
329 }
330 }
331}
332pub fn digest_process(
333 value: &ColumnarValue,
334 digest_algorithm: DigestAlgorithm,
335) -> Result<ColumnarValue> {
336 match value {
337 ColumnarValue::Array(a) => match a.data_type() {
338 DataType::Utf8View => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
339 DataType::Utf8 => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
340 DataType::LargeUtf8 => digest_algorithm.digest_utf8_array::<i64>(a.as_ref()),
341 DataType::Binary => digest_algorithm.digest_binary_array::<i32>(a.as_ref()),
342 DataType::LargeBinary => {
343 digest_algorithm.digest_binary_array::<i64>(a.as_ref())
344 }
345 other => exec_err!(
346 "Unsupported data type {other:?} for function {digest_algorithm}"
347 ),
348 },
349 ColumnarValue::Scalar(scalar) => match scalar {
350 ScalarValue::Utf8View(a)
351 | ScalarValue::Utf8(a)
352 | ScalarValue::LargeUtf8(a) => {
353 Ok(digest_algorithm
354 .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
355 }
356 ScalarValue::Binary(a) | ScalarValue::LargeBinary(a) => Ok(digest_algorithm
357 .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
358 other => exec_err!(
359 "Unsupported data type {other:?} for function {digest_algorithm}"
360 ),
361 },
362 }
363}