
Proposal: Refactor NLJ into an extensible framework for specialized joins #21921

@2010YOUY01

Description


Is your feature request related to a problem or challenge?

Motivation

There has been increasing interest in specialized join algorithms for different workloads. One example is range join support: #318.

Join is one of the most expensive operators in OLAP workloads. When DataFusion cannot use an equi-join implementation, the fallback is often a nested loop join, which may need to consider |build_side| * |probe_side| row pairs. For queries with multiple joins, this cost can compound quickly as intermediate cardinalities grow.

One way to support these workloads is to add more specialized join executors, but this is harder than it first appears.

For functional requirements, a join executor has to support semi, anti, mark, and other join types correctly.

For non-functional requirements, a join executor also has to handle internal buffering and incremental output. For example, it should buffer enough data to preserve vectorized execution, while avoiding materializing too many intermediate results in memory.

This proposal introduces a simpler extension API for specialized join implementations. Many specialized joins follow a common build-probe pattern, and can reduce candidate pairs with runtime indexes or runtime filters. We can reuse much of the existing NLJ code for common logic that addresses the challenges above, while each specialized implementation only provides the acceleration logic: how to build an index or summary from the build side, and how to probe more efficiently using the runtime index.

Proposed abstraction

For joins that fit the build-then-probe pattern, the execution flow can be abstracted as follows:

for build_batch in build_input {
    accelerator.add_build_batch(build_batch)?;
}
accelerator.finish()?;

for probe_batch in probe_input {
    let (probe_batch, mut prober) =
        accelerator.init_prober(probe_batch, batch_size)?;

    while let Some(candidates) = prober.probe()? {
        // The common join driver consumes candidate row pairs,
        // evaluates residual predicates, tracks join state, and builds output.
    }
}

This abstraction provides two main optimization opportunities:

  • Runtime index on the build side. After all build batches are buffered, the accelerator can create an index-like representation that finds candidate build rows for each probe row more cheaply than a Cartesian scan.
  • Dynamic filter on the probe side. Build-side statistics can sometimes prove that a probe row cannot match any build row, allowing the executor to skip that probe row before candidate generation.

Walkthrough: piecewise merge join

Consider this query:

SELECT *
FROM generate_series(1000) AS t1(v1)
JOIN generate_series(1000000) AS t2(v1)
ON (t1.v1 > t2.v1)
   AND ((t1.v1 + t2.v1) % 2 = 0);

Here, t1.v1 > t2.v1 is the accelerated predicate. It has structure that a specialized algorithm can exploit. The second predicate, ((t1.v1 + t2.v1) % 2 = 0), is the residual predicate. The accelerator does not need to understand it; the common join driver can evaluate it after candidate pairs are produced.
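This split can be sketched in plain Rust (using `i64` values instead of Arrow arrays; `apply_residual` is an illustrative name, not an existing DataFusion function):

```rust
/// Candidate pairs produced by the accelerator already satisfy the
/// accelerated predicate `t1.v1 > t2.v1`; the common join driver only
/// has to apply the residual predicate `(t1.v1 + t2.v1) % 2 == 0`.
fn apply_residual(candidates: &[(i64, i64)]) -> Vec<(i64, i64)> {
    candidates
        .iter()
        .copied()
        .filter(|&(v1, v2)| (v1 + v2) % 2 == 0)
        .collect()
}

fn main() {
    // Candidate pairs (t1.v1, t2.v1) that already satisfy t1.v1 > t2.v1.
    let candidates = [(5, 1), (5, 2), (5, 3), (5, 4)];
    let matched = apply_residual(&candidates);
    // Only pairs with an even sum survive the residual predicate.
    assert_eq!(matched, vec![(5, 1), (5, 3)]);
}
```

The accelerator stays oblivious to the residual predicate's shape; the driver can evaluate any expression over the candidate pairs it receives.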

Dynamic filter

After buffering t1 as the build side, the accelerator can compute max(t1.v1). For an inner join, any probe row with t2.v1 >= max(t1.v1) cannot satisfy t1.v1 > t2.v1, so those probe rows can be filtered out before candidate generation.

In this example, the dynamic filter reduces the probe side from roughly 1M rows to roughly 1K rows. The candidate search space therefore goes from roughly 1K x 1M to roughly 1K x 1K before residual predicate evaluation.
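A minimal sketch of this max-based dynamic filter, again over plain `i64` values rather than `RecordBatch`es, and assuming the build side holds values 1..=1000 and the probe side 1..=1_000_000 in the spirit of the query above (`GreaterThanFilter` is an illustrative name):

```rust
/// Build-side summary for the accelerated predicate `t1.v1 > t2.v1`:
/// once max(t1.v1) is known, any probe row with t2.v1 >= max(t1.v1)
/// cannot match and can be skipped before candidate generation
/// (valid for an inner join).
struct GreaterThanFilter {
    build_max: i64,
}

impl GreaterThanFilter {
    /// Returns `None` for an empty build side (no probe row can match).
    fn from_build_side(build: &[i64]) -> Option<Self> {
        build.iter().copied().max().map(|build_max| Self { build_max })
    }

    /// Keep only probe rows that can possibly match some build row.
    fn filter_probe(&self, probe: &[i64]) -> Vec<i64> {
        probe.iter().copied().filter(|&v| v < self.build_max).collect()
    }
}

fn main() {
    // Build side: t1.v1 in 1..=1000; probe side: t2.v1 in 1..=1_000_000.
    let build: Vec<i64> = (1..=1000).collect();
    let probe: Vec<i64> = (1..=1_000_000).collect();

    let filter = GreaterThanFilter::from_build_side(&build).unwrap();
    let remaining = filter.filter_probe(&probe);

    // Only probe rows with v < 1000 survive: 1..=999, i.e. roughly 1K.
    assert_eq!(remaining.len(), 999);
}
```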

Runtime index

The accelerator can further sort the buffered t1 rows by v1. For each incoming t2 batch, it can sort the remaining probe rows by v1 and scan the two sorted runs.

For a probe row where t2.v1 = 10, the matching build rows are the suffix of sorted t1 rows with t1.v1 > 10. For a later probe row where t2.v1 = 20, the scan cursor only moves forward. This avoids repeatedly checking every build row for every probe row.

Ignoring the cost of emitting the final matching pairs, this further reduces the search work from roughly 1K x 1K comparisons to roughly 1K * log(1K) + 1K + 1K: sorting each probe batch, plus one forward scan over each sorted run.
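The forward-only cursor over two sorted runs can be sketched as follows (plain sorted `i64` slices stand in for the buffered build side and a sorted probe batch; `probe_sorted` is an illustrative name):

```rust
/// Piecewise merge probe for the accelerated predicate `t1.v1 > t2.v1`.
/// For each probe value, the matching build rows are the suffix of the
/// sorted build side that is strictly greater; because the probe side is
/// also sorted, the cursor only ever moves forward.
/// Returns (probe_value, match_count) per probe row.
fn probe_sorted(build_sorted: &[i64], probe_sorted: &[i64]) -> Vec<(i64, usize)> {
    let mut cursor = 0; // first build index with build_sorted[cursor] > probe value
    let mut out = Vec::new();
    for &p in probe_sorted {
        while cursor < build_sorted.len() && build_sorted[cursor] <= p {
            cursor += 1; // forward-only: earlier build rows are never re-scanned
        }
        // Every build row in build_sorted[cursor..] matches this probe row.
        out.push((p, build_sorted.len() - cursor));
    }
    out
}

fn main() {
    let build_sorted: Vec<i64> = (1..=30).collect();
    let counts = probe_sorted(&build_sorted, &[10, 20]);
    // t2.v1 = 10 matches t1.v1 in 11..=30 (20 rows);
    // t2.v1 = 20 matches t1.v1 in 21..=30 (10 rows).
    assert_eq!(counts, vec![(10, 20), (20, 10)]);
}
```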

Other possible accelerators

This framework could support several specialized join families:

  • Range joins.
  • ASOF joins.
  • Spatial joins, where the build side can be indexed with an R-tree or a similar structure, and build-side bounding boxes can also derive probe-side pruning filters.
  • Array/list joins, such as joins based on array_contains, array_intersect, or tag membership, where the build side can be represented as an inverted index.
  • Full-text-style joins and semantic joins.

Limitations

Not every specialized join fits perfectly into this build-buffer/probe framework.

For example, an optimal ASOF join can often stream both sides in sorted order without fully buffering one side. That design may be better implemented as a dedicated physical operator.

The goal of this proposal is to provide a simple but good-enough abstraction for specialized joins, and hopefully to inspire a broader class of practical optimizations. If a particular workload later needs deeper optimization, it can first be implemented on this simplified API with less friction, and later move to a dedicated executor.

Proposed implementation plan

  1. Introduce an API for custom join indexes and dynamic filters:
pub trait JoinAccelerator: Debug + Send + Sync {
    /// Add one build-side input batch to the accelerator.
    ///
    /// Implementations may buffer the batch, update build-side statistics,
    /// or incrementally build a runtime index.
    fn add_build_batch(&mut self, batch: RecordBatch) -> Result<()>;

    /// Signal the end of build-side input and prepare for probing.
    ///
    /// Implementations can finalize indexes, summaries, or dynamic filters here.
    fn finish(&mut self) -> Result<()>;

    /// Initialize probing for one probe-side batch.
    ///
    /// Returns the prepared probe batch and a prober that iterates over
    /// candidate pairs satisfying the accelerated predicate.
    ///
    /// - Prepared probe batch: implementations may preprocess the probe batch.
    ///   The outer join loop owns this returned batch while consuming the prober.
    /// - Output shape: an iterator over candidate pairs from
    ///   `ALL_BUILD_BATCHES x CURRENT_PROBE_BATCH` that satisfy the accelerated
    ///   predicate.
    ///
    /// # Default implementation
    ///
    /// The default implementation is the nested-loop fallback. It returns the
    /// probe batch unchanged and emits every build-side row as a candidate for
    /// each probe row. Implementations that only provide dynamic filtering can
    /// reuse this default.
    fn init_prober(
        &self,
        probe_batch: RecordBatch,
        batch_size: usize,
    ) -> Result<(RecordBatch, Box<dyn JoinAcceleratorProber>)> {
        let prober = Box::new(FallbackNestedLoopJoinProber {
            batch_size: batch_size.max(1),
            build_batch_size: self.num_build_rows(),
            probe_batch_size: probe_batch.num_rows(),
            cur_probe_offset: 0,
            cur_build_offset: 0,
        });

        Ok((probe_batch, prober))
    }

    // ...
}
  2. Refactor nested loop join to use the new JoinAccelerator trait.
  3. Port piecewise merge join to the new trait.
  4. Explore additional specialized join accelerators.
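To make the shape of an implementation concrete, here is a heavily simplified stand-in for the proposed trait, using `Vec<i64>` batches instead of Arrow `RecordBatch`es. It only shows the add/finish/probe life cycle; `ToyAccelerator` and `GreaterThanAccelerator` are illustrative names, not real DataFusion API:

```rust
/// Simplified stand-in for the proposed `JoinAccelerator` life cycle.
trait ToyAccelerator {
    fn add_build_batch(&mut self, batch: Vec<i64>);
    fn finish(&mut self);
    /// For each probe value, the number of matching build rows
    /// under the accelerated predicate `build > probe`.
    fn probe(&self, probe_batch: &[i64]) -> Vec<usize>;
}

/// Accelerator for `t1.v1 > t2.v1`: buffers build batches, sorts them in
/// `finish` (the runtime index), then answers each probe value with a
/// binary search over the sorted run.
#[derive(Default)]
struct GreaterThanAccelerator {
    build: Vec<i64>,
}

impl ToyAccelerator for GreaterThanAccelerator {
    fn add_build_batch(&mut self, batch: Vec<i64>) {
        self.build.extend(batch);
    }

    fn finish(&mut self) {
        self.build.sort_unstable(); // runtime index: one sorted run
    }

    fn probe(&self, probe_batch: &[i64]) -> Vec<usize> {
        probe_batch
            .iter()
            .map(|&p| self.build.len() - self.build.partition_point(|&b| b <= p))
            .collect()
    }
}

fn main() {
    let mut acc = GreaterThanAccelerator::default();
    acc.add_build_batch(vec![3, 1]);
    acc.add_build_batch(vec![2, 5]);
    acc.finish();
    // Build side sorted: [1, 2, 3, 5].
    assert_eq!(acc.probe(&[0, 2, 5]), vec![4, 2, 0]);
}
```

A real implementation would instead return candidate pair selections from `init_prober` and leave residual predicate evaluation and join-type bookkeeping to the common driver, as described above.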

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response
