diff --git a/benchmarks/sql_benchmarks/hj/benchmarks/q16.benchmark b/benchmarks/sql_benchmarks/hj/benchmarks/q16.benchmark new file mode 100644 index 000000000000..3bc097088e73 --- /dev/null +++ b/benchmarks/sql_benchmarks/hj/benchmarks/q16.benchmark @@ -0,0 +1,21 @@ +name Q16 +group hj + +init sql_benchmarks/hj/init/set_config.sql + +load sql_benchmarks/hj/init/load.sql + +assert I +SELECT count(*) > 0 FROM customer +---- +true + +expect_plan HashJoinExec + +run +-- Q16: RightSemi, Small build (25 rows), 100% Hit rate +-- Build Side: nation (25 rows) | Probe Side: customer (1.5M rows) +SELECT c.k +FROM (SELECT CAST(n_nationkey AS INT) as k FROM nation) n +RIGHT SEMI JOIN (SELECT CAST(c_nationkey AS INT) as k FROM customer) c +ON n.k = c.k; diff --git a/benchmarks/sql_benchmarks/hj/benchmarks/q17.benchmark b/benchmarks/sql_benchmarks/hj/benchmarks/q17.benchmark new file mode 100644 index 000000000000..75604de89aea --- /dev/null +++ b/benchmarks/sql_benchmarks/hj/benchmarks/q17.benchmark @@ -0,0 +1,21 @@ +name Q17 +group hj + +init sql_benchmarks/hj/init/set_config.sql + +load sql_benchmarks/hj/init/load.sql + +assert I +SELECT count(*) > 0 FROM lineitem +---- +true + +expect_plan HashJoinExec + +run +-- Q17: RightSemi, Medium build (100K rows), 100% Hit rate +-- Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) +SELECT l.k +FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s +RIGHT SEMI JOIN (SELECT CAST(l_suppkey AS INT) as k FROM lineitem) l +ON s.k = l.k; diff --git a/benchmarks/sql_benchmarks/hj/benchmarks/q18.benchmark b/benchmarks/sql_benchmarks/hj/benchmarks/q18.benchmark new file mode 100644 index 000000000000..e9af18ba7d95 --- /dev/null +++ b/benchmarks/sql_benchmarks/hj/benchmarks/q18.benchmark @@ -0,0 +1,24 @@ +name Q18 +group hj + +init sql_benchmarks/hj/init/set_config.sql + +load sql_benchmarks/hj/init/load.sql + +assert I +SELECT count(*) > 0 FROM lineitem +---- +true + +expect_plan HashJoinExec + +run +-- Q18: RightSemi, 
Medium build (100K rows), 10% Hit rate +-- Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) +SELECT l.k +FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s +RIGHT SEMI JOIN ( + SELECT CAST(CASE WHEN l_suppkey % 10 = 0 THEN l_suppkey ELSE l_suppkey + 1000000 END AS INT) as k + FROM lineitem +) l +ON s.k = l.k; diff --git a/benchmarks/sql_benchmarks/hj/benchmarks/q19.benchmark b/benchmarks/sql_benchmarks/hj/benchmarks/q19.benchmark new file mode 100644 index 000000000000..fc70b7bc060c --- /dev/null +++ b/benchmarks/sql_benchmarks/hj/benchmarks/q19.benchmark @@ -0,0 +1,21 @@ +name Q19 +group hj + +init sql_benchmarks/hj/init/set_config.sql + +load sql_benchmarks/hj/init/load.sql + +assert I +SELECT count(*) > 0 FROM customer +---- +true + +expect_plan HashJoinExec + +run +-- Q19: RightAnti, Small build (25 rows), 100% Hit rate (no output) +-- Build Side: nation (25 rows) | Probe Side: customer (1.5M rows) +SELECT c.k +FROM (SELECT CAST(n_nationkey AS INT) as k FROM nation) n +RIGHT ANTI JOIN (SELECT CAST(c_nationkey AS INT) as k FROM customer) c +ON n.k = c.k; diff --git a/benchmarks/sql_benchmarks/hj/benchmarks/q20.benchmark b/benchmarks/sql_benchmarks/hj/benchmarks/q20.benchmark new file mode 100644 index 000000000000..4fb421f1c0ff --- /dev/null +++ b/benchmarks/sql_benchmarks/hj/benchmarks/q20.benchmark @@ -0,0 +1,21 @@ +name Q20 +group hj + +init sql_benchmarks/hj/init/set_config.sql + +load sql_benchmarks/hj/init/load.sql + +assert I +SELECT count(*) > 0 FROM lineitem +---- +true + +expect_plan HashJoinExec + +run +-- Q20: RightAnti, Medium build (100K rows), 100% Hit rate (no output) +-- Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) +SELECT l.k +FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s +RIGHT ANTI JOIN (SELECT CAST(l_suppkey AS INT) as k FROM lineitem) l +ON s.k = l.k; diff --git a/benchmarks/sql_benchmarks/hj/benchmarks/q21.benchmark b/benchmarks/sql_benchmarks/hj/benchmarks/q21.benchmark new file 
mode 100644 index 000000000000..dae927178f86 --- /dev/null +++ b/benchmarks/sql_benchmarks/hj/benchmarks/q21.benchmark @@ -0,0 +1,24 @@ +name Q21 +group hj + +init sql_benchmarks/hj/init/set_config.sql + +load sql_benchmarks/hj/init/load.sql + +assert I +SELECT count(*) > 0 FROM lineitem +---- +true + +expect_plan HashJoinExec + +run +-- Q21: RightAnti, Medium build (100K rows), 10% Hit rate (90% output) +-- Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) +SELECT l.k +FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s +RIGHT ANTI JOIN ( + SELECT CAST(CASE WHEN l_suppkey % 10 = 0 THEN l_suppkey ELSE l_suppkey + 1000000 END AS INT) as k + FROM lineitem +) l +ON s.k = l.k; diff --git a/benchmarks/sql_benchmarks/hj/benchmarks/q22.benchmark b/benchmarks/sql_benchmarks/hj/benchmarks/q22.benchmark new file mode 100644 index 000000000000..868b88a5aaed --- /dev/null +++ b/benchmarks/sql_benchmarks/hj/benchmarks/q22.benchmark @@ -0,0 +1,28 @@ +name Q22 +group hj + +init sql_benchmarks/hj/init/set_config.sql + +load sql_benchmarks/hj/init/load.sql + +assert I +SELECT count(*) > 0 FROM lineitem +---- +true + +expect_plan HashJoinExec + +run +-- Q22: RightSemi, Medium build (100K rows), ~1% Hit rate, fanout ~100 +-- Build Side: supplier (100K rows) collapsed onto 1K distinct keys +-- Probe Side: lineitem (60M rows) +SELECT l.k +FROM ( + SELECT CAST(((s_suppkey - 1) % 1000) + 1 AS INT) as k + FROM supplier +) s +RIGHT SEMI JOIN ( + SELECT CAST(l_suppkey AS INT) as k + FROM lineitem +) l +ON s.k = l.k; diff --git a/benchmarks/sql_benchmarks/hj/benchmarks/q23.benchmark b/benchmarks/sql_benchmarks/hj/benchmarks/q23.benchmark new file mode 100644 index 000000000000..7aa8acc87e93 --- /dev/null +++ b/benchmarks/sql_benchmarks/hj/benchmarks/q23.benchmark @@ -0,0 +1,31 @@ +name Q23 +group hj + +init sql_benchmarks/hj/init/set_config_no_stats.sql + +load sql_benchmarks/hj/init/load.sql + +assert I +SELECT count(*) > 0 FROM lineitem +---- +true + +expect_plan 
HashJoinExec + +run +-- Q23: high-fanout string-key inner join. +-- Build ~32K rows / ~415 distinct keys (fanout ~78), probe = 1/265 of lineitem +-- (all carrying the dominant key). NOTE(review): at the 60M-row lineitem assumed by Q17-Q22 that is ~226K probe rows and ~17M output pairs, not ~2.3M / ~176M (confirm intended scale). Long keys (~28 chars) +-- make per-pair key comparison expensive; count(*) isolates the match path. +-- Thresholds zeroed to force Partitioned mode (simulates absent row-count stats). +SELECT count(*) +FROM ( + SELECT 'high_fanout_string_join_key_' || CAST((s_suppkey % 415) + 1 AS VARCHAR) as k + FROM supplier + WHERE s_suppkey <= 32340 +) s +JOIN ( + SELECT 'high_fanout_string_join_key_1' as k + FROM lineitem + WHERE l_orderkey % 265 = 0 +) l ON s.k = l.k; diff --git a/benchmarks/sql_benchmarks/hj/init/set_config.sql b/benchmarks/sql_benchmarks/hj/init/set_config.sql new file mode 100644 index 000000000000..39a3ce259b0a --- /dev/null +++ b/benchmarks/sql_benchmarks/hj/init/set_config.sql @@ -0,0 +1 @@ +set datafusion.optimizer.join_reordering = false; diff --git a/benchmarks/sql_benchmarks/hj/init/set_config_no_stats.sql b/benchmarks/sql_benchmarks/hj/init/set_config_no_stats.sql new file mode 100644 index 000000000000..547cbe80bc49 --- /dev/null +++ b/benchmarks/sql_benchmarks/hj/init/set_config_no_stats.sql @@ -0,0 +1,3 @@ +set datafusion.optimizer.join_reordering = false; +set datafusion.optimizer.hash_join_single_partition_threshold = 0; +set datafusion.optimizer.hash_join_single_partition_threshold_rows = 0; diff --git a/benchmarks/src/hj.rs b/benchmarks/src/hj.rs index 7b56e75ea9eb..7d33bc3aa9e5 100644 --- a/benchmarks/src/hj.rs +++ b/benchmarks/src/hj.rs @@ -57,6 +57,7 @@ struct HashJoinQuery { prob_hit: f64, build_size: &'static str, probe_size: &'static str, + isolate_partitioned_join: bool, } /// Inline SQL queries for Hash Join benchmarks @@ -69,6 +70,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "25", probe_size: "1.5M", + isolate_partitioned_join: false, }, // Q2: Very Small Build Side (Sparse, range < 1024) // Build Side: nation (25 
rows, range 961) | Probe Side: customer (1.5M rows) @@ -85,6 +87,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "25", probe_size: "1.5M", + isolate_partitioned_join: false, }, // Q3: 100% Density, 100% Hit rate HashJoinQuery { @@ -93,6 +96,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q4: 100% Density, 10% Hit rate HashJoinQuery { @@ -108,6 +112,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q5: 75% Density, 100% Hit rate HashJoinQuery { @@ -123,6 +128,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q6: 75% Density, 10% Hit rate HashJoinQuery { @@ -142,6 +148,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q7: 50% Density, 100% Hit rate HashJoinQuery { @@ -157,6 +164,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q8: 50% Density, 10% Hit rate HashJoinQuery { @@ -176,6 +184,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q9: 20% Density, 100% Hit rate HashJoinQuery { @@ -191,6 +200,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q10: 20% Density, 10% Hit rate HashJoinQuery { @@ -210,6 +220,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q11: 10% Density, 100% Hit rate HashJoinQuery { @@ -225,6 +236,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", 
+ isolate_partitioned_join: false, }, // Q12: 10% Density, 10% Hit rate HashJoinQuery { @@ -244,6 +256,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q13: 1% Density, 100% Hit rate HashJoinQuery { @@ -259,6 +272,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q14: 1% Density, 10% Hit rate HashJoinQuery { @@ -278,6 +292,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q15: 20% Density, 10% Hit rate, 20% Duplicates in Build Side HashJoinQuery { @@ -300,6 +315,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K_(20%_dups)", probe_size: "60M", + isolate_partitioned_join: false, }, // RightSemi Join benchmarks with Int32 keys // @@ -325,6 +341,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "25", probe_size: "1.5M_RightSemi", + isolate_partitioned_join: false, }, // Q17: RightSemi, Medium build (100K rows), 100% Hit rate // Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) @@ -337,6 +354,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M_RightSemi", + isolate_partitioned_join: false, }, // Q18: RightSemi, Medium build (100K rows), 10% Hit rate // Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) @@ -352,6 +370,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M_RightSemi", + isolate_partitioned_join: false, }, // RightAnti Join benchmarks with Int32 keys // @@ -377,6 +396,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "25", probe_size: "1.5M_RightAnti", + isolate_partitioned_join: false, }, // Q20: RightAnti, Medium build (100K rows), 100% Hit rate (no output) // Build Side: supplier 
(100K rows) | Probe Side: lineitem (60M rows) @@ -389,6 +409,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M_RightAnti", + isolate_partitioned_join: false, }, // Q21: RightAnti, Medium build (100K rows), 10% Hit rate (90% output) // Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) @@ -404,6 +425,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M_RightAnti", + isolate_partitioned_join: false, }, // Q22: RightSemi, Medium build (100K rows), ~1% Hit rate, fanout ~100 // @@ -425,6 +447,30 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.01, build_size: "100K_(fanout_100)", probe_size: "60M_RightSemi", + isolate_partitioned_join: false, + }, + // Q23: skewed high-fanout string-key inner join. + // Build ~32K rows / ~415 distinct keys (fanout ~78), probe = 1/265 of lineitem, all + // carrying the same dominant key — one partition does nearly all the work. + // Long keys (~28 chars) make per-pair key comparison expensive; count(*) + // isolates the match path. NOTE(review): 1/265 of a 60M-row lineitem is ~226K rows, not the ~2.3M in `probe_size` (confirm intended scale). + HashJoinQuery { + sql: r###"SELECT count(*) + FROM ( + SELECT 'high_fanout_string_join_key_' || CAST((s_suppkey % 415) + 1 AS VARCHAR) as k + FROM supplier + WHERE s_suppkey <= 32340 + ) s + JOIN ( + SELECT 'high_fanout_string_join_key_1' as k + FROM lineitem + WHERE l_orderkey % 265 = 0 + ) l ON s.k = l.k"###, + density: 1.0, + prob_hit: 1.0, + build_size: "32K_(fanout~78)", + probe_size: "2.3M_long_keys_count", + isolate_partitioned_join: true, }, ]; @@ -487,6 +533,20 @@ impl RunOpt { ); benchmark_run.start_new_case(&case_name); + // For Q23 force Partitioned mode: zero the CollectLeft thresholds + // so the planner cannot prove the build side is small (as happens + // when the datasource provides no row-count stats). 
+ if query.isolate_partitioned_join { + ctx.sql( + "SET datafusion.optimizer.hash_join_single_partition_threshold = 0", + ) + .await?; + ctx.sql( + "SET datafusion.optimizer.hash_join_single_partition_threshold_rows = 0", + ) + .await?; + } + let query_run = self .benchmark_query(query.sql, &query_id.to_string(), &ctx) .await;