Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 129 additions & 3 deletions packages/db/src/query/compiler/joins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import { ensureIndexForField } from '../../indexes/auto-index.js'
import { PropRef, followRef } from '../ir.js'
import { inArray } from '../builder/functions.js'
import { compileExpression } from './evaluators.js'
import { normalizeOrderByPaths } from './expressions.js'
import type { CompileQueryFn } from './index.js'
import type { OrderByOptimizationInfo } from './order-by.js'
import type {
BasicExpression,
CollectionRef,
JoinClause,
OrderBy,
QueryIR,
QueryRef,
} from '../ir.js'
Expand Down Expand Up @@ -166,10 +168,17 @@ function processJoin(
throw new JoinCollectionNotFoundError(joinedCollectionId)
}

const orderByPreference = getOrderByActivePreference(
rawQuery,
mainSource,
joinedSource,
)

const { activeSource, lazySource } = getActiveAndLazySources(
joinClause.type,
mainCollection,
joinedCollection,
orderByPreference,
)

// Analyze which source each expression refers to and swap if necessary
Expand Down Expand Up @@ -273,6 +282,12 @@ function processJoin(
)
}

// Compute orderBy and limit for the lazy collection from the query.
// When the query's orderBy references fields from the lazy collection,
// pass this information to loadSubset so the sync layer can optimize
// data fetching (e.g., server-side sorting/pagination).
const lazyOrderByAndLimit = computeOrderByForAlias(rawQuery, lazyAlias)

// Set up lazy loading: intercept active side's stream and dynamically load
// matching rows from lazy side based on join keys.
const activePipelineWithLoading: IStreamBuilder<
Expand Down Expand Up @@ -302,12 +317,16 @@ function processJoin(
return
}

// Request filtered snapshot from lazy collection for matching join keys
// Request filtered snapshot from lazy collection for matching join keys.
// Include orderBy/limit when they reference the lazy collection's fields
// so the sync layer can optimize fetching (e.g., server-side sort/pagination).
const joinKeys = data.getInner().map(([[joinKey]]) => joinKey)
const lazyJoinRef = new PropRef(followRefResult.path)
const loaded = lazySourceSubscription.requestSnapshot({
where: inArray(lazyJoinRef, joinKeys),
optimizedOnly: true,
orderBy: lazyOrderByAndLimit?.orderBy,
limit: lazyOrderByAndLimit?.limit,
})

if (!loaded) {
Expand Down Expand Up @@ -610,15 +629,19 @@ function processJoinResults(joinType: string) {
* The active collection is the one that we need to fully iterate over
* and it can be the main source (i.e. left collection) or the joined source (i.e. right collection).
* The lazy collection is the one that we should join-in lazily based on matches in the active collection.
* @param joinClause - The join clause to analyze
* @param joinType - The type of join
* @param leftCollection - The left collection
* @param rightCollection - The right collection
* @param orderByPreference - Which source the query's orderBy references, if any.
* Used as tiebreaker for inner joins so the orderBy collection is active
* and its loadSubset receives orderBy/limit for efficient server-side queries.
* @returns The active and lazy collections. They are undefined if we need to loop over both collections (i.e. both are active)
*/
function getActiveAndLazySources(
joinType: JoinClause[`type`],
leftCollection: Collection,
rightCollection: Collection,
orderByPreference?: `main` | `joined`,
):
| { activeSource: `main` | `joined`; lazySource: Collection }
| { activeSource: undefined; lazySource: undefined } {
Expand All @@ -631,12 +654,115 @@ function getActiveAndLazySources(
case `right`:
return { activeSource: `joined`, lazySource: leftCollection }
case `inner`:
// When the query has an orderBy that references one specific source,
// prefer that source as active. This ensures the active collection's
// loadSubset receives the orderBy (and limit when present), enabling
// efficient server-side sorting and pagination for on-demand collections.
// This is especially important because at compile time both collections
// are typically empty (size 0), making the size heuristic arbitrary.
if (orderByPreference === `main`) {
return { activeSource: `main`, lazySource: rightCollection }
}
if (orderByPreference === `joined`) {
return { activeSource: `joined`, lazySource: leftCollection }
}

// No orderBy preference: fall back to size heuristic.
// The smallest collection should be the active collection
// and the biggest collection should be lazy
// and the biggest collection should be lazy.
return leftCollection.size < rightCollection.size
? { activeSource: `main`, lazySource: rightCollection }
: { activeSource: `joined`, lazySource: leftCollection }
default:
return { activeSource: undefined, lazySource: undefined }
}
}

/**
* Determines which source (main or joined) the query's orderBy references.
* Returns undefined if there's no orderBy, or if the orderBy references
* neither source or both sources.
*/
function getOrderByActivePreference(
query: QueryIR,
mainAlias: string,
joinedAlias: string,
): `main` | `joined` | undefined {
if (!query.orderBy || query.orderBy.length === 0) {
return undefined
}

// Check which alias each orderBy clause references
let referencedAlias: string | undefined
for (const clause of query.orderBy) {
if (clause.expression.type !== `ref`) {
// Computed expression — can't determine source
return undefined
}
const path = clause.expression.path
if (!Array.isArray(path) || path.length < 2) {
// Single-element path — ambiguous (could be from the FROM clause)
// Default to main source (FROM) since that's where unqualified refs resolve
const alias = mainAlias
if (referencedAlias === undefined) {
referencedAlias = alias
} else if (referencedAlias !== alias) {
return undefined // Mixed aliases
}
continue
}
const alias = String(path[0])
if (referencedAlias === undefined) {
referencedAlias = alias
} else if (referencedAlias !== alias) {
// orderBy references multiple aliases — no clear preference
return undefined
}
}

if (referencedAlias === mainAlias) return `main`
if (referencedAlias === joinedAlias) return `joined`
return undefined
}

/**
* Extracts orderBy and limit from a query when they reference fields
* belonging to a specific alias. This enables the sync layer to optimize
* data fetching for lazy-loaded collections in joins (e.g., server-side
* sorting and pagination).
*
* Returns undefined if the query has no orderBy or if the orderBy references
* fields from a different collection.
*/
function computeOrderByForAlias(
query: QueryIR,
alias: string,
): { orderBy: OrderBy; limit?: number } | undefined {
if (!query.orderBy || query.orderBy.length === 0) {
return undefined
}

const normalizedOrderBy = normalizeOrderByPaths(query.orderBy, alias)

// Only pass orderBy if ALL clauses reference fields from this alias.
// After normalization, paths belonging to the alias have length 1.
const allPathsAreLocal = normalizedOrderBy.every((clause) => {
const exp = clause.expression
return (
exp.type === `ref` && Array.isArray(exp.path) && exp.path.length === 1
)
})

if (!allPathsAreLocal) {
return undefined
}

const { limit, offset } = query
const effectiveLimit =
limit !== undefined && offset !== undefined ? limit + offset : limit

return {
orderBy: normalizedOrderBy,
limit: effectiveLimit,
}
}
Loading
Loading