diff --git a/datafusion/functions-nested/benches/map.rs b/datafusion/functions-nested/benches/map.rs
index e50c4659b17cd..12db634d34dca 100644
--- a/datafusion/functions-nested/benches/map.rs
+++ b/datafusion/functions-nested/benches/map.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{Int32Array, ListArray, StringArray};
+use arrow::array::{ArrayRef, BinaryArray, Int32Array, ListArray, StringArray};
 use arrow::buffer::{OffsetBuffer, ScalarBuffer};
-use arrow::datatypes::{DataType, Field};
+use arrow::datatypes::Field;
 use criterion::{Criterion, criterion_group, criterion_main};
 use datafusion_common::ScalarValue;
 use datafusion_common::config::ConfigOptions;
@@ -28,33 +28,100 @@ use datafusion_functions_nested::planner::NestedFunctionPlanner;
 use rand::Rng;
 use rand::prelude::ThreadRng;
 use std::collections::HashSet;
+use std::hash::Hash;
 use std::hint::black_box;
 use std::sync::Arc;
 
-fn keys(rng: &mut ThreadRng) -> Vec<String> {
-    let mut keys = HashSet::with_capacity(1000);
+const MAP_ROWS: usize = 1000;
+const MAP_KEYS_PER_ROW: usize = 1000;
 
-    while keys.len() < 1000 {
-        keys.insert(rng.random_range(0..10000).to_string());
+fn gen_unique_values<T>(
+    rng: &mut ThreadRng,
+    mut make_value: impl FnMut(i32) -> T,
+) -> Vec<T>
+where
+    T: Eq + Hash,
+{
+    let mut values = HashSet::with_capacity(MAP_KEYS_PER_ROW);
+
+    while values.len() < MAP_KEYS_PER_ROW {
+        values.insert(make_value(rng.random_range(0..10000)));
     }
 
-    keys.into_iter().collect()
+    values.into_iter().collect()
 }
 
-fn values(rng: &mut ThreadRng) -> Vec<i32> {
-    let mut values = HashSet::with_capacity(1000);
+fn gen_repeat_values<T: Clone>(values: &[T], repeats: usize) -> Vec<T> {
+    let mut repeated = Vec::with_capacity(values.len() * repeats);
 
-    while values.len() < 1000 {
-        values.insert(rng.random_range(0..10000));
+    for _ in 0..repeats {
+        repeated.extend_from_slice(values);
     }
-    values.into_iter().collect()
+
+    repeated
+}
+
+fn gen_utf8_values(rng: &mut ThreadRng) -> Vec<String> {
+    gen_unique_values(rng, |value| value.to_string())
+}
+
+fn gen_binary_values(rng: &mut ThreadRng) -> Vec<Vec<u8>> {
+    gen_unique_values(rng, |value| value.to_le_bytes().to_vec())
+}
+
+fn gen_primitive_values(rng: &mut ThreadRng) -> Vec<i32> {
+    gen_unique_values(rng, |value| value)
+}
+
+fn list_array(values: ArrayRef, row_count: usize, values_per_row: usize) -> ArrayRef {
+    let offsets = (0..=row_count)
+        .map(|index| (index * values_per_row) as i32)
+        .collect::<Vec<_>>();
+    Arc::new(ListArray::new(
+        Arc::new(Field::new_list_field(values.data_type().clone(), true)),
+        OffsetBuffer::new(ScalarBuffer::from(offsets)),
+        values,
+        None,
+    ))
+}
+
+fn bench_map_case(c: &mut Criterion, name: &str, keys: ArrayRef, values: ArrayRef) {
+    let number_rows = keys.len();
+    let keys = ColumnarValue::Array(keys);
+    let values = ColumnarValue::Array(values);
+
+    let return_type = map_udf()
+        .return_type(&[keys.data_type(), values.data_type()])
+        .expect("should get return type");
+    let arg_fields = vec![
+        Field::new("a", keys.data_type(), true).into(),
+        Field::new("a", values.data_type(), true).into(),
+    ];
+    let return_field = Field::new("f", return_type, true).into();
+    let config_options = Arc::new(ConfigOptions::default());
+
+    c.bench_function(name, |b| {
+        b.iter(|| {
+            black_box(
+                map_udf()
+                    .invoke_with_args(ScalarFunctionArgs {
+                        args: vec![keys.clone(), values.clone()],
+                        arg_fields: arg_fields.clone(),
+                        number_rows,
+                        return_field: Arc::clone(&return_field),
+                        config_options: Arc::clone(&config_options),
+                    })
+                    .expect("map should work on valid values"),
+            );
+        });
+    });
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("make_map_1000", |b| {
         let mut rng = rand::rng();
-        let keys = keys(&mut rng);
-        let values = values(&mut rng);
+        let keys = gen_utf8_values(&mut rng);
+        let values = gen_primitive_values(&mut rng);
         let mut buffer = Vec::new();
         for i in 0..1000 {
             buffer.push(Expr::Literal(
@@ -63,9 +130,7 @@ fn criterion_benchmark(c: &mut Criterion) {
             ));
             buffer.push(Expr::Literal(ScalarValue::Int32(Some(values[i])), None));
         }
-
         let planner = NestedFunctionPlanner {};
-
         b.iter(|| {
             black_box(
                 planner
@@ -75,51 +140,51 @@ fn criterion_benchmark(c: &mut Criterion) {
         });
     });
 
-    c.bench_function("map_1000", |b| {
-        let mut rng = rand::rng();
-        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
-        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
-        let key_list = ListArray::new(
-            field,
-            offsets,
-            Arc::new(StringArray::from(keys(&mut rng))),
-            None,
-        );
-        let field = Arc::new(Field::new_list_field(DataType::Int32, true));
-        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
-        let value_list = ListArray::new(
-            field,
-            offsets,
-            Arc::new(Int32Array::from(values(&mut rng))),
-            None,
-        );
-        let keys = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
-        let values = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));
-
-        let return_type = map_udf()
-            .return_type(&[keys.data_type(), values.data_type()])
-            .expect("should get return type");
-        let arg_fields = vec![
-            Field::new("a", keys.data_type(), true).into(),
-            Field::new("a", values.data_type(), true).into(),
-        ];
-        let return_field = Field::new("f", return_type, true).into();
-        let config_options = Arc::new(ConfigOptions::default());
+    let mut rng = rand::rng();
+    let values = Arc::new(Int32Array::from(gen_repeat_values(
+        &gen_primitive_values(&mut rng),
+        MAP_ROWS,
+    ))) as ArrayRef;
+    let values = list_array(values, MAP_ROWS, MAP_KEYS_PER_ROW);
+    let map_cases = [
+        (
+            "map_1000_utf8",
+            list_array(
+                Arc::new(StringArray::from(gen_repeat_values(
+                    &gen_utf8_values(&mut rng),
+                    MAP_ROWS,
+                ))) as ArrayRef,
+                MAP_ROWS,
+                MAP_KEYS_PER_ROW,
+            ),
+        ),
+        (
+            "map_1000_binary",
+            list_array(
+                Arc::new(BinaryArray::from_iter_values(gen_repeat_values(
+                    &gen_binary_values(&mut rng),
+                    MAP_ROWS,
+                ))) as ArrayRef,
+                MAP_ROWS,
+                MAP_KEYS_PER_ROW,
+            ),
+        ),
+        (
+            "map_1000_int32",
+            list_array(
+                Arc::new(Int32Array::from(gen_repeat_values(
+                    &gen_primitive_values(&mut rng),
+                    MAP_ROWS,
+                ))) as ArrayRef,
+                MAP_ROWS,
+                MAP_KEYS_PER_ROW,
+            ),
+        ),
+    ];
 
-        b.iter(|| {
-            black_box(
-                map_udf()
-                    .invoke_with_args(ScalarFunctionArgs {
-                        args: vec![keys.clone(), values.clone()],
-                        arg_fields: arg_fields.clone(),
-                        number_rows: 1,
-                        return_field: Arc::clone(&return_field),
-                        config_options: Arc::clone(&config_options),
-                    })
-                    .expect("map should work on valid values"),
-            );
-        });
-    });
+    for (name, keys) in map_cases {
+        bench_map_case(c, name, keys, Arc::clone(&values));
+    }
 }
 
 criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs
index 7df131cf5e27e..850f2b37797c3 100644
--- a/datafusion/functions-nested/src/map.rs
+++ b/datafusion/functions-nested/src/map.rs
@@ -17,11 +17,18 @@
 
 use std::any::Any;
 use std::collections::VecDeque;
+use std::hash::Hash;
 use std::sync::Arc;
 
-use arrow::array::{Array, ArrayData, ArrayRef, MapArray, OffsetSizeTrait, StructArray};
+use arrow::array::{
+    Array, ArrayData, ArrayRef, ArrowPrimitiveType, MapArray, OffsetSizeTrait,
+    StructArray, cast::AsArray,
+};
 use arrow::buffer::Buffer;
-use arrow::datatypes::{DataType, Field, SchemaBuilder, ToByteSlice};
+use arrow::datatypes::{
+    DataType, Date32Type, Date64Type, Field, Int8Type, Int16Type, Int32Type, Int64Type,
+    SchemaBuilder, ToByteSlice, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
+};
 
 use datafusion_common::utils::{fixed_size_list_to_arrays, list_to_arrays};
 use datafusion_common::{
@@ -65,23 +72,30 @@ fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let key_array = keys.as_ref();
 
     match keys_arg {
-        ColumnarValue::Array(_) => {
-            let row_keys = match key_array.data_type() {
-                DataType::List(_) => list_to_arrays::<i32>(&keys),
-                DataType::LargeList(_) => list_to_arrays::<i64>(&keys),
-                DataType::FixedSizeList(_, _) => fixed_size_list_to_arrays(&keys),
-                data_type => {
-                    return exec_err!(
-                        "Expected list, large_list or fixed_size_list, got {:?}",
-                        data_type
-                    );
-                }
-            };
-
-            row_keys
+        ColumnarValue::Array(_) => match key_array.data_type() {
+            DataType::List(_) => keys
+                .as_list::<i32>()
                 .iter()
-                .try_for_each(|key| validate_map_keys(key.as_ref()))?;
-        }
+                .flatten()
+                .try_for_each(|row| validate_map_keys(row.as_ref()))?,
+            DataType::LargeList(_) => keys
+                .as_list::<i64>()
+                .iter()
+                .flatten()
+                .try_for_each(|row| validate_map_keys(row.as_ref()))?,
+            DataType::FixedSizeList(_, _) => {
+                keys.as_fixed_size_list()
+                    .iter()
+                    .flatten()
+                    .try_for_each(|row| validate_map_keys(row.as_ref()))?
+            }
+            data_type => {
+                return exec_err!(
+                    "Expected list, large_list or fixed_size_list, got {:?}",
+                    data_type
+                );
+            }
+        },
         ColumnarValue::Scalar(_) => {
             validate_map_keys(key_array)?;
         }
@@ -92,8 +106,65 @@ fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     make_map_batch_internal(&keys, &values, can_evaluate_to_const, &keys_arg.data_type())
 }
 
-/// Validates that map keys are non-null and unique.
-fn validate_map_keys(array: &dyn Array) -> Result<()> {
+fn validate_unique_primitive_keys<T: ArrowPrimitiveType>(array: &dyn Array) -> Result<()>
+where
+    T::Native: Copy + Eq + Hash + std::fmt::Display,
+{
+    let primitive_array = array.as_primitive::<T>();
+    if primitive_array.null_count() > 0 {
+        return exec_err!("map key cannot be null");
+    }
+
+    if let Some(value) = find_duplicate_value(
+        primitive_array.len(),
+        primitive_array.values().iter().copied(),
+    ) {
+        return exec_err!("map key must be unique, duplicate key found: {}", value);
+    }
+
+    Ok(())
+}
+
+fn validate_unique_string_keys<O: OffsetSizeTrait>(array: &dyn Array) -> Result<()> {
+    let string_array = array.as_string::<O>();
+    if string_array.null_count() > 0 {
+        return exec_err!("map key cannot be null");
+    }
+
+    if let Some(value) =
+        find_duplicate_value(string_array.len(), string_array.iter().flatten())
+    {
+        return exec_err!("map key must be unique, duplicate key found: {}", value);
+    }
+
+    Ok(())
+}
+
+fn validate_unique_binary_keys<O: OffsetSizeTrait>(array: &dyn Array) -> Result<()> {
+    let binary_array = array.as_binary::<O>();
+    if binary_array.null_count() > 0 {
+        return exec_err!("map key cannot be null");
+    }
+
+    if let Some(value) =
+        find_duplicate_value(binary_array.len(), binary_array.iter().flatten())
+    {
+        return exec_err!("map key must be unique, duplicate key found: {:?}", value);
+    }
+
+    Ok(())
+}
+
+fn find_duplicate_value<T, I>(len: usize, values: I) -> Option<T>
+where
+    T: Copy + Eq + Hash,
+    I: IntoIterator<Item = T>,
+{
+    let mut seen_keys = HashSet::with_capacity(len);
+    values.into_iter().find(|value| !seen_keys.insert(*value))
+}
+
+fn validate_unique_keys_generic(array: &dyn Array) -> Result<()> {
     let mut seen_keys = HashSet::with_capacity(array.len());
 
     for i in 0..array.len() {
@@ -113,6 +184,27 @@ fn validate_map_keys(array: &dyn Array) -> Result<()> {
     Ok(())
 }
 
+/// Validates that map keys are non-null and unique.
+fn validate_map_keys(array: &dyn Array) -> Result<()> {
+    match array.data_type() {
+        DataType::Int8 => validate_unique_primitive_keys::<Int8Type>(array),
+        DataType::Int16 => validate_unique_primitive_keys::<Int16Type>(array),
+        DataType::Int32 => validate_unique_primitive_keys::<Int32Type>(array),
+        DataType::Int64 => validate_unique_primitive_keys::<Int64Type>(array),
+        DataType::UInt8 => validate_unique_primitive_keys::<UInt8Type>(array),
+        DataType::UInt16 => validate_unique_primitive_keys::<UInt16Type>(array),
+        DataType::UInt32 => validate_unique_primitive_keys::<UInt32Type>(array),
+        DataType::UInt64 => validate_unique_primitive_keys::<UInt64Type>(array),
+        DataType::Date32 => validate_unique_primitive_keys::<Date32Type>(array),
+        DataType::Date64 => validate_unique_primitive_keys::<Date64Type>(array),
+        DataType::Utf8 => validate_unique_string_keys::<i32>(array),
+        DataType::LargeUtf8 => validate_unique_string_keys::<i64>(array),
+        DataType::Binary => validate_unique_binary_keys::<i32>(array),
+        DataType::LargeBinary => validate_unique_binary_keys::<i64>(array),
+        _ => validate_unique_keys_generic(array),
+    }
+}
+
 fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result<ArrayRef> {
     match columnar_value {
         ColumnarValue::Scalar(value) => match value {
@@ -381,7 +473,7 @@ fn make_map_array_internal(
     let nulls_bitmap = keys.nulls().cloned();
 
     let keys = list_to_arrays::<O>(keys);
-    let values = list_to_arrays::<O>(values);
+    let values = list_to_arrays_skipping_null_rows::<O>(values, nulls_bitmap.as_ref());
 
     build_map_array(
         &keys,
@@ -408,7 +500,8 @@ fn make_map_array_from_fixed_size_list(
     let nulls_bitmap = keys.nulls().cloned();
 
     let keys = fixed_size_list_to_arrays(keys);
-    let values = fixed_size_list_to_arrays(values);
+    let values =
+        fixed_size_list_to_arrays_skipping_null_rows(values, nulls_bitmap.as_ref());
 
     build_map_array(
         &keys,
@@ -419,6 +512,41 @@
         nulls_bitmap,
     )
 }
+fn list_to_arrays_skipping_null_rows<O: OffsetSizeTrait>(
+    array: &ArrayRef,
+    null_rows: Option<&arrow::buffer::NullBuffer>,
+) -> Vec<ArrayRef> {
+    array
+        .as_list::<O>()
+        .iter()
+        .enumerate()
+        .filter_map(|(i, row)| {
+            if null_rows.is_some_and(|nulls| nulls.is_null(i)) {
+                None
+            } else {
+                row
+            }
+        })
+        .collect()
+}
+
+fn fixed_size_list_to_arrays_skipping_null_rows(
+    array: &ArrayRef,
+    null_rows: Option<&arrow::buffer::NullBuffer>,
+) -> Vec<ArrayRef> {
+    array
+        .as_fixed_size_list()
+        .iter()
+        .enumerate()
+        .filter_map(|(i, row)| {
+            if null_rows.is_some_and(|nulls| nulls.is_null(i)) {
+                None
+            } else {
+                row
+            }
+        })
+        .collect()
+}
 
 /// Common logic to build a MapArray from decomposed list arrays
 fn build_map_array(
@@ -429,6 +557,10 @@ fn build_map_array(
     original_len: usize,
     nulls_bitmap: Option<NullBuffer>,
 ) -> Result<ColumnarValue> {
+    if keys.len() != values.len() {
+        return exec_err!("map requires key and value lists to have the same length");
+    }
+
     let mut key_array_vec = vec![];
     let mut value_array_vec = vec![];
     for (k, v) in keys.iter().zip(values.iter()) {
diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt
index 7ea54464d3e99..358f212144157 100644
--- a/datafusion/sqllogictest/test_files/map.slt
+++ b/datafusion/sqllogictest/test_files/map.slt
@@ -263,6 +263,14 @@ SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'LargeList(Utf8)'), a
 ----
 {POST: 41, HEAD: 33, PATCH: 30}
 
+query ?
+SELECT MAP(make_array(X'6162', X'6364'), make_array(1, 2));
+----
+{6162: 1, 6364: 2}
+
+query error DataFusion error: Execution error: map key must be unique, duplicate key found: \[100, 117, 112\]
+SELECT MAP(make_array(X'647570', X'647570'), make_array(1, 2));
+
 statement ok
 create table t as values ('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2], 'POST', [[1,2,3]], ['a']),