Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,21 @@ pub(super) struct JoinLeftData {
pub(super) probe_side_non_empty: AtomicBool,
/// Shared atomic flag indicating if any probe partition saw NULL in join keys (for null-aware anti joins)
pub(super) probe_side_has_null: AtomicBool,
/// `true` if any hash bucket holds build rows with differing join keys
/// (hash collisions). When `false`, every chain is "pure" and the
/// probe side can validate a chain with a single key check at its head
/// instead of re-checking every duplicate. Computed once at build time.
has_key_collisions: bool,
}

impl JoinLeftData {
/// Returns `true` if the build side has any real hash collisions (a bucket
/// holding rows with differing join keys). When `false`, the probe side can
/// skip the per-duplicate key recheck. See [`Self::has_key_collisions`].
pub(super) fn has_key_collisions(&self) -> bool {
self.has_key_collisions
}

/// return a reference to the map
pub(super) fn map(&self) -> &Map {
&self.map
Expand Down Expand Up @@ -2088,6 +2100,17 @@ async fn collect_left_input(
(Map::HashMap(hashmap), batch, left_values)
};

// Detect whether the build side has real hash collisions (a bucket with
// differing keys). When it doesn't, the probe side can validate each chain
// with a single key check at its head instead of re-checking every
// duplicate.
let has_key_collisions = match &join_hash_map {
Map::HashMap(hashmap) => {
hashmap.has_key_collisions(&left_values, null_equality)?
}
Map::ArrayMap(_) => false,
};

// Reserve additional memory for visited indices bitmap and create shared builder
let visited_indices_bitmap = if with_visited_indices_bitmap {
let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
Expand Down Expand Up @@ -2144,6 +2167,7 @@ async fn collect_left_input(
membership,
probe_side_non_empty: AtomicBool::new(false),
probe_side_has_null: AtomicBool::new(false),
has_key_collisions,
};

Ok(data)
Expand Down Expand Up @@ -4688,6 +4712,8 @@ mod tests {
None,
8192,
(0, None),
// Exercise the per-pair recheck path.
true,
&mut probe_indices_buffer,
&mut build_indices_buffer,
)?;
Expand Down Expand Up @@ -4750,6 +4776,8 @@ mod tests {
None,
8192,
(0, None),
// Exercise the per-pair recheck path.
true,
&mut probe_indices_buffer,
&mut build_indices_buffer,
)?;
Expand Down
64 changes: 49 additions & 15 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::joins::hash_join::shared_bounds::{
PartitionBounds, PartitionBuildData, SharedBuildAccumulator,
};
use crate::joins::utils::{
OnceFut, equal_rows_arr, get_final_indices_from_shared_bitmap, matchable_join_keys,
JoinKeyComparator, OnceFut, get_final_indices_from_shared_bitmap, matchable_join_keys,
};
use crate::stream::EmptyRecordBatchStream;
use crate::{
Expand Down Expand Up @@ -402,6 +402,7 @@ pub(super) fn lookup_join_hashmap(
valid_keys: Option<&NullBuffer>,
limit: usize,
offset: MapOffset,
has_key_collisions: bool,
probe_indices_buffer: &mut Vec<u32>,
build_indices_buffer: &mut Vec<u64>,
) -> Result<(UInt64Array, UInt32Array, Option<MapOffset>)> {
Expand All @@ -414,26 +415,58 @@ pub(super) fn lookup_join_hashmap(
build_indices_buffer,
);

let build_indices_unfiltered: UInt64Array =
std::mem::take(build_indices_buffer).into();
let probe_indices_unfiltered: UInt32Array =
std::mem::take(probe_indices_buffer).into();

// TODO: optimize equal_rows_arr to avoid allocation of intermediate arrays
// https://github.com/apache/datafusion/issues/12131
let (build_indices, probe_indices) = equal_rows_arr(
&build_indices_unfiltered,
&probe_indices_unfiltered,
// Validate the candidate (build, probe) pairs against the join key to drop
// hash collisions. We compare values in place via a prebuilt comparator,
// avoiding the take() + eq_dyn_null() + FilterBuilder allocations that
// equal_rows_arr performs at O(matched_pairs) scale.
// See: https://github.com/apache/datafusion/issues/12131
let comparator = JoinKeyComparator::for_equality(
build_side_values,
probe_side_values,
null_equality,
)?;

// Reclaim buffers
*build_indices_buffer = build_indices_unfiltered.into_parts().1.into();
*probe_indices_buffer = probe_indices_unfiltered.into_parts().1.into();
let mut build_out: Vec<u64> = Vec::with_capacity(build_indices_buffer.len());
let mut probe_out: Vec<u32> = Vec::with_capacity(probe_indices_buffer.len());

if has_key_collisions {
// A bucket may mix keys, so every candidate pair must be rechecked.
for (b, p) in build_indices_buffer.iter().zip(probe_indices_buffer.iter()) {
if comparator.is_equal(*b as usize, *p as usize) {
build_out.push(*b);
probe_out.push(*p);
}
}
} else {
// Collision-free build side: all pairs in one probe row's run share
// the same build key, so checking the key once at the run head
// accepts or rejects the whole run (F comparisons per probe row -> 1).
let builds = build_indices_buffer.as_slice();
let probes = probe_indices_buffer.as_slice();
let mut start = 0;
while start < probes.len() {
let probe_idx = probes[start];
// Find the end of this probe row's run (equal probe indices are
// contiguous). Peek the next index for the common 1:1 case; fall
// back to a binary search for long runs (high fanout).
let end = if start + 1 >= probes.len() || probes[start + 1] != probe_idx {
start + 1
} else {
start + probes[start..].partition_point(|&p| p == probe_idx)
};
if comparator.is_equal(builds[start] as usize, probe_idx as usize) {
build_out.extend_from_slice(&builds[start..end]);
probe_out.extend_from_slice(&probes[start..end]);
}
start = end;
}
}

// Reclaim buffers for the next call
build_indices_buffer.clear();
probe_indices_buffer.clear();

Ok((build_indices, probe_indices, next_offset))
Ok((build_out.into(), probe_out.into(), next_offset))
}

/// Counts the number of distinct elements in the input array.
Expand Down Expand Up @@ -808,6 +841,7 @@ impl HashJoinStream {
state.valid_keys.as_ref(),
self.batch_size,
state.offset,
build_side.left_data.has_key_collisions(),
&mut self.probe_indices_buffer,
&mut self.build_indices_buffer,
)?,
Expand Down
93 changes: 92 additions & 1 deletion datafusion/physical-plan/src/joins/join_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
use std::fmt::{self, Debug};
use std::ops::Sub;

use arrow::array::BooleanArray;
use crate::joins::utils::JoinKeyComparator;
use arrow::array::{ArrayRef, BooleanArray};
use arrow::buffer::{BooleanBuffer, NullBuffer};
use arrow::datatypes::ArrowNativeType;
use datafusion_common::{NullEquality, Result};
use hashbrown::HashTable;
use hashbrown::hash_table::Entry::{Occupied, Vacant};

Expand Down Expand Up @@ -131,6 +133,18 @@ pub trait JoinHashMapType: Send + Sync {
match_indices: &mut Vec<u64>,
) -> Option<MapOffset>;

/// Returns `true` if any bucket holds build rows with differing join keys
/// (real hash collisions). When `false`, the probe can check once per
/// chain head and accept the whole run. Scanned once at build time.
fn has_key_collisions(
&self,
left_values: &[ArrayRef],
null_equality: NullEquality,
) -> Result<bool> {
let _ = (left_values, null_equality);
Ok(true)
}

/// Returns a BooleanArray indicating which of the provided hashes exist in the map.
fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray;

Expand Down Expand Up @@ -208,6 +222,14 @@ impl JoinHashMapType for JoinHashMapU32 {
)
}

fn has_key_collisions(
&self,
left_values: &[ArrayRef],
null_equality: NullEquality,
) -> Result<bool> {
detect_key_collisions(&self.next, left_values, null_equality)
}

fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray {
contain_hashes(&self.map, hash_values)
}
Expand Down Expand Up @@ -288,6 +310,14 @@ impl JoinHashMapType for JoinHashMapU64 {
)
}

fn has_key_collisions(
&self,
left_values: &[ArrayRef],
null_equality: NullEquality,
) -> Result<bool> {
detect_key_collisions(&self.next, left_values, null_equality)
}

fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray {
contain_hashes(&self.map, hash_values)
}
Expand Down Expand Up @@ -491,6 +521,40 @@ pub fn contain_hashes<T>(map: &HashTable<(u64, T)>, hash_values: &[u64]) -> Bool
BooleanArray::new(buffer, None)
}

/// Scans the `next` chain to detect real hash collisions (two build rows in
/// the same bucket with different keys). `next[i]` stores `prev_row + 1`
/// (`0` = end of chain). Checking every adjacent linked pair is sufficient:
/// any two distinct keys in the same bucket must appear as neighbors somewhere.
///
/// Example — keys `["cat", "cat", "dog"]`, next `[0, 1, 2]`:
/// row 1 → prev 0: "cat"=="cat" ✓
/// row 2 → prev 1: "dog"!="cat" → return true (collision found)
fn detect_key_collisions<T>(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked in the other engine (Trino) and there when a build row is inserted, Trino only inserts rows that share the same key, so by construction, every chain is already "pure" and hash collisions are not possible.
This happens because Trino uses a hash table that resolves key equality at insert time. DataFusion's update_from_iter only receives hashes and row indices (it has no access to key values) so it chains all rows in the same hash bucket together regardless of whether they share a key or not.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note doing the same in DF would be non trivial because it would require modifying update_from_iter trait signature to accept key columns alongside the hashes, and updating all call sites accordingly.

next: &[T],
left_values: &[ArrayRef],
null_equality: NullEquality,
) -> Result<bool>
where
T: ArrowNativeType + Into<u64>,
{
if next.is_empty() {
return Ok(false);
}
let comparator =
JoinKeyComparator::for_equality(left_values, left_values, null_equality)?;
for (row, &link) in next.iter().enumerate() {
let link: u64 = link.into();
if link != 0 {
// `link` is `prev_row + 1`; both rows live in the same bucket.
let prev = (link - 1) as usize;
if !comparator.is_equal(row, prev) {
return Ok(true);
}
}
}
Ok(false)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -569,4 +633,31 @@ mod tests {
assert_eq!(input_indices, vec![1, 1]);
assert_eq!(match_indices, vec![3, 1]);
}

#[test]
fn test_has_key_collisions_same_key() -> Result<()> {
// 5 build rows all with key 10 chained in the same bucket — no collision.
// next: [0, 1, 2, 3, 4] → chain 4→3→2→1→0→end
use arrow::array::Int32Array;
use std::sync::Arc;
let next: Vec<u32> = vec![0, 1, 2, 3, 4];
let map = JoinHashMapU32::new(HashTable::new(), next);
let keys: ArrayRef = Arc::new(Int32Array::from(vec![10, 10, 10, 10, 10]));
assert!(!map.has_key_collisions(&[keys], NullEquality::NullEqualsNothing)?);
Ok(())
}

#[test]
fn test_has_key_collisions_distinct_keys() -> Result<()> {
// 5 build rows, 4 with key 10 and 1 with key 20 buried in the chain.
// next: [0, 1, 2, 3, 4] → chain 4→3→2→1→0→end
// Row 2 has key 20 — adjacent pair (row 2, row 1) differs → collision.
use arrow::array::Int32Array;
use std::sync::Arc;
let next: Vec<u32> = vec![0, 1, 2, 3, 4];
let map = JoinHashMapU32::new(HashTable::new(), next);
let keys: ArrayRef = Arc::new(Int32Array::from(vec![10, 10, 20, 10, 10]));
assert!(map.has_key_collisions(&[keys], NullEquality::NullEqualsNothing)?);
Ok(())
}
}
13 changes: 13 additions & 0 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2332,6 +2332,19 @@ impl JoinKeyComparator {
Ok(Self { first, rest })
}

/// Build equality-only comparators for each join key column pair.
///
/// Unlike [`Self::new`], no `SortOptions` are required — `SortOptions::default()`
/// is used internally, which is correct because callers only test `== Equal`.
pub fn for_equality(
left_arrays: &[ArrayRef],
right_arrays: &[ArrayRef],
null_equality: NullEquality,
) -> Result<Self> {
let sort_options = vec![SortOptions::default(); left_arrays.len()];
Self::new(left_arrays, right_arrays, &sort_options, null_equality)
}

/// Compare row `left` (in the left arrays) with row `right` (in the right
/// arrays). Returns the lexicographic ordering across all key columns.
#[inline]
Expand Down
Loading