Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,71 @@ use row::GroupValuesRows;

pub(crate) use single_group_by::primitive::HashValue;

/// Counting sort of row indices by hash bucket for cache-friendly probing.
///
/// Rows are partitioned by the top `sort_bits` bits of their (masked) hash
/// bucket, so rows that will probe nearby hash-table buckets end up adjacent.
/// Populates `sorted_indices` (original row indices in partition order) and
/// `gathered_hashes` (the corresponding hashes in the same order); both are
/// resized to `hashes.len()`. `bucket_offsets` is a scratch buffer reused
/// across calls. The sort is stable: input order is preserved within each
/// partition.
///
/// # Preconditions
/// `num_buckets` must be a non-zero power of two (callers derive it via
/// `capacity().next_power_of_two()`), and `hashes.len()` must fit in `u32`;
/// both are checked with `debug_assert!`.
pub(crate) fn count_sort(
    hashes: &[u64],
    sorted_indices: &mut Vec<u32>,
    gathered_hashes: &mut Vec<u64>,
    bucket_offsets: &mut Vec<u32>,
    num_buckets: usize,
) {
    debug_assert!(
        num_buckets.is_power_of_two(),
        "num_buckets must be a non-zero power of two, got {num_buckets}"
    );
    debug_assert!(
        hashes.len() <= u32::MAX as usize,
        "row count {} does not fit in u32 indices",
        hashes.len()
    );

    let n_rows = hashes.len();
    let bucket_mask = num_buckets - 1;

    // Partition on at most 2^8 of the top bucket bits: enough spread for
    // cache locality while keeping the counting array small.
    let sort_bits = 8u32;
    let num_partitions = 1usize << sort_bits;
    let bucket_bits = num_buckets.trailing_zeros();
    // With fewer than 2^sort_bits buckets, shift is 0 and the partition is
    // simply the bucket index itself.
    let shift = bucket_bits.saturating_sub(sort_bits);

    // Phase 1: count elements per partition.
    bucket_offsets.clear();
    bucket_offsets.resize(num_partitions, 0);

    for &hash in hashes {
        let partition = ((hash as usize) & bucket_mask) >> shift;
        bucket_offsets[partition] += 1;
    }

    // Phase 2: exclusive prefix sum turns counts into starting offsets.
    let mut sum = 0u32;
    for count in bucket_offsets.iter_mut() {
        let c = *count;
        *count = sum;
        sum += c;
    }

    // Phase 3: scatter indices and gather hashes into partition order.
    // Every slot in 0..n_rows is written exactly once below, so `resize`
    // (which can leave stale prefix data from a previous call) is safe.
    sorted_indices.resize(n_rows, 0);
    gathered_hashes.resize(n_rows, 0);

    for (i, &hash) in hashes.iter().enumerate() {
        let partition = ((hash as usize) & bucket_mask) >> shift;
        let pos = bucket_offsets[partition] as usize;
        sorted_indices[pos] = i as u32;
        gathered_hashes[pos] = hash;
        bucket_offsets[partition] += 1;
    }
}

/// Count runs of equal hash values in a count-sorted hash slice.
///
/// The counting sort only groups rows by an 8-bit partition of the bucket
/// bits, so equal hashes share a partition but can be interleaved with other
/// hashes inside it. The run count is therefore an *upper bound* on the
/// number of distinct hashes — which is exactly what is needed when
/// reserving hash-table capacity (over-reserving is safe).
pub(crate) fn count_unique(gathered_hashes: &[u64]) -> usize {
    match gathered_hashes.split_first() {
        None => 0,
        Some((&first, rest)) => {
            let mut runs = 1;
            let mut last = first;
            for &hash in rest {
                if hash != last {
                    runs += 1;
                    last = hash;
                }
            }
            runs
        }
    }
}

