From 3102dd67d2464732751fd6e80e6989067b16d68e Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Fri, 9 Jan 2026 22:47:59 +0800 Subject: [PATCH 1/8] add spark_hex bench --- datafusion/spark/Cargo.toml | 4 + datafusion/spark/benches/hex.rs | 152 ++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 datafusion/spark/benches/hex.rs diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index 673b62c5c3485..0783f8803039b 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -65,3 +65,7 @@ name = "char" [[bench]] harness = false name = "space" + +[[bench]] +harness = false +name = "hex" diff --git a/datafusion/spark/benches/hex.rs b/datafusion/spark/benches/hex.rs new file mode 100644 index 0000000000000..756352b034c34 --- /dev/null +++ b/datafusion/spark/benches/hex.rs @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate criterion; + +use arrow::array::*; +use arrow::datatypes::*; +use criterion::{Criterion, criterion_group, criterion_main}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_spark::function::math::hex::SparkHex; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint::black_box; +use std::sync::Arc; + +fn seedable_rng() -> StdRng { + StdRng::seed_from_u64(42) +} + +fn generate_int64_data(size: usize, null_density: f32) -> PrimitiveArray { + let mut rng = seedable_rng(); + (0..size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range::(-999_999_999_999..999_999_999_999)) + } + }) + .collect() +} + +fn generate_utf8_data(size: usize, null_density: f32) -> StringArray { + let mut rng = seedable_rng(); + let mut builder = StringBuilder::new(); + for _ in 0..size { + if rng.random::() < null_density { + builder.append_null(); + } else { + let len = rng.random_range::(1..=100); + let s: String = + std::iter::repeat_with(|| rng.random_range(b'a'..=b'z') as char) + .take(len) + .collect(); + builder.append_value(&s); + } + } + builder.finish() +} + +fn generate_binary_data(size: usize, null_density: f32) -> BinaryArray { + let mut rng = seedable_rng(); + let mut builder = BinaryBuilder::new(); + for _ in 0..size { + if rng.random::() < null_density { + builder.append_null(); + } else { + let len = rng.random_range::(1..=100); + let bytes: Vec = (0..len).map(|_| rng.random()).collect(); + builder.append_value(&bytes); + } + } + builder.finish() +} + +fn generate_int64_dict_data( + size: usize, + null_density: f32, +) -> DictionaryArray { + let mut rng = seedable_rng(); + let mut builder = PrimitiveDictionaryBuilder::::new(); + for _ in 0..size { + if rng.random::() < null_density { + builder.append_null(); + } else { + builder.append_value( + rng.random_range::(-999_999_999_999..999_999_999_999), + ); + } + } + builder.finish() +} + +fn run_benchmark(c: &mut Criterion, name: &str, size: usize, array: Arc) { + let hex_func = SparkHex::new(); + let args = vec![ColumnarValue::Array(array)]; + let arg_fields: Vec<_> = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect(); + let config_options = Arc::new(ConfigOptions::default()); + + c.bench_function(&format!("{name}/size={size}"), |b| { + b.iter(|| { + black_box( + hex_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: size, + return_field: Arc::new(Field::new("f", DataType::Utf8, true)), + config_options: Arc::clone(&config_options), + }) + .unwrap(), + ) + }) + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + let sizes = vec![1024, 4096, 8192]; + let null_density = 0.1; + + for &size in &sizes { + let data = generate_int64_data(size, null_density); + run_benchmark(c, "hex_int64", size, Arc::new(data)); + } + + for &size in &sizes { + let data = generate_utf8_data(size, null_density); + run_benchmark(c, "hex_utf8", size, Arc::new(data)); + } + + for &size in &sizes { + let data = generate_binary_data(size, null_density); + run_benchmark(c, "hex_binary", size, Arc::new(data)); + } + + for &size in &sizes { + let data = generate_int64_dict_data(size, null_density); + run_benchmark(c, "hex_int64_dict", size, Arc::new(data)); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); From 268a9f1ac85572364e6982294cd7571baa9f342e Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Fri, 9 Jan 2026 23:41:56 +0800 Subject: [PATCH 2/8] optimize hex --- datafusion/spark/src/function/math/hex.rs | 232 +++++++++++++--------- 1 file changed, 138 insertions(+), 94 deletions(-) diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs index ef62b08fb03d2..3582c58f2241e 100644 --- a/datafusion/spark/src/function/math/hex.rs +++ b/datafusion/spark/src/function/math/hex.rs @@ -16,9 +16,10 @@ // under the License. use std::any::Any; +use std::str::from_utf8_unchecked; use std::sync::Arc; -use arrow::array::{Array, StringArray}; +use arrow::array::{Array, StringBuilder}; use arrow::datatypes::DataType; use arrow::{ array::{as_dictionary_array, as_largestring_array, as_string_array}, @@ -110,8 +111,26 @@ impl ScalarUDFImpl for SparkHex { } } -fn hex_int64(num: i64) -> String { - format!("{num:X}") +#[inline] +fn hex_int64(num: i64, buffer: &mut Vec) { + const HEX_CHARS: &[u8; 16] = b"0123456789ABCDEF"; + + if num == 0 { + buffer.push(HEX_CHARS[0]); + return; + } + + let mut n = num; + let mut temp = [0u8; 16]; + let mut i = 16; + while n != 0 && i > 0 { + i -= 1; + let digest = (n & 0xF) as u8; + temp[i] = HEX_CHARS[digest as usize]; + n >>= 4; + } + + buffer.extend_from_slice(&temp[i..]); } /// Hex encoding lookup tables for fast byte-to-hex conversion @@ -119,28 +138,77 @@ const HEX_CHARS_LOWER: &[u8; 16] = b"0123456789abcdef"; const HEX_CHARS_UPPER: &[u8; 16] = b"0123456789ABCDEF"; #[inline] -fn hex_encode>(data: T, lower_case: bool) -> String { +fn hex_encode>(data: T, lower_case: bool, buffer: &mut Vec) { let bytes = data.as_ref(); - let mut s = String::with_capacity(bytes.len() * 2); let hex_chars = if lower_case { HEX_CHARS_LOWER } else { HEX_CHARS_UPPER }; for &b in bytes { - s.push(hex_chars[(b >> 4) as usize] as char); - s.push(hex_chars[(b & 0x0f) as usize] as char); + buffer.push(hex_chars[(b >> 4) as usize]); + buffer.push(hex_chars[(b & 0x0f) as usize]); } - s } -#[inline(always)] -fn hex_bytes>( - bytes: T, +/// Generic hex encoding for byte array types +fn hex_encode_bytes<'a, I, T>( + iter: I, lowercase: bool, -) -> Result { - let hex_string = hex_encode(bytes, lowercase); - Ok(hex_string) + len: usize, +) -> Result +where + I: Iterator>, + T: AsRef<[u8]> + 'a, +{ + let mut builder = StringBuilder::with_capacity(len, len * 64); + let mut buffer = Vec::with_capacity(16); + let hex_chars = if lowercase { + HEX_CHARS_LOWER + } else { + HEX_CHARS_UPPER + }; + + for v in iter { + if let Some(b) = v { + buffer.clear(); + let bytes = b.as_ref(); + for &byte in bytes { + buffer.push(hex_chars[(byte >> 4) as usize]); + buffer.push(hex_chars[(byte & 0x0f) as usize]); + } + unsafe { + builder.append_value(from_utf8_unchecked(&buffer)); + } + } else { + builder.append_null(); + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) +} + +/// Generic hex encoding for int64 type +fn hex_encode_int64<'a, I>(iter: I, len: usize) -> Result +where + I: Iterator>, +{ + let mut builder = StringBuilder::with_capacity(len, len * 64); + let mut buffer = Vec::with_capacity(16); + + for v in iter { + if let Some(num) = v { + buffer.clear(); + hex_int64(num, &mut buffer); + unsafe { + builder.append_value(from_utf8_unchecked(&buffer)); + } + } else { + builder.append_null(); + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } /// Spark-compatible `hex` function @@ -166,103 +234,72 @@ pub fn compute_hex( ColumnarValue::Array(array) => match array.data_type() { DataType::Int64 => { let array = as_int64_array(array)?; - - let hexed_array: StringArray = - array.iter().map(|v| v.map(hex_int64)).collect(); - - Ok(ColumnarValue::Array(Arc::new(hexed_array))) + hex_encode_int64(array.iter(), array.len()) } DataType::Utf8 => { let array = as_string_array(array); - - let hexed: StringArray = array - .iter() - .map(|v| v.map(|b| hex_bytes(b, lowercase)).transpose()) - .collect::>()?; - - Ok(ColumnarValue::Array(Arc::new(hexed))) + hex_encode_bytes(array.iter(), lowercase, array.len()) } DataType::Utf8View => { let array = as_string_view_array(array)?; - - let hexed: StringArray = array - .iter() - .map(|v| v.map(|b| hex_bytes(b, lowercase)).transpose()) - .collect::>()?; - - Ok(ColumnarValue::Array(Arc::new(hexed))) + hex_encode_bytes(array.iter(), lowercase, array.len()) } DataType::LargeUtf8 => { let array = as_largestring_array(array); - - let hexed: StringArray = array - .iter() - .map(|v| v.map(|b| hex_bytes(b, lowercase)).transpose()) - .collect::>()?; - - Ok(ColumnarValue::Array(Arc::new(hexed))) + hex_encode_bytes(array.iter(), lowercase, array.len()) } DataType::Binary => { let array = as_binary_array(array)?; - - let hexed: StringArray = array - .iter() - .map(|v| v.map(|b| hex_bytes(b, lowercase)).transpose()) - .collect::>()?; - - Ok(ColumnarValue::Array(Arc::new(hexed))) + hex_encode_bytes(array.iter(), lowercase, array.len()) } DataType::LargeBinary => { let array = as_large_binary_array(array)?; - - let hexed: StringArray = array - .iter() - .map(|v| v.map(|b| hex_bytes(b, lowercase)).transpose()) - .collect::>()?; - - Ok(ColumnarValue::Array(Arc::new(hexed))) + hex_encode_bytes(array.iter(), lowercase, array.len()) } DataType::FixedSizeBinary(_) => { let array = as_fixed_size_binary_array(array)?; - - let hexed: StringArray = array - .iter() - .map(|v| v.map(|b| hex_bytes(b, lowercase)).transpose()) - .collect::>()?; - - Ok(ColumnarValue::Array(Arc::new(hexed))) + hex_encode_bytes(array.iter(), lowercase, array.len()) } DataType::Dictionary(_, value_type) => { let dict = as_dictionary_array::(&array); - - let values = match **value_type { - DataType::Int64 => as_int64_array(dict.values())? - .iter() - .map(|v| v.map(hex_int64)) - .collect::>(), - DataType::Utf8 => as_string_array(dict.values()) - .iter() - .map(|v| v.map(|b| hex_bytes(b, lowercase)).transpose()) - .collect::>()?, - DataType::Binary => as_binary_array(dict.values())? - .iter() - .map(|v| v.map(|b| hex_bytes(b, lowercase)).transpose()) - .collect::>()?, - _ => exec_err!( - "hex got an unexpected argument type: {}", - array.data_type() - )?, - }; - - let new_values: Vec> = dict - .keys() - .iter() - .map(|key| key.map(|k| values[k as usize].clone()).unwrap_or(None)) - .collect(); - - let string_array_values = StringArray::from(new_values); - - Ok(ColumnarValue::Array(Arc::new(string_array_values))) + let keys = dict.keys(); + let values = dict.values(); + // let mut buffer = Vec::with_capacity(16); + + match **value_type { + DataType::Int64 => { + let int_values = as_int64_array(values)?; + hex_encode_int64( + keys.iter().map(|k| k.map(|idx| int_values.value(idx as usize))), + dict.len(), + ) + } + DataType::Utf8 => { + let str_values = as_string_array(values); + hex_encode_bytes( + keys.iter().map(|k| { + k.map(|idx| str_values.value(idx as usize).as_bytes()) + }), + lowercase, + dict.len(), + ) + } + DataType::Binary => { + let bin_values = as_binary_array(values)?; + hex_encode_bytes( + keys.iter() + .map(|k| k.map(|idx| bin_values.value(idx as usize))), + lowercase, + dict.len(), + ) + } + _ => { + exec_err!( + "hex got an unexpected argument type: {}", + array.data_type() + ) + } + } } _ => exec_err!("hex got an unexpected argument type: {}", array.data_type()), }, @@ -272,6 +309,7 @@ pub fn compute_hex( #[cfg(test)] mod test { + use std::str::from_utf8_unchecked; use std::sync::Arc; use arrow::array::{Int64Array, StringArray}; @@ -374,12 +412,18 @@ mod test { #[test] fn test_hex_int64() { let num = 1234; - let hexed = super::hex_int64(num); - assert_eq!(hexed, "4D2".to_string()); + let mut cache = Vec::with_capacity(16); + super::hex_int64(num, &mut cache); + unsafe { + assert_eq!(from_utf8_unchecked(&cache), "4D2".to_string()); + } let num = -1; - let hexed = super::hex_int64(num); - assert_eq!(hexed, "FFFFFFFFFFFFFFFF".to_string()); + cache.clear(); + super::hex_int64(num, &mut cache); + unsafe { + assert_eq!(from_utf8_unchecked(&cache), "FFFFFFFFFFFFFFFF".to_string()); + } } #[test] From db49fffb7a1f82bf73fe6b5300fc3789ad1ceace Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Fri, 9 Jan 2026 23:44:23 +0800 Subject: [PATCH 3/8] cargo fmt --- datafusion/spark/src/function/math/hex.rs | 33 +++++++---------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs index 3582c58f2241e..99c6b1dece5bb 100644 --- a/datafusion/spark/src/function/math/hex.rs +++ b/datafusion/spark/src/function/math/hex.rs @@ -111,12 +111,14 @@ impl ScalarUDFImpl for SparkHex { } } +/// Hex encoding lookup tables for fast byte-to-hex conversion +const HEX_CHARS_LOWER: &[u8; 16] = b"0123456789abcdef"; +const HEX_CHARS_UPPER: &[u8; 16] = b"0123456789ABCDEF"; + #[inline] fn hex_int64(num: i64, buffer: &mut Vec) { - const HEX_CHARS: &[u8; 16] = b"0123456789ABCDEF"; - if num == 0 { - buffer.push(HEX_CHARS[0]); + buffer.push(HEX_CHARS_UPPER[0]); return; } @@ -126,31 +128,13 @@ fn hex_int64(num: i64, buffer: &mut Vec) { while n != 0 && i > 0 { i -= 1; let digest = (n & 0xF) as u8; - temp[i] = HEX_CHARS[digest as usize]; + temp[i] = HEX_CHARS_UPPER[digest as usize]; n >>= 4; } buffer.extend_from_slice(&temp[i..]); } -/// Hex encoding lookup tables for fast byte-to-hex conversion -const HEX_CHARS_LOWER: &[u8; 16] = b"0123456789abcdef"; -const HEX_CHARS_UPPER: &[u8; 16] = b"0123456789ABCDEF"; - -#[inline] -fn hex_encode>(data: T, lower_case: bool, buffer: &mut Vec) { - let bytes = data.as_ref(); - let hex_chars = if lower_case { - HEX_CHARS_LOWER - } else { - HEX_CHARS_UPPER - }; - for &b in bytes { - buffer.push(hex_chars[(b >> 4) as usize]); - buffer.push(hex_chars[(b & 0x0f) as usize]); - } -} - /// Generic hex encoding for byte array types fn hex_encode_bytes<'a, I, T>( iter: I, @@ -189,7 +173,7 @@ where } /// Generic hex encoding for int64 type -fn hex_encode_int64<'a, I>(iter: I, len: usize) -> Result +fn hex_encode_int64(iter: I, len: usize) -> Result where I: Iterator>, { @@ -270,7 +254,8 @@ pub fn compute_hex( DataType::Int64 => { let int_values = as_int64_array(values)?; hex_encode_int64( - keys.iter().map(|k| k.map(|idx| int_values.value(idx as usize))), + keys.iter() + .map(|k| k.map(|idx| int_values.value(idx as usize))), dict.len(), ) } From 8b20f93dc5b1fbf1438cebe16bb957fee2a03f66 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Sun, 11 Jan 2026 11:16:09 +0800 Subject: [PATCH 4/8] optimize hex --- datafusion/spark/src/function/math/hex.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs index 99c6b1dece5bb..f2ae04cd9ca7a 100644 --- a/datafusion/spark/src/function/math/hex.rs +++ b/datafusion/spark/src/function/math/hex.rs @@ -146,7 +146,7 @@ where T: AsRef<[u8]> + 'a, { let mut builder = StringBuilder::with_capacity(len, len * 64); - let mut buffer = Vec::with_capacity(16); + let mut buffer = Vec::with_capacity(64); let hex_chars = if lowercase { HEX_CHARS_LOWER } else { @@ -177,7 +177,7 @@ fn hex_encode_int64(iter: I, len: usize) -> Result>, { - let mut builder = StringBuilder::with_capacity(len, len * 64); + let mut builder = StringBuilder::with_capacity(len, len * 16); let mut buffer = Vec::with_capacity(16); for v in iter { @@ -248,7 +248,6 @@ pub fn compute_hex( let dict = as_dictionary_array::(&array); let keys = dict.keys(); let values = dict.values(); - // let mut buffer = Vec::with_capacity(16); match **value_type { DataType::Int64 => { From ec42662319a2a30958ea187b08c50ff5185ab54c Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Sun, 11 Jan 2026 12:17:46 +0800 Subject: [PATCH 5/8] optimize hex_int64 --- datafusion/spark/src/function/math/hex.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs index f2ae04cd9ca7a..6b153f9504664 100644 --- a/datafusion/spark/src/function/math/hex.rs +++ b/datafusion/spark/src/function/math/hex.rs @@ -122,10 +122,10 @@ fn hex_int64(num: i64, buffer: &mut Vec) { return; } - let mut n = num; + let mut n = num as u64; let mut temp = [0u8; 16]; let mut i = 16; - while n != 0 && i > 0 { + while n != 0 { i -= 1; let digest = (n & 0xF) as u8; temp[i] = HEX_CHARS_UPPER[digest as usize]; From 45daf86c84a0b7f3d468cf9c091b8e4049ce9c8a Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Mon, 12 Jan 2026 22:17:57 +0800 Subject: [PATCH 6/8] optimize hex_int64 --- datafusion/spark/src/function/math/hex.rs | 41 ++++++++++------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs index 6b153f9504664..2a26d17c4d918 100644 --- a/datafusion/spark/src/function/math/hex.rs +++ b/datafusion/spark/src/function/math/hex.rs @@ -116,23 +116,19 @@ const HEX_CHARS_LOWER: &[u8; 16] = b"0123456789abcdef"; const HEX_CHARS_UPPER: &[u8; 16] = b"0123456789ABCDEF"; #[inline] -fn hex_int64(num: i64, buffer: &mut Vec) { +fn hex_int64(num: i64, buffer: &mut [u8; 16]) -> &[u8] { if num == 0 { - buffer.push(HEX_CHARS_UPPER[0]); - return; + return b"0"; } let mut n = num as u64; - let mut temp = [0u8; 16]; let mut i = 16; while n != 0 { i -= 1; - let digest = (n & 0xF) as u8; - temp[i] = HEX_CHARS_UPPER[digest as usize]; + buffer[i] = HEX_CHARS_UPPER[(n & 0xF) as usize]; n >>= 4; } - - buffer.extend_from_slice(&temp[i..]); + &buffer[i..] } /// Generic hex encoding for byte array types @@ -161,6 +157,7 @@ where buffer.push(hex_chars[(byte >> 4) as usize]); buffer.push(hex_chars[(byte & 0x0f) as usize]); } + // SAFETY: buffer contains only ASCII hex digests, which are valid UTF-8 unsafe { builder.append_value(from_utf8_unchecked(&buffer)); } @@ -178,14 +175,14 @@ where I: Iterator>, { let mut builder = StringBuilder::with_capacity(len, len * 16); - let mut buffer = Vec::with_capacity(16); for v in iter { if let Some(num) = v { - buffer.clear(); - hex_int64(num, &mut buffer); + let mut temp = [0u8; 16]; + let slice = hex_int64(num, &mut temp); + // SAFETY: slice contains only ASCII hex digests, which are valid UTF-8 unsafe { - builder.append_value(from_utf8_unchecked(&buffer)); + builder.append_value(from_utf8_unchecked(&slice)); } } else { builder.append_null(); @@ -395,18 +392,16 @@ mod test { #[test] fn test_hex_int64() { - let num = 1234; - let mut cache = Vec::with_capacity(16); - super::hex_int64(num, &mut cache); - unsafe { - assert_eq!(from_utf8_unchecked(&cache), "4D2".to_string()); - } + let test_cases = vec![(1234, "4D2"), (-1, "FFFFFFFFFFFFFFFF")]; - let num = -1; - cache.clear(); - super::hex_int64(num, &mut cache); - unsafe { - assert_eq!(from_utf8_unchecked(&cache), "FFFFFFFFFFFFFFFF".to_string()); + for (num, expected) in test_cases { + let mut cache = [0u8; 16]; + let slice = super::hex_int64(num, &mut cache); + + unsafe { + let result = from_utf8_unchecked(&slice); + assert_eq!(expected, result); + } } } From f79aed213477536143f0228c8c1275a180c1b7a7 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Mon, 12 Jan 2026 23:13:29 +0800 Subject: [PATCH 7/8] fix dictionary null values --- datafusion/spark/src/function/math/hex.rs | 57 +++++++++++++++++++---- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs index 2a26d17c4d918..d04a15d408a96 100644 --- a/datafusion/spark/src/function/math/hex.rs +++ b/datafusion/spark/src/function/math/hex.rs @@ -182,7 +182,7 @@ where let slice = hex_int64(num, &mut temp); // SAFETY: slice contains only ASCII hex digests, which are valid UTF-8 unsafe { - builder.append_value(from_utf8_unchecked(&slice)); + builder.append_value(from_utf8_unchecked(slice)); } } else { builder.append_null(); @@ -250,8 +250,15 @@ pub fn compute_hex( DataType::Int64 => { let int_values = as_int64_array(values)?; hex_encode_int64( - keys.iter() - .map(|k| k.map(|idx| int_values.value(idx as usize))), + keys.iter().map(|k| { + k.and_then(|idx| { + if int_values.is_valid(idx as usize) { + Some(int_values.value(idx as usize)) + } else { + None + } + }) + }), dict.len(), ) } @@ -259,7 +266,13 @@ pub fn compute_hex( let str_values = as_string_array(values); hex_encode_bytes( keys.iter().map(|k| { - k.map(|idx| str_values.value(idx as usize).as_bytes()) + k.and_then(|idx| { + if str_values.is_valid(idx as usize) { + Some(str_values.value(idx as usize).as_bytes()) + } else { + None + } + }) }), lowercase, dict.len(), @@ -268,8 +281,15 @@ pub fn compute_hex( DataType::Binary => { let bin_values = as_binary_array(values)?; hex_encode_bytes( - keys.iter() - .map(|k| k.map(|idx| bin_values.value(idx as usize))), + keys.iter().map(|k| { + k.and_then(|idx| { + if bin_values.is_valid(idx as usize) { + Some(bin_values.value(idx as usize)) + } else { + None + } + }) + }), lowercase, dict.len(), ) @@ -293,7 +313,7 @@ mod test { use std::str::from_utf8_unchecked; use std::sync::Arc; - use arrow::array::{Int64Array, StringArray}; + use arrow::array::{DictionaryArray, Int32Array, Int64Array, StringArray}; use arrow::{ array::{ BinaryDictionaryBuilder, PrimitiveDictionaryBuilder, StringBuilder, @@ -399,7 +419,7 @@ mod test { let slice = super::hex_int64(num, &mut cache); unsafe { - let result = from_utf8_unchecked(&slice); + let result = from_utf8_unchecked(slice); assert_eq!(expected, result); } } @@ -426,4 +446,25 @@ mod test { assert_eq!(string_array, &expected_array); } + + #[test] + fn test_dict_values_null() { + let keys = Int32Array::from(vec![Some(0), None, Some(1)]); + let vals = Int64Array::from(vec![Some(32), None]); + // [32, null, null] + let dict = DictionaryArray::new(keys, Arc::new(vals)); + + let columnar_value = ColumnarValue::Array(Arc::new(dict)); + let result = super::spark_hex(&[columnar_value]).unwrap(); + + let result = match result { + ColumnarValue::Array(array) => array, + _ => panic!("Expected array"), + }; + + let result = as_string_array(&result); + let expected = StringArray::from(vec![Some("20"), None, None]); + + assert_eq!(&expected, result); + } } From 604cc44bfadb1a1561f94afdd8ca14c716108e8e Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 13 Jan 2026 21:46:40 +0800 Subject: [PATCH 8/8] simplify dictionary handling --- datafusion/spark/src/function/math/hex.rs | 51 ++++------------------- 1 file changed, 7 insertions(+), 44 deletions(-) diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs index d04a15d408a96..134324f45f5bc 100644 --- a/datafusion/spark/src/function/math/hex.rs +++ b/datafusion/spark/src/function/math/hex.rs @@ -19,7 +19,7 @@ use std::any::Any; use std::str::from_utf8_unchecked; use std::sync::Arc; -use arrow::array::{Array, StringBuilder}; +use arrow::array::{Array, BinaryArray, Int64Array, StringArray, StringBuilder}; use arrow::datatypes::DataType; use arrow::{ array::{as_dictionary_array, as_largestring_array, as_string_array}, @@ -243,56 +243,19 @@ pub fn compute_hex( } DataType::Dictionary(_, value_type) => { let dict = as_dictionary_array::(&array); - let keys = dict.keys(); - let values = dict.values(); match **value_type { DataType::Int64 => { - let int_values = as_int64_array(values)?; - hex_encode_int64( - keys.iter().map(|k| { - k.and_then(|idx| { - if int_values.is_valid(idx as usize) { - Some(int_values.value(idx as usize)) - } else { - None - } - }) - }), - dict.len(), - ) + let arr = dict.downcast_dict::().unwrap(); + hex_encode_int64(arr.into_iter(), dict.len()) } DataType::Utf8 => { - let str_values = as_string_array(values); - hex_encode_bytes( - keys.iter().map(|k| { - k.and_then(|idx| { - if str_values.is_valid(idx as usize) { - Some(str_values.value(idx as usize).as_bytes()) - } else { - None - } - }) - }), - lowercase, - dict.len(), - ) + let arr = dict.downcast_dict::().unwrap(); + hex_encode_bytes(arr.into_iter(), lowercase, dict.len()) } DataType::Binary => { - let bin_values = as_binary_array(values)?; - hex_encode_bytes( - keys.iter().map(|k| { - k.and_then(|idx| { - if bin_values.is_valid(idx as usize) { - Some(bin_values.value(idx as usize)) - } else { - None - } - }) - }), - lowercase, - dict.len(), - ) + let arr = dict.downcast_dict::().unwrap(); + hex_encode_bytes(arr.into_iter(), lowercase, dict.len()) } _ => { exec_err!(