From 514e58771cb961f62e901391fe66fc83c4cc4cd6 Mon Sep 17 00:00:00 2001 From: codedump Date: Sat, 24 Jan 2026 17:15:15 +0800 Subject: [PATCH 1/2] feat: do ambiguous_distinct_check in select --- datafusion/sql/src/select.rs | 101 +++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 1d6ccde6be13a..572f6fa60d2fe 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -136,6 +136,57 @@ impl SqlToRel<'_, S> { // This alias map is resolved and looked up in both having exprs and group by exprs let alias_map = extract_aliases(&select_exprs); + // Check if ORDER BY references any columns not in the SELECT list + // If DISTINCT is used, we need to verify this is acceptable + // This is similar to how HAVING is handled + let select_exprs = if select.distinct.is_some() && !order_by_rex.is_empty() { + let mut missing_order_by_exprs = Vec::new(); + let mut missing_cols = HashSet::new(); + + for sort_expr in &order_by_rex { + let order_by_expr = &sort_expr.expr; + + // Extract columns referenced in the ORDER BY expression + let mut order_by_cols = HashSet::new(); + if expr_to_columns(order_by_expr, &mut order_by_cols).is_ok() { + for col in order_by_cols { + // Check if this column is in the projected schema + if !projected_plan.schema().has_column(&col) { + // This column is not in the current projection + // Check if we can resolve it from the base_plan schema + if base_plan.schema().has_column(&col) { + missing_cols.insert(col.clone()); + if !missing_order_by_exprs + .iter() + .any(|e: &Expr| e == order_by_expr) + { + missing_order_by_exprs.push(order_by_expr.clone()); + } + } + } + } + } + } + + // If there are missing columns and DISTINCT is used, perform the ambiguous distinct check + if !missing_order_by_exprs.is_empty() { + // Perform the ambiguous distinct check - if it fails, we should return the error + // immediately, not add the columns to the select list + Self::ambiguous_distinct_check( + &missing_order_by_exprs, + &missing_cols, + &select_exprs, + )?; + // If we get here, the check passed (expressions are aliases or already in select list) + // so we should NOT add them again + select_exprs.to_vec() + } else { + select_exprs.to_vec() + } + } else { + select_exprs.to_vec() + }; + // Optionally the HAVING expression. let having_expr_opt = select .having @@ -412,6 +463,56 @@ impl SqlToRel<'_, S> { Ok(plan) } + /// Check if ORDER BY expressions with DISTINCT are ambiguous + /// + /// This function verifies that ORDER BY expressions only reference + /// columns that are either: + /// 1. Already in the SELECT list, or + /// 2. Aliases for expressions in the SELECT list + /// + /// If neither condition is met, it returns an error since this would + /// make the DISTINCT operation ambiguous. + fn ambiguous_distinct_check( + missing_exprs: &[Expr], + missing_cols: &HashSet, + projection_exprs: &[Expr], + ) -> Result<()> { + if missing_exprs.is_empty() { + return Ok(()); + } + + // If the missing columns are all only aliases for things in + // the existing select list, it is ok + // + // This handles the special case for: + // SELECT col as ORDER BY + // + // As described in https://github.com/apache/datafusion/issues/5293 + let all_aliases = missing_exprs.iter().all(|e| { + projection_exprs.iter().any(|proj_expr| { + if let Expr::Alias(Alias { expr, .. }) = proj_expr { + e == expr.as_ref() + } else { + false + } + }) + }); + if all_aliases { + return Ok(()); + } + + let missing_col_names = missing_cols + .iter() + .map(|col| col.flat_name()) + .collect::>() + .join(", "); + + plan_err!( + "For SELECT DISTINCT, ORDER BY expressions {} must appear in select list", + missing_col_names + ) + } + /// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection pub(super) fn try_process_unnest( &self, From 618a2894c63ab449e8f58d1bbfcda8d7d1dc094a Mon Sep 17 00:00:00 2001 From: codedump Date: Sat, 24 Jan 2026 22:26:22 +0800 Subject: [PATCH 2/2] refactor: unify SQL planning for ORDER BY, HAVING, DISTINCT, etc --- datafusion/sql/src/expr/order_by.rs | 139 ++++++++++++++++- datafusion/sql/src/select.rs | 181 +++++++++------------- datafusion/sql/tests/cases/plan_to_sql.rs | 2 +- 3 files changed, 208 insertions(+), 114 deletions(-) diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs index faecfbcfecc05..85b5733e7358e 100644 --- a/datafusion/sql/src/expr/order_by.rs +++ b/datafusion/sql/src/expr/order_by.rs @@ -15,12 +15,18 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; + use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; use datafusion_common::{ - Column, DFSchema, Result, not_impl_err, plan_datafusion_err, plan_err, + Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_datafusion_err, plan_err, }; use datafusion_expr::expr::Sort; use datafusion_expr::{Expr, SortExpr}; +use indexmap::IndexSet; use sqlparser::ast::{ Expr as SQLExpr, OrderByExpr, OrderByOptions, Value, ValueWithSpan, }; @@ -117,4 +123,135 @@ impl SqlToRel<'_, S> { Ok(sort_expr_vec) } + + /// Add missing ORDER BY expressions to the SELECT list. + /// + /// This function handles the case where ORDER BY expressions reference columns + /// or expressions that are not present in the SELECT list. Instead of traversing + /// the plan tree to find projection nodes, it directly adds the missing + /// expressions to the SELECT list. + /// + /// # Behavior + /// + /// - For aggregate functions (e.g., `SUM(x)`) and window functions, the original + /// expression is added to the SELECT list, and the ORDER BY expression is + /// replaced with a column reference to that expression's output name. + /// + /// - For column references that don't exist in the current schema, the column + /// reference itself is added to the SELECT list. + /// + /// - If the query uses `SELECT DISTINCT` and there are missing ORDER BY + /// expressions, an error is returned, as this would make the DISTINCT + /// operation ambiguous. + /// + /// - Aliases defined in the SELECT list are recognized and used to replace + /// the corresponding expressions in ORDER BY with column references. + /// + /// - When `strict` is true (e.g., when GROUP BY is present), ORDER BY + /// expressions must already be in the SELECT list, be an alias, or be an + /// aggregate/window function. Missing expressions will cause an error instead + /// of being added to the SELECT list. This preserves the error message + /// "Column in ORDER BY must be in GROUP BY" for invalid queries. + /// + /// # Arguments + /// + /// * `select_exprs` - Mutable reference to the SELECT expressions list. Missing + /// expressions will be added to this list (unless strict is true). + /// * `schema` - The schema of the projected plan, used to check if column + /// references exist. + /// * `distinct` - Whether the query uses `SELECT DISTINCT`. If true, missing + /// ORDER BY expressions will cause an error. + /// * `strict` - Whether to strictly validate ORDER BY expressions. If true, + /// missing expressions will cause an error instead of being added. + /// * `order_by` - Mutable slice of ORDER BY expressions. The expressions will + /// be rewritten to use column references where appropriate. + /// + /// # Returns + /// + /// * `Ok(true)` - If expressions were added to the SELECT list. + /// * `Ok(false)` - If no expressions needed to be added. + /// * `Err(...)` - If there's an error (e.g., DISTINCT with missing ORDER BY + /// expressions). + /// + /// # Example + /// + /// ```text + /// Input: SELECT x FROM foo ORDER BY y + /// + /// Before: select_exprs = [x] + /// order_by = [Sort { expr: Column(y), ... }] + /// + /// After: select_exprs = [x, y] + /// order_by = [Sort { expr: Column(y), ... }] + /// returns Ok(true) + /// ``` + pub(crate) fn add_missing_order_by_exprs( + select_exprs: &mut Vec, + schema: &DFSchemaRef, + distinct: bool, + strict: bool, + order_by: &mut [Sort], + ) -> Result { + let mut missing_exprs: IndexSet = IndexSet::new(); + + let mut aliases = HashMap::new(); + for expr in select_exprs.iter() { + if let Expr::Alias(alias) = expr { + aliases.insert(alias.expr.clone(), alias.name.clone()); + } + } + + let mut rewrite = |expr: Expr| { + if select_exprs.contains(&expr) { + return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)); + } + if let Some(alias) = aliases.get(&expr) { + return Ok(Transformed::new( + Expr::Column(Column::new_unqualified(alias.clone())), + false, + TreeNodeRecursion::Jump, + )); + } + match expr { + Expr::AggregateFunction(_) | Expr::WindowFunction(_) => { + let replaced = Expr::Column(Column::new_unqualified( + expr.schema_name().to_string(), + )); + missing_exprs.insert(expr); + Ok(Transformed::new(replaced, true, TreeNodeRecursion::Jump)) + } + Expr::Column(ref c) => { + if strict { + // In strict mode (e.g., GROUP BY present), the column must exist in schema + // If it doesn't exist and isn't in select_exprs, we'll error later + // Don't add it to missing_exprs to preserve proper error message + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } else if !schema.has_column(c) { + missing_exprs.insert(Expr::Column(c.clone())); + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } else { + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } + } + _ => Ok(Transformed::no(expr)), + } + }; + for sort in order_by.iter_mut() { + let expr = std::mem::take(&mut sort.expr); + sort.expr = expr.transform_down(&mut rewrite).data()?; + } + if !missing_exprs.is_empty() { + if distinct { + plan_err!( + "For SELECT DISTINCT, ORDER BY expressions {} must appear in select list", + missing_exprs[0] + ) + } else { + select_exprs.extend(missing_exprs); + Ok(true) + } + } else { + Ok(false) + } + } } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 572f6fa60d2fe..aad9ce7acd19c 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -110,11 +110,11 @@ impl SqlToRel<'_, S> { )?; // Having and group by clause may reference aliases defined in select projection - let projected_plan = self.project(base_plan.clone(), select_exprs)?; - let select_exprs = projected_plan.expressions(); + let projected_plan = self.project(base_plan.clone(), select_exprs.clone())?; + let projected_plan_exprs = projected_plan.expressions(); let order_by = - to_order_by_exprs_with_select(query_order_by, Some(&select_exprs))?; + to_order_by_exprs_with_select(query_order_by, Some(&projected_plan_exprs))?; // Place the fields of the base plan at the front so that when there are references // with the same name, the fields of the base plan will be searched first. @@ -131,62 +131,54 @@ impl SqlToRel<'_, S> { true, Some(base_plan.schema().as_ref()), )?; - let order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?; - - // This alias map is resolved and looked up in both having exprs and group by exprs - let alias_map = extract_aliases(&select_exprs); - - // Check if ORDER BY references any columns not in the SELECT list - // If DISTINCT is used, we need to verify this is acceptable - // This is similar to how HAVING is handled - let select_exprs = if select.distinct.is_some() && !order_by_rex.is_empty() { - let mut missing_order_by_exprs = Vec::new(); - let mut missing_cols = HashSet::new(); - - for sort_expr in &order_by_rex { - let order_by_expr = &sort_expr.expr; - - // Extract columns referenced in the ORDER BY expression - let mut order_by_cols = HashSet::new(); - if expr_to_columns(order_by_expr, &mut order_by_cols).is_ok() { - for col in order_by_cols { - // Check if this column is in the projected schema - if !projected_plan.schema().has_column(&col) { - // This column is not in the current projection - // Check if we can resolve it from the base_plan schema - if base_plan.schema().has_column(&col) { - missing_cols.insert(col.clone()); - if !missing_order_by_exprs - .iter() - .any(|e: &Expr| e == order_by_expr) - { - missing_order_by_exprs.push(order_by_expr.clone()); - } - } - } - } - } - } + let mut order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?; + + // Check if there are any Wildcards in select_exprs + let has_wildcard = select_exprs.iter().any(|e| { + matches!( + e, + SelectExpr::Wildcard(_) | SelectExpr::QualifiedWildcard(_, _) + ) + }); - // If there are missing columns and DISTINCT is used, perform the ambiguous distinct check - if !missing_order_by_exprs.is_empty() { - // Perform the ambiguous distinct check - if it fails, we should return the error - // immediately, not add the columns to the select list - Self::ambiguous_distinct_check( - &missing_order_by_exprs, - &missing_cols, - &select_exprs, - )?; - // If we get here, the check passed (expressions are aliases or already in select list) - // so we should NOT add them again - select_exprs.to_vec() - } else { - select_exprs.to_vec() - } + // Convert SelectExpr to Expr for add_missing_order_by_exprs + // If there's a wildcard, we need to use the expanded expressions from the projected plan + // Otherwise, we filter out non-Expression items + let mut projected_select_exprs: Vec = if has_wildcard { + projected_plan.expressions() } else { - select_exprs.to_vec() + select_exprs + .iter() + .filter_map(|e| match e { + SelectExpr::Expression(expr) => Some(expr.clone()), + _ => None, + }) + .collect() }; + // Check if we need strict mode: GROUP BY present or aggregates in SELECT/ORDER BY + let has_group_by = matches!( + select.group_by, + GroupByExpr::Expressions(_, _) | GroupByExpr::All(_) + ); + let has_aggregates = { + let select_aggrs = find_aggregate_exprs(projected_select_exprs.iter()); + let order_by_aggrs = + find_aggregate_exprs(order_by_rex.iter().map(|s| &s.expr)); + !select_aggrs.is_empty() || !order_by_aggrs.is_empty() + }; + let strict = has_group_by || has_aggregates; + + let added = Self::add_missing_order_by_exprs( + &mut projected_select_exprs, + projected_plan.schema(), + matches!(select.distinct, Some(Distinct::Distinct)), + strict, + &mut order_by_rex, + )?; + + // This alias map is resolved and looked up in both having exprs and group by exprs + let alias_map = extract_aliases(&projected_select_exprs); // Optionally the HAVING expression. let having_expr_opt = select .having @@ -232,8 +224,10 @@ impl SqlToRel<'_, S> { } let group_by_expr = resolve_aliases_to_exprs(group_by_expr, &alias_map)?; - let group_by_expr = - resolve_positions_to_exprs(group_by_expr, &select_exprs)?; + let group_by_expr = resolve_positions_to_exprs( + group_by_expr, + &projected_select_exprs, + )?; let group_by_expr = normalize_col(group_by_expr, &projected_plan)?; self.validate_schema_satisfies_exprs( base_plan.schema(), @@ -245,7 +239,7 @@ impl SqlToRel<'_, S> { } else { // 'group by all' groups wrt. all select expressions except 'AggregateFunction's. // Filter and collect non-aggregate select expressions - select_exprs + projected_select_exprs .iter() .filter(|select_expr| match select_expr { Expr::AggregateFunction(_) => false, @@ -288,7 +282,7 @@ impl SqlToRel<'_, S> { // The outer expressions we will search through for aggregates. // First, find aggregates in SELECT, HAVING, and QUALIFY let select_having_qualify_aggrs = find_aggregate_exprs( - select_exprs + projected_select_exprs .iter() .chain(having_expr_opt.iter()) .chain(qualify_expr_opt.iter()), @@ -316,7 +310,7 @@ impl SqlToRel<'_, S> { } = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() { self.aggregate( &base_plan, - &select_exprs, + projected_select_exprs.as_slice(), having_expr_opt.as_ref(), qualify_expr_opt.as_ref(), &order_by_rex, @@ -332,7 +326,7 @@ impl SqlToRel<'_, S> { } None => AggregatePlanResult { plan: base_plan.clone(), - select_exprs: select_exprs.clone(), + select_exprs: projected_select_exprs.clone(), having_expr: having_expr_opt, qualify_expr: qualify_expr_opt, order_by_exprs: order_by_rex, @@ -434,7 +428,7 @@ impl SqlToRel<'_, S> { // Build the final plan LogicalPlanBuilder::from(base_plan) - .distinct_on(on_expr, select_exprs, None)? + .distinct_on(on_expr, projected_select_exprs.clone(), None)? .build() } }?; @@ -460,57 +454,20 @@ impl SqlToRel<'_, S> { }; let plan = self.order_by(plan, order_by_rex)?; - Ok(plan) - } - - /// Check if ORDER BY expressions with DISTINCT are ambiguous - /// - /// This function verifies that ORDER BY expressions only reference - /// columns that are either: - /// 1. Already in the SELECT list, or - /// 2. Aliases for expressions in the SELECT list - /// - /// If neither condition is met, it returns an error since this would - /// make the DISTINCT operation ambiguous. - fn ambiguous_distinct_check( - missing_exprs: &[Expr], - missing_cols: &HashSet, - projection_exprs: &[Expr], - ) -> Result<()> { - if missing_exprs.is_empty() { - return Ok(()); - } - - // If the missing columns are all only aliases for things in - // the existing select list, it is ok - // - // This handles the special case for: - // SELECT col as ORDER BY - // - // As described in https://github.com/apache/datafusion/issues/5293 - let all_aliases = missing_exprs.iter().all(|e| { - projection_exprs.iter().any(|proj_expr| { - if let Expr::Alias(Alias { expr, .. }) = proj_expr { - e == expr.as_ref() - } else { - false - } - }) - }); - if all_aliases { - return Ok(()); + // if add missing columns, we MUST remove unused columns in project + if added { + LogicalPlanBuilder::from(plan) + .project( + projected_plan + .schema() + .columns() + .into_iter() + .map(Expr::Column), + )? + .build() + } else { + Ok(plan) } - - let missing_col_names = missing_cols - .iter() - .map(|col| col.flat_name()) - .collect::>() - .join(", "); - - plan_err!( - "For SELECT DISTINCT, ORDER BY expressions {} must appear in select list", - missing_col_names - ) } /// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 4717b843abb53..042a3dd51ae00 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -1984,7 +1984,7 @@ fn test_complex_order_by_with_grouping() -> Result<()> { }, { assert_snapshot!( sql, - @r#"SELECT j1.j1_id, j1.j1_string, lochierarchy FROM (SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS lochierarchy, grouping(j1.j1_string), grouping(j1.j1_id) FROM j1 GROUP BY ROLLUP (j1.j1_id, j1.j1_string)) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN (("grouping(j1.j1_id)" + "grouping(j1.j1_string)") = 0) THEN j1.j1_id END ASC NULLS LAST LIMIT 100"# + @r#"SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS lochierarchy FROM j1 GROUP BY ROLLUP (j1.j1_id, j1.j1_string) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN (lochierarchy = 0) THEN j1.j1_id END ASC NULLS LAST LIMIT 100"# ); });