Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 138 additions & 1 deletion datafusion/sql/src/expr/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{
Column, DFSchema, Result, not_impl_err, plan_datafusion_err, plan_err,
Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_datafusion_err, plan_err,
};
use datafusion_expr::expr::Sort;
use datafusion_expr::{Expr, SortExpr};
use indexmap::IndexSet;
use sqlparser::ast::{
Expr as SQLExpr, OrderByExpr, OrderByOptions, Value, ValueWithSpan,
};
Expand Down Expand Up @@ -117,4 +123,135 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

Ok(sort_expr_vec)
}

/// Add missing ORDER BY expressions to the SELECT list.
///
/// This function handles the case where ORDER BY expressions reference columns
/// or expressions that are not present in the SELECT list. Instead of traversing
/// the plan tree to find projection nodes, it directly adds the missing
/// expressions to the SELECT list.
///
/// # Behavior
///
/// Each ORDER BY expression is walked top-down. At every node, the first
/// matching rule below applies and the walk does not descend further into
/// that node:
///
/// - A node that is equal to one of the SELECT expressions is left unchanged.
///
/// - A node that is equal to the inner expression of an aliased SELECT
///   expression is replaced with an unqualified column reference to the alias
///   name.
///
/// - An aggregate function (e.g., `SUM(x)`) or window function is recorded as
///   missing (in both strict and non-strict mode), and the node is replaced
///   with an unqualified column reference to that expression's schema name.
///
/// - A column reference is kept as is. When `strict` is false and `schema`
///   does not contain the column, the column is additionally recorded as
///   missing.
///
/// Any other node is left unchanged and the walk continues into its children.
///
/// After all ORDER BY expressions have been processed:
///
/// - If nothing was recorded as missing, `select_exprs` is not modified.
///
/// - If the query uses `SELECT DISTINCT` and something is missing, an error is
///   returned, as this would make the DISTINCT operation ambiguous.
///
/// - Otherwise the missing expressions are appended to `select_exprs`, without
///   duplicates, in the order they were first encountered.
///
/// When `strict` is true (e.g., when GROUP BY is present), unknown column
/// references are *not* appended and this function does not itself raise an
/// error for them; they are left untouched so that a later planning step can
/// report the problem (the intent is to preserve the
/// "Column in ORDER BY must be in GROUP BY" error message for invalid queries).
///
/// # Arguments
///
/// * `select_exprs` - Mutable reference to the SELECT expressions list. Missing
///   expressions are appended to this list.
/// * `schema` - The schema of the projected plan, used to check if column
///   references exist.
/// * `distinct` - Whether the query uses `SELECT DISTINCT`. If true, missing
///   ORDER BY expressions will cause an error.
/// * `strict` - If true, column references missing from `schema` are not
///   recorded as missing (and therefore never appended).
/// * `order_by` - Mutable slice of ORDER BY expressions. The expressions are
///   rewritten in place to use column references where appropriate.
///
/// # Returns
///
/// * `Ok(true)` - If expressions were added to the SELECT list.
/// * `Ok(false)` - If no expressions needed to be added. Note that `order_by`
///   may still have been rewritten (e.g., alias replacement).
/// * `Err(...)` - If there's an error (e.g., DISTINCT with missing ORDER BY
///   expressions).
///
/// # Example
///
/// ```text
/// Input: SELECT x FROM foo ORDER BY y
///
/// Before: select_exprs = [x]
///         order_by = [Sort { expr: Column(y), ... }]
///
/// After:  select_exprs = [x, y]
///         order_by = [Sort { expr: Column(y), ... }]
///         returns Ok(true)
/// ```
pub(crate) fn add_missing_order_by_exprs(
    select_exprs: &mut Vec<Expr>,
    schema: &DFSchemaRef,
    distinct: bool,
    strict: bool,
    order_by: &mut [Sort],
) -> Result<bool> {
    // Insertion-ordered set: de-duplicates missing expressions while keeping
    // the order in which ORDER BY first mentioned them.
    let mut missing_exprs: IndexSet<Expr> = IndexSet::new();

    // Maps the inner expression of every aliased SELECT item to its alias name.
    // If the same inner expression is aliased more than once, the last alias wins.
    let aliases: HashMap<_, _> = select_exprs
        .iter()
        .filter_map(|expr| match expr {
            Expr::Alias(alias) => Some((alias.expr.clone(), alias.name.clone())),
            _ => None,
        })
        .collect();

    let mut rewrite = |expr: Expr| {
        // Already projected verbatim: nothing to add, do not look inside.
        if select_exprs.contains(&expr) {
            return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
        }
        // Projected under an alias: refer to the alias instead. The node is
        // replaced, so it is reported as transformed.
        if let Some(alias) = aliases.get(&expr) {
            return Ok(Transformed::new(
                Expr::Column(Column::new_unqualified(alias.clone())),
                true,
                TreeNodeRecursion::Jump,
            ));
        }
        match expr {
            Expr::AggregateFunction(_) | Expr::WindowFunction(_) => {
                // Sort by the output column of the aggregate/window expression
                // and make sure the expression itself gets projected.
                let replaced = Expr::Column(Column::new_unqualified(
                    expr.schema_name().to_string(),
                ));
                missing_exprs.insert(expr);
                Ok(Transformed::new(replaced, true, TreeNodeRecursion::Jump))
            }
            Expr::Column(ref c) => {
                // In strict mode (e.g., GROUP BY present) an unknown column is
                // deliberately not recorded, so that it is reported by a later
                // check with the proper error message instead of being
                // silently added to the SELECT list.
                if !strict && !schema.has_column(c) {
                    missing_exprs.insert(Expr::Column(c.clone()));
                }
                Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump))
            }
            _ => Ok(Transformed::no(expr)),
        }
    };

    for sort in order_by.iter_mut() {
        // Move the expression out so it can be rewritten by value without cloning.
        let expr = std::mem::take(&mut sort.expr);
        sort.expr = expr.transform_down(&mut rewrite).data()?;
    }

    if missing_exprs.is_empty() {
        return Ok(false);
    }
    if distinct {
        // Indexing is safe: the set was just checked to be non-empty.
        return plan_err!(
            "For SELECT DISTINCT, ORDER BY expressions {} must appear in select list",
            missing_exprs[0]
        );
    }
    select_exprs.extend(missing_exprs);
    Ok(true)
}
}
86 changes: 72 additions & 14 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
)?;

