diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index b31982ea3b7b4..57218244bae6b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -655,10 +655,10 @@ impl HashJoinStream { self.join_type, )?; timer.done(); - + self.output_buffer.push_batch(result)?; self.state = HashJoinStreamState::FetchProbeBatch; - return Ok(StatefulStreamResult::Ready(Some(result))); + return Ok(StatefulStreamResult::Continue); } // get the matched by join keys indices diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 228918c3855f2..136a68573562a 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -5056,6 +5056,49 @@ LEFT ANTI JOIN t2 ON k1 = k2 WHERE k1 < 0 ---- +# Also check that the reported numbers of output rows/batches are correct in the "empty build side" # optimization. 
+# Issue: https://github.com/apache/datafusion/issues/20809 +query TT +EXPLAIN ANALYZE +WITH t1 (k) AS ( + VALUES (1), (2) +), t2 (k) AS ( + VALUES (1) +) +SELECT * +FROM t1 +LEFT ANTI JOIN ( + SELECT * + FROM t2 + WHERE k <> 1 +) t2 ON t1.k = t2.k; +---- +Plan with Metrics +01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=1, array_map_created_count=0, build_input_batches=0, build_input_rows=0, input_batches=1, input_rows=2, build_mem_used=, build_time=, join_time=, avg_fanout=N/A (0/0), probe_hit_rate=0% (0/2)] +02)--ProjectionExec: expr=[column1@0 as k], metrics=[output_rows=0, elapsed_compute=, output_bytes=, output_batches=0, expr_0_eval_time=] +03)----FilterExec: column1@0 != 1, metrics=[output_rows=0, elapsed_compute=, output_bytes=, output_batches=0, selectivity=0% (0/1)] +04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] +05)--ProjectionExec: expr=[column1@0 as k], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=1, expr_0_eval_time=] +06)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] + +query I +WITH t1 (k) AS ( + VALUES (1), (2) +), t2 (k) AS ( + VALUES (1) +) +SELECT * +FROM t1 +LEFT ANTI JOIN ( + SELECT * + FROM t2 + WHERE k <> 1 +) t2 ON t1.k = t2.k; +---- +1 +2 + # Mark testing statement ok CREATE OR REPLACE TABLE t1(b INT, c INT, d INT);