use crate::aggregates::{
group_values::single_group_by::{
boolean::GroupValuesBoolean, bytes::GroupValuesBytes,
Expand Down Expand Up @@ -135,6 +200,8 @@ pub fn new_group_values(
schema: SchemaRef,
group_ordering: &GroupOrdering,
) -> Result<Box<dyn GroupValues>> {
let streaming = !matches!(group_ordering, GroupOrdering::None);

if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();

Expand Down Expand Up @@ -207,6 +274,6 @@ pub fn new_group_values(
Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
}
} else {
Ok(Box::new(GroupValuesRows::try_new(schema)?))
Ok(Box::new(GroupValuesRows::try_new(schema, streaming)?))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,15 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
/// reused buffer to store hashes
hashes_buffer: Vec<u64>,

/// Reused buffer for sorted indices during intern
sorted_indices: Vec<u32>,

/// Reused buffer for gathered hashes in sorted order
gathered_hashes: Vec<u64>,

/// Reused buffer for counting sort bucket offsets
bucket_offsets: Vec<u32>,

/// Random state for creating hashes
random_state: RandomState,
}
Expand Down Expand Up @@ -271,6 +280,9 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
map_size: 0,
group_values: vec![],
hashes_buffer: Default::default(),
sorted_indices: Vec::new(),
gathered_hashes: Vec::new(),
bucket_offsets: Vec::new(),
random_state: crate::aggregates::AGGREGATION_HASH_SEED,
})
}
Expand Down Expand Up @@ -475,13 +487,17 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {

/// Collect vectorized context by checking hash values of `cols` in `map`
///
/// Processes rows in hash-sorted order for better cache locality when
/// probing the hash table, and skips hash table probes for runs of
/// duplicate hashes (adding them directly to the equal_to list).
///
/// 1. If bucket not found
/// - Build and insert the `new inlined group index view`
/// and its hash value to `map`
/// - Add row index to `vectorized_append_row_indices`
/// - Set group index to row in `groups`
///
/// 2. bucket found
/// 2. bucket found (or duplicate hash from sorted run)
/// - Add row index to `vectorized_equal_to_row_indices`
/// - Check if the `group index view` is `inlined` or `non_inlined`:
/// If it is inlined, add to `vectorized_equal_to_group_indices` directly.
Expand All @@ -499,8 +515,82 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
.equal_to_group_indices
.clear();

let n_rows = batch_hashes.len();

let sorted_indices = &mut self.sorted_indices;
sorted_indices.clear();

let gathered_hashes = &mut self.gathered_hashes;
gathered_hashes.clear();

// Only apply counting sort when the hash table is large enough to
// benefit from improved cache locality (i.e. doesn't fit in L2 cache).
let num_buckets = self.map.capacity().next_power_of_two();
// Each bucket holds a control byte + entry: (u64, GroupIndexView) = 16 bytes + 1
let table_bytes = num_buckets * (std::mem::size_of::<(u64, GroupIndexView)>() + 1);
const L2_CACHE_BYTES: usize = 256 * 1024;

if table_bytes > L2_CACHE_BYTES {
use super::{count_sort, count_unique};

let mut num_buckets = num_buckets;
count_sort(
batch_hashes,
sorted_indices,
gathered_hashes,
&mut self.bucket_offsets,
num_buckets,
);

let unique_count = count_unique(gathered_hashes);

// Reserve capacity so the table won't rehash during inserts,
// keeping the bucket layout stable for our sort order.
self.map.reserve(unique_count, |(hash, _)| *hash);

// If reserve changed the bucket count, redo the count sort
let new_num_buckets = self.map.capacity().next_power_of_two();
if new_num_buckets != num_buckets {
num_buckets = new_num_buckets;
sorted_indices.clear();
gathered_hashes.clear();
count_sort(
batch_hashes,
sorted_indices,
gathered_hashes,
&mut self.bucket_offsets,
num_buckets,
);
}
} else {
// Small table fits in L2 cache; skip sorting overhead
sorted_indices.extend(0..n_rows as u32);
gathered_hashes.extend_from_slice(batch_hashes);
}

let mut group_values_len = self.group_values[0].len();
for (row, &target_hash) in batch_hashes.iter().enumerate() {

// Track previous hash and group index for duplicate hash detection
let mut prev_hash: u64 = 0;
let mut prev_group_idx: usize = 0;
let mut has_prev = false;

for (sorted_pos, &idx) in sorted_indices.iter().enumerate() {
let row = idx as usize;
let target_hash = gathered_hashes[sorted_pos];

// Fast path: same hash as previous row in sorted order.
// Skip hash table probe and add directly to equal_to list.
if has_prev && target_hash == prev_hash {
self.vectorized_operation_buffers
.equal_to_row_indices
.push(row);
self.vectorized_operation_buffers
.equal_to_group_indices
.push(prev_group_idx);
continue;
}

let entry = self
.map
.find(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
Expand Down Expand Up @@ -528,34 +618,42 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
// Set group index to row in `groups`
groups[row] = current_group_idx;

prev_hash = target_hash;
prev_group_idx = current_group_idx;
has_prev = true;

group_values_len += 1;
continue;
};

// 2. bucket found
// Check if the `group index view` is `inlined` or `non_inlined`
let first_group_idx;
if group_index_view.is_non_inlined() {
// Non-inlined case, the value of view is offset in `group_index_lists`.
// We use it to get `group_index_list`, and add related `rows` and `group_indices`
// into `vectorized_equal_to_row_indices` and `vectorized_equal_to_group_indices`.
let list_offset = group_index_view.value() as usize;
let group_index_list = &self.group_index_lists[list_offset];

first_group_idx = group_index_list[0];
self.vectorized_operation_buffers
.equal_to_group_indices
.extend_from_slice(group_index_list);
self.vectorized_operation_buffers
.equal_to_row_indices
.extend(std::iter::repeat_n(row, group_index_list.len()));
} else {
let group_index = group_index_view.value() as usize;
first_group_idx = group_index_view.value() as usize;
self.vectorized_operation_buffers
.equal_to_row_indices
.push(row);
self.vectorized_operation_buffers
.equal_to_group_indices
.push(group_index);
.push(first_group_idx);
}

prev_hash = target_hash;
prev_group_idx = first_group_idx;
has_prev = true;
}
}

Expand Down
Loading
Loading