// Having and group by clause may reference aliases defined in select projection
let projected_plan = self.project(base_plan.clone(), select_exprs)?;
let select_exprs = projected_plan.expressions();
let projected_plan = self.project(base_plan.clone(), select_exprs.clone())?;

let projected_plan_exprs = projected_plan.expressions();
let order_by =
to_order_by_exprs_with_select(query_order_by, Some(&select_exprs))?;
to_order_by_exprs_with_select(query_order_by, Some(&projected_plan_exprs))?;

// Place the fields of the base plan at the front so that when there are references
// with the same name, the fields of the base plan will be searched first.
Expand All @@ -131,11 +131,54 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
true,
Some(base_plan.schema().as_ref()),
)?;
let order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?;
let mut order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?;

// Check if there are any Wildcards in select_exprs
let has_wildcard = select_exprs.iter().any(|e| {
matches!(
e,
SelectExpr::Wildcard(_) | SelectExpr::QualifiedWildcard(_, _)
)
});

// Convert SelectExpr to Expr for add_missing_order_by_exprs
// If there's a wildcard, we need to use the expanded expressions from the projected plan
// Otherwise, we filter out non-Expression items
let mut projected_select_exprs: Vec<Expr> = if has_wildcard {
projected_plan.expressions()
} else {
select_exprs
.iter()
.filter_map(|e| match e {
SelectExpr::Expression(expr) => Some(expr.clone()),
_ => None,
})
.collect()
};

// This alias map is resolved and looked up in both having exprs and group by exprs
let alias_map = extract_aliases(&select_exprs);
// Check if we need strict mode: GROUP BY present or aggregates in SELECT/ORDER BY
let has_group_by = matches!(
select.group_by,
GroupByExpr::Expressions(_, _) | GroupByExpr::All(_)
);
let has_aggregates = {
let select_aggrs = find_aggregate_exprs(projected_select_exprs.iter());
let order_by_aggrs =
find_aggregate_exprs(order_by_rex.iter().map(|s| &s.expr));
!select_aggrs.is_empty() || !order_by_aggrs.is_empty()
};
let strict = has_group_by || has_aggregates;

