diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2e23fef1da768..a1dffc2563e1a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -800,6 +800,27 @@ impl LogicalPlanBuilder { self, sorts: impl IntoIterator> + Clone, fetch: Option, + ) -> Result { + self.sort_with_limit_inner(sorts, fetch, false) + } + + /// Apply a sort without adding missing sort columns to the input plan + /// + /// This is used by SELECT statements where missing ORDER BY columns are + /// already added by `add_missing_order_by_exprs`. + pub fn sort_with_limit_skip_missing( + self, + sorts: impl IntoIterator> + Clone, + fetch: Option, + ) -> Result { + self.sort_with_limit_inner(sorts, fetch, true) + } + + fn sort_with_limit_inner( + self, + sorts: impl IntoIterator> + Clone, + fetch: Option, + skip_add_missing_columns: bool, ) -> Result { let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?; @@ -820,7 +841,7 @@ impl LogicalPlanBuilder { Ok(()) })?; - if missing_cols.is_empty() { + if missing_cols.is_empty() || skip_add_missing_columns { return Ok(Self::new(LogicalPlan::Sort(Sort { expr: normalize_sorts(sorts, &self.plan)?, input: self.plan, diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs index faecfbcfecc05..a1d2caced5a54 100644 --- a/datafusion/sql/src/expr/order_by.rs +++ b/datafusion/sql/src/expr/order_by.rs @@ -15,12 +15,18 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::HashMap; + use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; use datafusion_common::{ - Column, DFSchema, Result, not_impl_err, plan_datafusion_err, plan_err, + Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_datafusion_err, plan_err, }; use datafusion_expr::expr::Sort; use datafusion_expr::{Expr, SortExpr}; +use indexmap::IndexSet; use sqlparser::ast::{ Expr as SQLExpr, OrderByExpr, OrderByOptions, Value, ValueWithSpan, }; @@ -117,4 +123,294 @@ impl SqlToRel<'_, S> { Ok(sort_expr_vec) } + + /// Add missing ORDER BY expressions to the SELECT list. + /// + /// This function handles the case where ORDER BY expressions reference columns + /// or expressions that are not present in the SELECT list. Instead of traversing + /// the plan tree to find projection nodes, it directly adds the missing + /// expressions to the SELECT list. + /// + /// # Behavior + /// + /// - For aggregate functions (e.g., `SUM(x)`) and window functions, the original + /// expression is added to the SELECT list, and the ORDER BY expression is + /// replaced with a column reference to that expression's output name. + /// + /// - For column references that don't exist in the current schema, the column + /// reference itself is added to the SELECT list. + /// + /// - If the query uses `SELECT DISTINCT` and there are missing ORDER BY + /// expressions, an error is returned, as this would make the DISTINCT + /// operation ambiguous. + /// + /// - Aliases defined in the SELECT list are recognized and used to replace + /// the corresponding expressions in ORDER BY with column references. + /// + /// - When `strict` is true (e.g., when GROUP BY is present), ORDER BY + /// expressions must already be in the SELECT list, be an alias, or be an + /// aggregate/window function. 
Missing column references are left as-is instead + /// of being added to the SELECT list, so that later planning can still report + /// "Column in ORDER BY must be in GROUP BY" for invalid queries. + /// + /// # Arguments + /// + /// * `select_exprs` - Mutable reference to the SELECT expressions list. Missing + /// expressions will be added to this list (in strict mode, only aggregate/window functions). + /// * `schema` - The schema of the projected plan, used to check if column + /// references exist. + /// * `distinct` - Whether the query uses `SELECT DISTINCT`. If true, missing + /// ORDER BY expressions will cause an error. + /// * `strict` - Whether to strictly validate ORDER BY expressions. If true, + /// missing column references are not added; this function itself does not error on them. + /// * `order_by` - Mutable slice of ORDER BY expressions. The expressions will + /// be rewritten to use column references where appropriate. + /// + /// # Returns + /// + /// * `Ok(true)` - If expressions were added to the SELECT list. + /// * `Ok(false)` - If no expressions needed to be added. + /// * `Err(...)` - If there's an error (e.g., DISTINCT with missing ORDER BY + /// expressions). + /// + /// # Example + /// + /// ```text + /// Input: SELECT x FROM foo ORDER BY y + /// + /// Before: select_exprs = [x] + /// order_by = [Sort { expr: Column(y), ... }] + /// + /// After: select_exprs = [x, y] + /// order_by = [Sort { expr: Column(y), ... }] + /// returns Ok(true) + /// ``` + pub(crate) fn add_missing_order_by_exprs( + select_exprs: &mut Vec, + schema: &DFSchemaRef, + distinct: bool, + strict: bool, + order_by: &mut [Sort], + ) -> Result { + add_missing_order_by_exprs_impl(select_exprs, schema, distinct, strict, order_by) + } +} + +/// Internal implementation of add_missing_order_by_exprs for testability. 
+fn add_missing_order_by_exprs_impl( + select_exprs: &mut Vec, + schema: &DFSchemaRef, + distinct: bool, + strict: bool, + order_by: &mut [Sort], +) -> Result { + let mut missing_exprs: IndexSet = IndexSet::new(); + + let mut aliases = HashMap::new(); + for expr in select_exprs.iter() { + if let Expr::Alias(alias) = expr { + aliases.insert(alias.expr.clone(), alias.name.clone()); + } + } + + let mut rewrite = |expr: Expr| { + if select_exprs.contains(&expr) { + return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)); + } + if let Some(alias) = aliases.get(&expr) { + return Ok(Transformed::new( + Expr::Column(Column::new_unqualified(alias.clone())), + false, + TreeNodeRecursion::Jump, + )); + } + match expr { + Expr::AggregateFunction(_) | Expr::WindowFunction(_) => { + let replaced = Expr::Column(Column::new_unqualified( + expr.schema_name().to_string(), + )); + missing_exprs.insert(expr); + Ok(Transformed::new(replaced, true, TreeNodeRecursion::Jump)) + } + Expr::Column(ref c) => { + if strict { + // In strict mode (e.g., GROUP BY present), the column must exist in schema + // If it doesn't exist and isn't in select_exprs, we'll error later + // Don't add it to missing_exprs to preserve proper error message + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } else if !schema.has_column(c) { + missing_exprs.insert(Expr::Column(c.clone())); + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } else { + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } + } + _ => Ok(Transformed::no(expr)), + } + }; + for sort in order_by.iter_mut() { + let expr = std::mem::take(&mut sort.expr); + sort.expr = expr.transform_down(&mut rewrite).data()?; + } + if !missing_exprs.is_empty() { + if distinct { + plan_err!( + "For SELECT DISTINCT, ORDER BY expressions {} must appear in select list", + missing_exprs[0] + ) + } else { + select_exprs.extend(missing_exprs); + Ok(true) + } + } else { + Ok(false) + } +} + +#[cfg(test)] +mod tests { + 
use super::*; + use arrow::datatypes::{DataType, Field}; + use datafusion_expr::expr::Alias; + + fn create_test_schema() -> DFSchemaRef { + let fields = vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + ]; + DFSchemaRef::new( + DFSchema::from_unqualified_fields(fields.into(), HashMap::new()).unwrap(), + ) + } + + #[test] + fn test_add_missing_column_not_in_select() { + let schema = create_test_schema(); + let mut select_exprs = vec![col("a")]; + let mut order_by = vec![col("d").sort(true, false)]; // d is not in schema + + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + false, + false, + &mut order_by, + ); + + // d is not in schema, so it should be added + assert!(result.unwrap()); + assert_eq!(select_exprs.len(), 2); + assert!(select_exprs.contains(&col("a"))); + assert!(select_exprs.contains(&col("d"))); + } + + #[test] + fn test_no_missing_column_when_already_in_select() { + let schema = create_test_schema(); + let mut select_exprs = vec![col("a"), col("b")]; + let mut order_by = vec![col("b").sort(true, false)]; + + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + false, + false, + &mut order_by, + ); + + assert!(!result.unwrap()); + assert_eq!(select_exprs.len(), 2); + } + + #[test] + fn test_alias_resolution() { + let schema = create_test_schema(); + // SELECT a AS x, b + let mut select_exprs = vec![ + Expr::Alias(Alias::new(col("a"), None::<&str>, "x")), + col("b"), + ]; + // ORDER BY a (should be resolved to alias x) + let mut order_by = vec![col("a").sort(true, false)]; + + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + false, + false, + &mut order_by, + ); + + // No new expressions should be added (a is resolved to alias x) + assert!(!result.unwrap()); + // ORDER BY a should be replaced with Column(x) reference + assert_eq!(order_by[0].expr, col("x")); + } + + #[test] + fn 
test_distinct_with_missing_column_error() { + let schema = create_test_schema(); + // SELECT DISTINCT a + // ORDER BY d (d is not in select, not in schema) + let mut select_exprs = vec![col("a")]; + let mut order_by = vec![col("d").sort(true, false)]; + + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + true, // distinct = true + false, + &mut order_by, + ); + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("SELECT DISTINCT")); + assert!(err_msg.contains("must appear in select list")); + } + + #[test] + fn test_strict_mode_no_add() { + let schema = create_test_schema(); + let mut select_exprs = vec![col("a")]; + let mut order_by = vec![col("b").sort(true, false)]; + + // strict = true should NOT add missing columns + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + false, + true, // strict = true + &mut order_by, + ); + + assert!(!result.unwrap()); + assert_eq!(select_exprs.len(), 1); // b was not added + } + + #[test] + fn test_column_in_order_by_not_in_select_or_schema() { + let schema = create_test_schema(); + // SELECT a, b + // ORDER BY d - d is not in schema (would come from FROM clause in real scenario) + let mut select_exprs = vec![col("a"), col("b")]; + let mut order_by = vec![col("d").sort(true, false)]; + + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + false, + false, + &mut order_by, + ); + + // d should be added to select_exprs + assert!(result.unwrap()); + assert!(select_exprs.contains(&col("d"))); + } + + fn col(name: &str) -> Expr { + Expr::Column(Column::new_unqualified(name)) + } } diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 1b7bb856a592b..aca6238943b4c 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -94,7 +94,9 @@ impl SqlToRel<'_, S> { true, None, )?; - let plan = self.order_by(plan, order_by_rex)?; + // Pass false to 
skip_add_missing_columns because for non-SELECT set expressions + // (like UNION), we still need to use add_missing_columns in sort_with_limit + let plan = self.order_by(plan, order_by_rex, false)?; self.limit(plan, limit_clause, planner_context) } }?; @@ -134,7 +136,8 @@ impl SqlToRel<'_, S> { true, None, )?; - self.order_by(plan, sort_exprs) + // For pipe operator ORDER BY, use add_missing_columns behavior + self.order_by(plan, sort_exprs, false) } PipeOperator::Limit { expr, offset } => self.limit( plan, @@ -299,10 +302,15 @@ impl SqlToRel<'_, S> { } /// Wrap the logical in a sort + /// + /// If `skip_add_missing_columns` is true, the method will not try to add + /// missing columns to the input plan. This is used by SELECT statements + /// where missing ORDER BY columns are already added by `add_missing_order_by_exprs`. pub(super) fn order_by( &self, plan: LogicalPlan, order_by: Vec, + skip_add_missing_columns: bool, ) -> Result { if order_by.is_empty() { return Ok(plan); @@ -313,6 +321,10 @@ impl SqlToRel<'_, S> { // optimization we're effectively doing a `first_value` aggregation according to them. let distinct_on = distinct_on.clone().with_sort_expr(order_by)?; Ok(LogicalPlan::Distinct(Distinct::On(distinct_on))) + } else if skip_add_missing_columns { + LogicalPlanBuilder::from(plan) + .sort_with_limit_skip_missing(order_by, None)? 
+ .build() } else { LogicalPlanBuilder::from(plan).sort(order_by)?.build() } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 1d6ccde6be13a..6bb77017768c9 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -110,11 +110,11 @@ impl SqlToRel<'_, S> { )?; // Having and group by clause may reference aliases defined in select projection - let projected_plan = self.project(base_plan.clone(), select_exprs)?; - let select_exprs = projected_plan.expressions(); + let projected_plan = self.project(base_plan.clone(), select_exprs.clone())?; + let projected_plan_exprs = projected_plan.expressions(); let order_by = - to_order_by_exprs_with_select(query_order_by, Some(&select_exprs))?; + to_order_by_exprs_with_select(query_order_by, Some(&projected_plan_exprs))?; // Place the fields of the base plan at the front so that when there are references // with the same name, the fields of the base plan will be searched first. @@ -131,11 +131,56 @@ impl SqlToRel<'_, S> { true, Some(base_plan.schema().as_ref()), )?; - let order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?; + let mut order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?; + + // Check if there are any Wildcards in select_exprs + let has_wildcard = select_exprs.iter().any(|e| { + matches!( + e, + SelectExpr::Wildcard(_) | SelectExpr::QualifiedWildcard(_, _) + ) + }); + + // Convert SelectExpr to Expr for add_missing_order_by_exprs + // If there's a wildcard, we need to use the expanded expressions from the projected plan + // Otherwise, we filter out non-Expression items + let mut projected_select_exprs: Vec = if has_wildcard { + projected_plan.expressions() + } else { + select_exprs + .iter() + .filter_map(|e| match e { + SelectExpr::Expression(expr) => Some(expr.clone()), + _ => None, + }) + .collect() + }; - // This alias map is resolved and looked up in both having exprs and group by exprs - let alias_map = extract_aliases(&select_exprs); + // 
Check if we need strict mode: GROUP BY present or aggregates in SELECT/ORDER BY + // Note: GroupByExpr::Expressions(vec![], _) means no GROUP BY, so we need to check + // if the expressions list is non-empty + let has_group_by = match &select.group_by { + GroupByExpr::Expressions(exprs, _) => !exprs.is_empty(), + GroupByExpr::All(_) => true, + }; + let has_aggregates = { + let select_aggrs = find_aggregate_exprs(projected_select_exprs.iter()); + let order_by_aggrs = + find_aggregate_exprs(order_by_rex.iter().map(|s| &s.expr)); + !select_aggrs.is_empty() || !order_by_aggrs.is_empty() + }; + let strict = has_group_by || has_aggregates; + + let added = Self::add_missing_order_by_exprs( + &mut projected_select_exprs, + projected_plan.schema(), + matches!(select.distinct, Some(Distinct::Distinct)), + strict, + &mut order_by_rex, + )?; + // This alias map is resolved and looked up in both having exprs and group by exprs + let alias_map = extract_aliases(&projected_select_exprs); // Optionally the HAVING expression. let having_expr_opt = select .having @@ -181,8 +226,10 @@ impl SqlToRel<'_, S> { } let group_by_expr = resolve_aliases_to_exprs(group_by_expr, &alias_map)?; - let group_by_expr = - resolve_positions_to_exprs(group_by_expr, &select_exprs)?; + let group_by_expr = resolve_positions_to_exprs( + group_by_expr, + &projected_select_exprs, + )?; let group_by_expr = normalize_col(group_by_expr, &projected_plan)?; self.validate_schema_satisfies_exprs( base_plan.schema(), @@ -194,7 +241,7 @@ impl SqlToRel<'_, S> { } else { // 'group by all' groups wrt. all select expressions except 'AggregateFunction's. // Filter and collect non-aggregate select expressions - select_exprs + projected_select_exprs .iter() .filter(|select_expr| match select_expr { Expr::AggregateFunction(_) => false, @@ -237,7 +284,7 @@ impl SqlToRel<'_, S> { // The outer expressions we will search through for aggregates. 
// First, find aggregates in SELECT, HAVING, and QUALIFY let select_having_qualify_aggrs = find_aggregate_exprs( - select_exprs + projected_select_exprs .iter() .chain(having_expr_opt.iter()) .chain(qualify_expr_opt.iter()), @@ -265,7 +312,7 @@ impl SqlToRel<'_, S> { } = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() { self.aggregate( &base_plan, - &select_exprs, + projected_select_exprs.as_slice(), having_expr_opt.as_ref(), qualify_expr_opt.as_ref(), &order_by_rex, @@ -281,7 +328,7 @@ impl SqlToRel<'_, S> { } None => AggregatePlanResult { plan: base_plan.clone(), - select_exprs: select_exprs.clone(), + select_exprs: projected_select_exprs.clone(), having_expr: having_expr_opt, qualify_expr: qualify_expr_opt, order_by_exprs: order_by_rex, @@ -383,7 +430,7 @@ impl SqlToRel<'_, S> { // Build the final plan LogicalPlanBuilder::from(base_plan) - .distinct_on(on_expr, select_exprs, None)? + .distinct_on(on_expr, projected_select_exprs.clone(), None)? .build() } }?; @@ -408,8 +455,26 @@ impl SqlToRel<'_, S> { plan }; - let plan = self.order_by(plan, order_by_rex)?; - Ok(plan) + // For non-aggregate queries (no GROUP BY and no aggregates), we can skip + // add_missing_columns because add_missing_order_by_exprs has already added + // the missing columns. For aggregate queries, we still need add_missing_columns + // to handle complex cases like ORDER BY count(*). + let skip_add_missing = !strict; + let plan = self.order_by(plan, order_by_rex, skip_add_missing)?; + // If ORDER BY-only expressions were added to the SELECT list, we MUST project them away again after sorting + if added { + LogicalPlanBuilder::from(plan) + .project( + projected_plan + .schema() + .columns() + .into_iter() + .map(Expr::Column), + )? 
+ .build() + } else { + Ok(plan) + } } /// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection @@ -428,8 +493,13 @@ impl SqlToRel<'_, S> { .iter() .any(has_unnest_expr_recursively) { + // Explicitly convert Expr to SelectExpr to ensure all expressions are properly included + let select_exprs: Vec = intermediate_select_exprs + .into_iter() + .map(SelectExpr::Expression) + .collect(); return LogicalPlanBuilder::from(intermediate_plan) - .project(intermediate_select_exprs)? + .project(select_exprs)? .build(); } @@ -459,7 +529,12 @@ impl SqlToRel<'_, S> { // The original expr does not contain any unnest if i == 0 { return LogicalPlanBuilder::from(intermediate_plan) - .project(intermediate_select_exprs)? + .project( + intermediate_select_exprs + .iter() + .cloned() + .map(SelectExpr::Expression), + )? .build(); } break; diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 4717b843abb53..042a3dd51ae00 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -1984,7 +1984,7 @@ fn test_complex_order_by_with_grouping() -> Result<()> { }, { assert_snapshot!( sql, - @r#"SELECT j1.j1_id, j1.j1_string, lochierarchy FROM (SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS lochierarchy, grouping(j1.j1_string), grouping(j1.j1_id) FROM j1 GROUP BY ROLLUP (j1.j1_id, j1.j1_string)) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN (("grouping(j1.j1_id)" + "grouping(j1.j1_string)") = 0) THEN j1.j1_id END ASC NULLS LAST LIMIT 100"# + @r#"SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS lochierarchy FROM j1 GROUP BY ROLLUP (j1.j1_id, j1.j1_string) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN (lochierarchy = 0) THEN j1.j1_id END ASC NULLS LAST LIMIT 100"# ); });