Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 155 additions & 48 deletions datafusion/physical-expr-common/src/binary_view_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

//! [`ArrowBytesViewMap`] and [`ArrowBytesViewSet`] for storing maps/sets of values from
//! `StringViewArray`/`BinaryViewArray`.
//! Much of the code is from `binary_map.rs`, but with simpler implementation because we directly use the
//! [`GenericByteViewBuilder`].
use crate::binary_map::OutputType;
use ahash::RandomState;
use arrow::array::cast::AsArray;
use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder};
use arrow::array::{Array, ArrayRef, BinaryViewArray, ByteView, make_view};
use arrow::buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
use std::fmt::Debug;
use std::mem::size_of;
use std::sync::Arc;

/// HashSet optimized for storing string or binary values that can produce that
Expand Down Expand Up @@ -113,6 +113,9 @@ impl ArrowBytesViewSet {
/// This map is used by the special `COUNT DISTINCT` aggregate function to
/// store the distinct values, and by the `GROUP BY` operator to store
/// group values when they are a single string array.
/// Max size of the in-progress buffer before flushing to completed buffers
const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;

pub struct ArrowBytesViewMap<V>
where
V: Debug + PartialEq + Eq + Clone + Copy + Default,
Expand All @@ -124,8 +127,15 @@ where
/// Total size of the map in bytes
map_size: usize,

/// Builder for output array
builder: GenericByteViewBuilder<BinaryViewType>,
/// Views for all stored values (in insertion order)
views: Vec<u128>,
/// In-progress buffer for out-of-line string data
in_progress: Vec<u8>,
/// Completed buffers containing string data
completed: Vec<Buffer>,
/// Tracks null values (true = null)
nulls: Vec<bool>,

/// random state used to generate hashes
random_state: RandomState,
/// buffer that stores hash values (reused across batches to save allocations)
Expand All @@ -148,7 +158,10 @@ where
output_type,
map: hashbrown::hash_table::HashTable::with_capacity(INITIAL_MAP_CAPACITY),
map_size: 0,
builder: GenericByteViewBuilder::new(),
views: Vec::new(),
in_progress: Vec::new(),
completed: Vec::new(),
nulls: Vec::new(),
random_state: RandomState::new(),
hashes_buffer: vec![],
null: None,
Expand Down Expand Up @@ -250,52 +263,92 @@ where
// step 2: insert each value into the set, if not already present
let values = values.as_byte_view::<B>();

// Get raw views buffer for direct comparison
let input_views = values.views();

// Ensure lengths are equivalent
assert_eq!(values.len(), batch_hashes.len());
assert_eq!(values.len(), self.hashes_buffer.len());

for i in 0..values.len() {
let view_u128 = input_views[i];
let hash = self.hashes_buffer[i];

for (value, &hash) in values.iter().zip(batch_hashes.iter()) {
// handle null value
let Some(value) = value else {
// handle null value via validity bitmap check
if !values.is_valid(i) {
let payload = if let Some(&(payload, _offset)) = self.null.as_ref() {
payload
} else {
let payload = make_payload_fn(None);
let null_index = self.builder.len();
self.builder.append_null();
let null_index = self.views.len();
self.views.push(0);
self.nulls.push(true);
self.null = Some((payload, null_index));
payload
};
observe_payload_fn(payload);
continue;
};
}

// get the value as bytes
let value: &[u8] = value.as_ref();
// Extract length from the view (first 4 bytes of u128 in little-endian)
let len = view_u128 as u32;

let entry = self.map.find_mut(hash, |header| {
if header.hash != hash {
return false;
}
let v = self.builder.get_value(header.view_idx);
// Check if value already exists
let maybe_payload = {
// Borrow completed and in_progress for comparison
let completed = &self.completed;
let in_progress = &self.in_progress;

v == value
});
self.map
.find(hash, |header| {
if header.hash != hash {
return false;
}

// Fast path: inline strings can be compared directly
if len <= 12 {
return header.view == view_u128;
}

// For larger strings: first compare the 4-byte prefix
let stored_prefix = (header.view >> 32) as u32;
let input_prefix = (view_u128 >> 32) as u32;
if stored_prefix != input_prefix {
return false;
}

// Prefix matched - compare full bytes
let byte_view = ByteView::from(header.view);
let stored_len = byte_view.length as usize;
let buffer_index = byte_view.buffer_index as usize;
let offset = byte_view.offset as usize;

let stored_value = if buffer_index < completed.len() {
&completed[buffer_index].as_slice()
[offset..offset + stored_len]
} else {
&in_progress[offset..offset + stored_len]
};
let input_value: &[u8] = values.value(i).as_ref();
stored_value == input_value
})
.map(|entry| entry.payload)
};

let payload = if let Some(entry) = entry {
entry.payload
let payload = if let Some(payload) = maybe_payload {
payload
} else {
// no existing value, make a new one.
// no existing value, make a new one
let value: &[u8] = values.value(i).as_ref();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid this for inlined values (it is also the same as `input_value` above, so the value is now fetched twice).

let payload = make_payload_fn(Some(value));

let inner_view_idx = self.builder.len();
// Create view pointing to our buffers
let new_view = self.append_value(value);
let new_header = Entry {
view_idx: inner_view_idx,
view: new_view,
hash,
payload,
};

self.builder.append_value(value);

self.map
.insert_accounted(new_header, |h| h.hash, &mut self.map_size);
payload
Expand All @@ -310,29 +363,67 @@ where
///
/// The values are guaranteed to be returned in the same order in which
/// they were first seen.
pub fn into_state(self) -> ArrayRef {
let mut builder = self.builder;
match self.output_type {
OutputType::BinaryView => {
let array = builder.finish();
pub fn into_state(mut self) -> ArrayRef {
// Flush any remaining in-progress buffer
if !self.in_progress.is_empty() {
let flushed = std::mem::take(&mut self.in_progress);
self.completed.push(Buffer::from_vec(flushed));
}

Arc::new(array)
}
// Build null buffer if we have any nulls
let null_buffer = if self.nulls.iter().any(|&is_null| is_null) {
Some(NullBuffer::from(
self.nulls
.iter()
.map(|&is_null| !is_null)
.collect::<Vec<_>>(),
))
} else {
None
};

let views = ScalarBuffer::from(self.views);
let array =
unsafe { BinaryViewArray::new_unchecked(views, self.completed, null_buffer) };

match self.output_type {
OutputType::BinaryView => Arc::new(array),
OutputType::Utf8View => {
// SAFETY:
// we asserted the input arrays were all the correct type and
// thus since all the values that went in were valid (e.g. utf8)
// so are all the values that come out
let array = builder.finish();
// SAFETY: all input was valid utf8
let array = unsafe { array.to_string_view_unchecked() };
Arc::new(array)
}
_ => {
unreachable!("Utf8/Binary should use `ArrowBytesMap`")
}
_ => unreachable!("Utf8/Binary should use `ArrowBytesMap`"),
}
}

/// Append a value to our buffers and return the view pointing to it.
///
/// Values of at most 12 bytes are encoded entirely inside the returned view
/// and never touch the data buffers. Longer values are copied to the end of
/// `in_progress`; the view records the value's byte offset in that block and
/// the index the block will occupy in `completed` once it is sealed (the
/// current `completed.len()`).
///
/// In both cases the view is pushed onto `views` and a `false` (non-null)
/// marker onto `nulls`, so the two vectors stay the same length.
///
/// # Panics
///
/// Panics if the number of completed buffers or the in-progress offset does
/// not fit in a `u32`.
fn append_value(&mut self, value: &[u8]) -> u128 {
    let len = value.len();
    let view = if len <= 12 {
        make_view(value, 0, 0)
    } else {
        // Seal the current block when this value would push it past the cap.
        // An empty block is never sealed: a single value larger than the cap
        // would otherwise add a zero-length buffer to `completed`.
        if !self.in_progress.is_empty()
            && self.in_progress.len() + len > BYTE_VIEW_MAX_BLOCK_SIZE
        {
            let flushed = std::mem::replace(
                &mut self.in_progress,
                Vec::with_capacity(BYTE_VIEW_MAX_BLOCK_SIZE),
            );
            self.completed.push(Buffer::from_vec(flushed));
        }

        // Checked conversions instead of `as u32`, so an out-of-range index
        // fails loudly rather than silently producing a corrupt view.
        let buffer_index = u32::try_from(self.completed.len())
            .expect("number of completed buffers exceeds u32::MAX");
        let offset = u32::try_from(self.in_progress.len())
            .expect("in-progress buffer offset exceeds u32::MAX");
        self.in_progress.extend_from_slice(value);

        make_view(value, buffer_index, offset)
    };

    self.views.push(view);
    self.nulls.push(false);
    view
}

/// Total number of entries (including null, if present)
pub fn len(&self) -> usize {
self.non_null_len() + self.null.map(|_| 1).unwrap_or(0)
Expand All @@ -351,8 +442,16 @@ where
/// Return the total size, in bytes, of memory used to store the data in
/// this set, not including `self`
///
/// Vectors and data buffers are counted by their allocated capacity rather
/// than their current length, so spare capacity that has been reserved but
/// not yet filled is included in the total.
pub fn size(&self) -> usize {
    let views_size = self.views.capacity() * size_of::<u128>();
    let in_progress_size = self.in_progress.capacity();
    let completed_size: usize = self.completed.iter().map(|b| b.capacity()).sum();
    let nulls_size = self.nulls.capacity() * size_of::<bool>();

    self.map_size
        + views_size
        + in_progress_size
        + completed_size
        + nulls_size
        + self.hashes_buffer.allocated_size()
}
}
Expand All @@ -365,21 +464,29 @@ where
f.debug_struct("ArrowBytesMap")
.field("map", &"<map>")
.field("map_size", &self.map_size)
.field("view_builder", &self.builder)
.field("views_len", &self.views.len())
.field("completed_buffers", &self.completed.len())
.field("random_state", &self.random_state)
.field("hashes_buffer", &self.hashes_buffer)
.finish()
}
}

/// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details
///
/// Stores the view pointing to our internal buffers, eliminating the need
/// for a separate builder index. For inline strings (<=12 bytes), the view
/// contains the entire value. For out-of-line strings, the view contains
/// buffer_index and offset pointing directly to our storage.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
struct Entry<V>
where
V: Debug + PartialEq + Eq + Clone + Copy + Default,
{
/// The idx into the views array
view_idx: usize,
/// The u128 view pointing to our internal buffers. For inline strings,
/// this contains the complete value. For larger strings, this contains
/// the buffer_index/offset into our completed/in_progress buffers.
view: u128,

hash: u64,

Expand Down