let added = Self::add_missing_order_by_exprs(
&mut projected_select_exprs,
projected_plan.schema(),
matches!(select.distinct, Some(Distinct::Distinct)),
strict,
&mut order_by_rex,
)?;

// This alias map is resolved and looked up in both having exprs and group by exprs
let alias_map = extract_aliases(&projected_select_exprs);
// Optionally the HAVING expression.
let having_expr_opt = select
.having
Expand Down Expand Up @@ -181,8 +224,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
let group_by_expr =
resolve_aliases_to_exprs(group_by_expr, &alias_map)?;
let group_by_expr =
resolve_positions_to_exprs(group_by_expr, &select_exprs)?;
let group_by_expr = resolve_positions_to_exprs(
group_by_expr,
&projected_select_exprs,
)?;
let group_by_expr = normalize_col(group_by_expr, &projected_plan)?;
self.validate_schema_satisfies_exprs(
base_plan.schema(),
Expand All @@ -194,7 +239,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
} else {
// 'group by all' groups wrt. all select expressions except 'AggregateFunction's.
// Filter and collect non-aggregate select expressions
select_exprs
projected_select_exprs
.iter()
.filter(|select_expr| match select_expr {
Expr::AggregateFunction(_) => false,
Expand Down Expand Up @@ -237,7 +282,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
// The outer expressions we will search through for aggregates.
// First, find aggregates in SELECT, HAVING, and QUALIFY
let select_having_qualify_aggrs = find_aggregate_exprs(
select_exprs
projected_select_exprs
.iter()
.chain(having_expr_opt.iter())
.chain(qualify_expr_opt.iter()),
Expand Down Expand Up @@ -265,7 +310,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
} = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() {
self.aggregate(
&base_plan,
&select_exprs,
projected_select_exprs.as_slice(),
having_expr_opt.as_ref(),
qualify_expr_opt.as_ref(),
&order_by_rex,
Expand All @@ -281,7 +326,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
None => AggregatePlanResult {
plan: base_plan.clone(),
select_exprs: select_exprs.clone(),
select_exprs: projected_select_exprs.clone(),
having_expr: having_expr_opt,
qualify_expr: qualify_expr_opt,
order_by_exprs: order_by_rex,
Expand Down Expand Up @@ -383,7 +428,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

// Build the final plan
LogicalPlanBuilder::from(base_plan)
.distinct_on(on_expr, select_exprs, None)?
.distinct_on(on_expr, projected_select_exprs.clone(), None)?
.build()
}
}?;
Expand All @@ -409,7 +454,20 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
};

let plan = self.order_by(plan, order_by_rex)?;
Ok(plan)
// If ORDER BY expressions were added to the projection, re-project onto the original output columns so the extra sort-only columns are dropped
if added {
LogicalPlanBuilder::from(plan)
.project(
projected_plan
.schema()
.columns()
.into_iter()
.map(Expr::Column),
)?
.build()
} else {
Ok(plan)
}
}

/// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1984,7 +1984,7 @@ fn test_complex_order_by_with_grouping() -> Result<()> {
}, {
assert_snapshot!(
sql,
@r#"SELECT j1.j1_id, j1.j1_string, lochierarchy FROM (SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS lochierarchy, grouping(j1.j1_string), grouping(j1.j1_id) FROM j1 GROUP BY ROLLUP (j1.j1_id, j1.j1_string)) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN (("grouping(j1.j1_id)" + "grouping(j1.j1_string)") = 0) THEN j1.j1_id END ASC NULLS LAST LIMIT 100"#
@r#"SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS lochierarchy FROM j1 GROUP BY ROLLUP (j1.j1_id, j1.j1_string) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN (lochierarchy = 0) THEN j1.j1_id END ASC NULLS LAST LIMIT 100"#
);
});

Expand Down