diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b4fb44f670e8d..1b51c78adcf45 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -785,7 +785,7 @@ impl DefaultPhysicalPlanner { // We pass the filters and let the provider handle the projection let filters = extract_dml_filters(input, table_name)?; // Extract assignments from the projection in input plan - let assignments = extract_update_assignments(input)?; + let assignments = extract_update_assignments(input, table_name)?; provider .table_provider .update(session_state, assignments, filters) @@ -2235,7 +2235,10 @@ fn strip_column_qualifiers(expr: Expr) -> Result<Expr> { /// over the source table. This function extracts column name and expression pairs /// from the projection. Column qualifiers are stripped from the expressions. /// -fn extract_update_assignments(input: &Arc<LogicalPlan>) -> Result<Vec<(String, Expr)>> { +fn extract_update_assignments( + input: &Arc<LogicalPlan>, + _target_table: &TableReference, +) -> Result<Vec<(String, Expr)>> { // The UPDATE input plan structure is: // Projection(updated columns as expressions with aliases) // Filter(optional WHERE clause) @@ -3115,7 +3118,8 @@ mod tests { use crate::execution::session_state::SessionStateBuilder; use arrow::array::{ArrayRef, DictionaryArray, Int32Array}; - use arrow::datatypes::{DataType, Field, Int32Type}; + use arrow::datatypes::{DataType, Field, Int32Type, Schema}; + use arrow::record_batch::RecordBatch; use arrow_schema::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{ @@ -3125,13 +3129,79 @@ mod tests { use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::builder::subquery_alias; use datafusion_expr::{ - LogicalPlanBuilder, TableSource, UserDefinedLogicalNodeCore, col, lit, + DmlStatement, LogicalPlanBuilder, TableSource, UserDefinedLogicalNodeCore, + WriteOp, col, lit, }; use datafusion_functions_aggregate::count::count_all; use 
datafusion_functions_aggregate::expr_fn::sum; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; + async fn make_update_from_plan( + sql: &str, + ) -> Result<(Arc<LogicalPlan>, TableReference)> { + let ctx = SessionContext::new(); + let t1_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), + ])); + let t2_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), + ])); + let t1 = MemTable::try_new( + Arc::clone(&t1_schema), + vec![vec![RecordBatch::new_empty(Arc::clone(&t1_schema))]], + )?; + let t2 = MemTable::try_new( + Arc::clone(&t2_schema), + vec![vec![RecordBatch::new_empty(Arc::clone(&t2_schema))]], + )?; + ctx.register_table("t1", Arc::new(t1))?; + ctx.register_table("t2", Arc::new(t2))?; + + let plan = ctx.sql(sql).await?.into_unoptimized_plan(); + match plan { + LogicalPlan::Dml(DmlStatement { + table_name, + op: WriteOp::Update, + input, + .. 
+ }) => Ok((input, table_name)), + other => internal_err!("Expected UPDATE DML plan, got: {other}"), + } + } + + async fn make_delete_plan(sql: &str) -> Result<(Arc<LogicalPlan>, TableReference)> { + let ctx = SessionContext::new(); + let t1_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), + ])); + let t1 = MemTable::try_new( + Arc::clone(&t1_schema), + vec![vec![RecordBatch::new_empty(Arc::clone(&t1_schema))]], + )?; + ctx.register_table("t1", Arc::new(t1))?; + + let plan = ctx.sql(sql).await?.into_unoptimized_plan(); + match plan { + LogicalPlan::Dml(DmlStatement { + table_name, + op: WriteOp::Delete, + input, + .. + }) => Ok((input, table_name)), + other => internal_err!("Expected DELETE DML plan, got: {other}"), + } + } + fn make_session_state() -> SessionState { let runtime = Arc::new(RuntimeEnv::default()); let config = SessionConfig::new().with_target_partitions(4); @@ -3418,6 +3488,159 @@ mod tests { Ok(()) } + #[tokio::test] + #[ignore = "TODO(19950): enable once the implementation PR lands"] + async fn test_extract_update_assignments_preserves_source_qualifiers_for_update_from() + -> Result<()> { + // TODO(19950): enable once the implementation PR lands. 
+ let (input, table_name) = make_update_from_plan( + "UPDATE t1 AS dst \ + SET b = src.b, d = src.d \ + FROM t2 AS src \ + WHERE dst.a = src.a", + ) + .await?; + + let assignments = extract_update_assignments(&input, &table_name)?; + let b_expr = assignments + .iter() + .find(|(name, _)| name == "b") + .map(|(_, expr)| expr.to_string()) + .ok_or_else(|| { + internal_datafusion_err!("Expected assignment for target column b") + })?; + let d_expr = assignments + .iter() + .find(|(name, _)| name == "d") + .map(|(_, expr)| expr.to_string()) + .ok_or_else(|| { + internal_datafusion_err!("Expected assignment for target column d") + })?; + + assert!( + b_expr.contains("src.b"), + "Unexpected b assignment: {b_expr}" + ); + assert!( + d_expr.contains("src.d"), + "Unexpected d assignment: {d_expr}" + ); + assert!( + assignments.iter().all(|(name, _)| name != "a"), + "Identity target columns should not be extracted as assignments" + ); + + Ok(()) + } + + #[tokio::test] + #[ignore = "TODO(19950): enable once the implementation PR lands"] + async fn test_extract_update_assignments_strips_target_qualifiers_single_table() + -> Result<()> { + // TODO(19950): enable once the implementation PR lands. 
+ let (input, table_name) = + make_update_from_plan("UPDATE t1 AS dst SET d = dst.d + 1 WHERE dst.a > 0") + .await?; + + let assignments = extract_update_assignments(&input, &table_name)?; + let d_expr = assignments + .iter() + .find(|(name, _)| name == "d") + .map(|(_, expr)| expr.to_string()) + .ok_or_else(|| { + internal_datafusion_err!("Expected assignment for target column d") + })?; + + assert!( + !d_expr.contains("dst."), + "Single-table assignment should not keep target qualifiers: {d_expr}" + ); + assert!( + d_expr.contains("d"), + "Unexpected assignment expression: {d_expr}" + ); + + Ok(()) + } + + #[tokio::test] + #[ignore = "TODO(19950): enable once the implementation PR lands"] + async fn test_extract_update_assignments_preserves_self_join_source_aliases() + -> Result<()> { + // TODO(19950): enable once the implementation PR lands. + let (input, table_name) = make_update_from_plan( + "UPDATE t1 AS dst \ + SET b = src.b, d = src.d \ + FROM t1 AS src \ + WHERE dst.a = src.a + 1", + ) + .await?; + + let assignments = extract_update_assignments(&input, &table_name)?; + let b_expr = assignments + .iter() + .find(|(name, _)| name == "b") + .map(|(_, expr)| expr.to_string()) + .ok_or_else(|| { + internal_datafusion_err!("Expected assignment for target column b") + })?; + let d_expr = assignments + .iter() + .find(|(name, _)| name == "d") + .map(|(_, expr)| expr.to_string()) + .ok_or_else(|| { + internal_datafusion_err!("Expected assignment for target column d") + })?; + + assert!( + b_expr.contains("src.b"), + "Self-join source alias should be preserved: {b_expr}" + ); + assert!( + d_expr.contains("src.d"), + "Self-join source alias should be preserved: {d_expr}" + ); + + Ok(()) + } + + #[tokio::test] + #[ignore = "TODO(19950): enable once the implementation PR lands"] + async fn test_extract_dml_filters_delete_limit_without_where() -> Result<()> { + // TODO(19950): enable once the implementation PR lands. 
+ let (input, table_name) = make_delete_plan("DELETE FROM t1 LIMIT 10").await?; + + let filters = extract_dml_filters(&input, &table_name)?; + assert!( + filters.is_empty(), + "DELETE ... LIMIT without WHERE should not synthesize filters: {filters:?}" + ); + + Ok(()) + } + + #[tokio::test] + #[ignore = "TODO(19950): enable once the implementation PR lands"] + async fn test_extract_dml_filters_delete_where_limit() -> Result<()> { + // TODO(19950): enable once the implementation PR lands. + let (input, table_name) = + make_delete_plan("DELETE FROM t1 WHERE a > 1 LIMIT 10").await?; + + let filters = extract_dml_filters(&input, &table_name)?; + assert_eq!( + filters.len(), + 1, + "Expected one target predicate from DELETE WHERE ... LIMIT" + ); + let rendered = filters[0].to_string(); + assert!( + rendered.starts_with("a > Int") && rendered.ends_with("(1)"), + "Unexpected rendered filter for DELETE WHERE ... LIMIT: {rendered}" + ); + + Ok(()) + } + #[tokio::test] async fn test_create_not() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]); diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 8c4bae5e98b36..76bbaa6b32e3f 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -20,8 +20,12 @@ use std::any::Any; use std::sync::{Arc, Mutex}; +use arrow::array::{Float64Array, Int32Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::assert_batches_eq; +use datafusion::datasource::MemTable; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::{SessionConfig, SessionContext}; @@ -308,6 +312,21 @@ fn test_schema() -> SchemaRef { ])) } +fn abcd_schema(extra: &[Field]) -> SchemaRef { + let mut 
fields = vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), + ]; + fields.extend_from_slice(extra); + Arc::new(Schema::new(fields)) +} + +fn abcd_schema_no_extra() -> SchemaRef { + abcd_schema(&[]) +} + #[tokio::test] async fn test_delete_single_filter() -> Result<()> { let provider = Arc::new(CaptureDeleteProvider::new(test_schema())); @@ -761,6 +780,280 @@ async fn test_update_from_drops_non_target_predicates() -> Result<()> { Ok(()) } +#[tokio::test] +#[ignore = "TODO(19950): enable once the implementation PR lands"] +async fn test_update_from_drops_non_target_predicates_issue_19950() -> Result<()> { + // TODO(19950): enable once the implementation PR lands. + let target_schema = abcd_schema_no_extra(); + let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( + Arc::clone(&target_schema), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t1", Arc::clone(&target_provider) as Arc<dyn TableProvider>)?; + + let source_schema = abcd_schema(&[Field::new("src_only", DataType::Utf8, true)]); + let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); + ctx.register_table("t2", Arc::new(source_table))?; + + let df = ctx + .sql( + "UPDATE t1 SET d = 1 FROM t2 \ + WHERE t1.a = t2.a AND t2.src_only = 'active' AND t1.d > 10", + ) + .await?; + + df.collect().await?; + + let filters = target_provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!( + filters.len(), + 1, + "only target-table predicates should be retained for provider update" + ); + assert!( + filters[0].to_string().contains("d"), + "Expected target predicate on d, got: {}", + filters[0] + ); + Ok(()) +} + +#[tokio::test] +#[ignore = "TODO(19950): enable once the implementation PR lands"] +async fn test_update_from_alias_variants_are_accepted_issue_19950() -> Result<()> { + // 
TODO(19950): enable once the implementation PR lands. + let target_schema = abcd_schema_no_extra(); + let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( + Arc::clone(&target_schema), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t1", Arc::clone(&target_provider) as Arc<dyn TableProvider>)?; + + let source_schema = abcd_schema_no_extra(); + let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); + ctx.register_table("t2", Arc::new(source_table))?; + + let alias_queries = [ + "UPDATE t1 AS dst \ + SET b = src.b, d = src.d \ + FROM t2 AS src \ + WHERE dst.a = src.a AND src.b = 'active'", + "UPDATE t1 \ + FROM t2 AS src \ + SET b = src.b, d = src.d \ + WHERE t1.a = src.a", + ]; + + for sql in alias_queries { + let _ = ctx.sql(sql).await?; + } + + Ok(()) +} + +#[tokio::test] +#[ignore = "TODO(19950): enable once the implementation PR lands"] +async fn test_update_from_joined_assignments_plan_success_issue_19950() -> Result<()> { + // TODO(19950): enable once the implementation PR lands. + let target_schema = abcd_schema_no_extra(); + let target_provider = + Arc::new(CaptureUpdateProvider::new(Arc::clone(&target_schema))); + let ctx = SessionContext::new(); + ctx.register_table("t1", Arc::clone(&target_provider) as Arc<dyn TableProvider>)?; + + let source_schema = abcd_schema_no_extra(); + let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); + ctx.register_table("t2", Arc::new(source_table))?; + + let _ = ctx + .sql( + "UPDATE t1 AS dst \ + SET b = src.b, d = src.d \ + FROM t2 AS src \ + WHERE dst.a = src.a AND src.b = 'active'", + ) + .await?; + + Ok(()) +} + +#[tokio::test] +#[ignore = "TODO(19950): enable once the implementation PR lands"] +async fn test_update_from_with_join_only_predicates_executes_issue_19950() -> Result<()> { + // TODO(19950): enable once the implementation PR lands. 
+ let ctx = + SessionContext::new_with_config(SessionConfig::new().with_target_partitions(4)); + + let t1_schema = abcd_schema_no_extra(); + let t1_batch = RecordBatch::try_new( + Arc::clone(&t1_schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["zoo", "qux", "bar"])), + Arc::new(Float64Array::from(vec![2.0, 3.0, 4.0])), + Arc::new(Int32Array::from(vec![10, 20, 30])), + ], + )?; + let t1 = MemTable::try_new(Arc::clone(&t1_schema), vec![vec![t1_batch]])?; + ctx.register_table("t1", Arc::new(t1))?; + + let t2_schema = abcd_schema_no_extra(); + let t2_batch = RecordBatch::try_new( + Arc::clone(&t2_schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 4])), + Arc::new(StringArray::from(vec![ + "updated_b", + "updated_b2", + "updated_b3", + ])), + Arc::new(Float64Array::from(vec![5.0, 2.5, 1.5])), + Arc::new(Int32Array::from(vec![40, 50, 60])), + ], + )?; + let t2 = MemTable::try_new(Arc::clone(&t2_schema), vec![vec![t2_batch]])?; + ctx.register_table("t2", Arc::new(t2))?; + + ctx.sql( + "UPDATE t1 AS dst \ + SET b = src.b, c = src.a, d = 1 \ + FROM t2 AS src \ + WHERE dst.a = src.a AND src.c > 1.0", + ) + .await? + .collect() + .await?; + + let actual = ctx + .sql("SELECT * FROM t1 ORDER BY a") + .await? + .collect() + .await?; + assert_batches_eq!( + [ + "+---+------------+-----+----+", + "| a | b | c | d |", + "+---+------------+-----+----+", + "| 1 | updated_b | 1.0 | 1 |", + "| 2 | updated_b2 | 2.0 | 1 |", + "| 3 | bar | 4.0 | 30 |", + "+---+------------+-----+----+", + ], + &actual + ); + + Ok(()) +} + +#[tokio::test] +#[ignore = "TODO(19950): enable once the implementation PR lands"] +async fn test_update_from_with_only_join_predicate_executes_issue_19950() -> Result<()> { + // TODO(19950): enable once the implementation PR lands. 
+ let ctx = + SessionContext::new_with_config(SessionConfig::new().with_target_partitions(4)); + + let t1_schema = abcd_schema_no_extra(); + let t1_batch = RecordBatch::try_new( + Arc::clone(&t1_schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["zoo", "qux", "bar"])), + Arc::new(Float64Array::from(vec![2.0, 3.0, 4.0])), + Arc::new(Int32Array::from(vec![10, 20, 30])), + ], + )?; + let t1 = MemTable::try_new(Arc::clone(&t1_schema), vec![vec![t1_batch]])?; + ctx.register_table("t1", Arc::new(t1))?; + + let t2_schema = abcd_schema_no_extra(); + let t2_batch = RecordBatch::try_new( + Arc::clone(&t2_schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 4])), + Arc::new(StringArray::from(vec![ + "updated_b", + "updated_b2", + "updated_b3", + ])), + Arc::new(Float64Array::from(vec![5.0, 2.5, 1.5])), + Arc::new(Int32Array::from(vec![40, 50, 60])), + ], + )?; + let t2 = MemTable::try_new(Arc::clone(&t2_schema), vec![vec![t2_batch]])?; + ctx.register_table("t2", Arc::new(t2))?; + + ctx.sql( + "UPDATE t1 AS dst \ + SET b = src.b, c = src.a, d = src.d \ + FROM t2 AS src \ + WHERE dst.a = src.a", + ) + .await? + .collect() + .await?; + + let actual = ctx + .sql("SELECT * FROM t1 ORDER BY a") + .await? + .collect() + .await?; + assert_batches_eq!( + [ + "+---+------------+-----+----+", + "| a | b | c | d |", + "+---+------------+-----+----+", + "| 1 | updated_b | 1.0 | 40 |", + "| 2 | updated_b2 | 2.0 | 50 |", + "| 3 | bar | 4.0 | 30 |", + "+---+------------+-----+----+", + ], + &actual + ); + + Ok(()) +} + +#[tokio::test] +#[ignore = "TODO(19950): enable once the implementation PR lands"] +async fn test_update_from_self_join_drops_source_side_predicates_issue_19950() +-> Result<()> { + // TODO(19950): enable once the implementation PR lands. 
+ let target_schema = abcd_schema_no_extra(); + let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( + Arc::clone(&target_schema), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t1", Arc::clone(&target_provider) as Arc<dyn TableProvider>)?; + + let df = ctx + .sql( + "UPDATE t1 AS dst \ + SET b = src.b \ + FROM t1 AS src \ + WHERE dst.a = src.a AND dst.d > 10 AND src.b = 'active'", + ) + .await?; + + df.collect().await?; + + let filters = target_provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!( + filters.len(), + 1, + "only target-side predicates should be forwarded in self-join updates" + ); + assert_eq!(filters[0].to_string(), "d > Int32(10)"); + + Ok(()) +} + #[tokio::test] async fn test_delete_qualifier_stripping_and_validation() -> Result<()> { // Test that filter qualifiers are properly stripped and validated diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 9570336e995f2..d26149f63ab4d 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -739,6 +739,30 @@ fn plan_update() { ); } +#[test] +#[ignore = "TODO(19950): enable once the implementation PR lands"] +fn plan_update_from_projects_original_target_row() { + // TODO(19950): enable once the implementation PR lands. 
+ let sql = "update person as dst \ + set last_name = src.last_name, age = src.age \ + from person as src \ + where dst.id = src.id"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" + Dml: op=[Update] table=[person] + Projection: dst.id AS id, dst.first_name AS first_name, src.last_name AS last_name, src.age AS age, dst.state AS state, dst.salary AS salary, dst.birth_date AS birth_date, dst.😀 AS 😀, dst.id AS __df_update_old_id, dst.first_name AS __df_update_old_first_name, dst.last_name AS __df_update_old_last_name, dst.age AS __df_update_old_age, dst.state AS __df_update_old_state, dst.salary AS __df_update_old_salary, dst.birth_date AS __df_update_old_birth_date, dst.😀 AS __df_update_old_😀 + Filter: dst.id = src.id + Cross Join: + SubqueryAlias: dst + TableScan: person + SubqueryAlias: src + TableScan: person + "# + ); +} + #[rstest] #[case::missing_assignment_target("UPDATE person SET doesnotexist = true")] #[case::missing_assignment_expression("UPDATE person SET age = doesnotexist + 42")] diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index 1cd2b626e3b8e..abeb87457465f 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -112,3 +112,110 @@ insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200); # TODO fix https://github.com/apache/datafusion/issues/19950 statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; + +# SKIPPED TODO(19950): enable once the implementation PR lands. 
+# query TT +# explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; +# ---- +# logical_plan +# 01)Dml: op=[Update] table=[t1] +# 02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d, t1.a AS __df_update_old_a, t1.b AS __df_update_old_b, t1.c AS __df_update_old_c, t1.d AS __df_update_old_d +# 03)----Filter: t1.a = t2.a AND t1.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1) +# 04)------Cross Join: +# 05)--------TableScan: t1 +# 06)--------TableScan: t2 +# physical_plan +# 01)CooperativeExec +# 02)--DmlResultExec: rows_affected=0 +# +# SKIPPED TODO(19950): enable once the implementation PR lands. +# query TT +# explain update t1 from t2 set b = t2.b, c = t1.a + t2.a where t1.a = t2.a; +# ---- +# logical_plan +# 01)Dml: op=[Update] table=[t1] +# 02)--Projection: t1.a AS a, t2.b AS b, CAST(t1.a + t2.a AS Float64) AS c, t1.d AS d, t1.a AS __df_update_old_a, t1.b AS __df_update_old_b, t1.c AS __df_update_old_c, t1.d AS __df_update_old_d +# 03)----Filter: t1.a = t2.a +# 04)------Cross Join: +# 05)--------TableScan: t1 +# 06)--------TableScan: t2 +# physical_plan +# 01)CooperativeExec +# 02)--DmlResultExec: rows_affected=0 +# +# SKIPPED TODO(19950): enable once the implementation PR lands. 
+# query TT +# explain update t1 as dst set b = src.b, c = src.a + dst.a, d = dst.d + src.d from t2 as src where dst.a = src.a and src.c > dst.c; +# ---- +# logical_plan +# 01)Dml: op=[Update] table=[t1] +# 02)--Projection: dst.a AS a, src.b AS b, CAST(src.a + dst.a AS Float64) AS c, dst.d + src.d AS d, dst.a AS __df_update_old_a, dst.b AS __df_update_old_b, dst.c AS __df_update_old_c, dst.d AS __df_update_old_d +# 03)----Filter: dst.a = src.a AND src.c > dst.c +# 04)------Cross Join: +# 05)--------SubqueryAlias: dst +# 06)----------TableScan: t1 +# 07)--------SubqueryAlias: src +# 08)----------TableScan: t2 +# physical_plan +# 01)CooperativeExec +# 02)--DmlResultExec: rows_affected=0 +# +# SKIPPED TODO(19950): enable once the implementation PR lands. +# statement ok +# update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; +# +# SKIPPED TODO(19950): enable once the implementation PR lands. +# query ITRI +# select * from t1 order by a; +# ---- +# 1 updated_b 1 1 +# 2 updated_b2 2 1 +# 3 bar 4 30 +# +# SKIPPED TODO(19950): enable once the implementation PR lands. +# query TT +# explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; +# ---- +# logical_plan +# 01)Dml: op=[Update] table=[t1] +# 02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d, t.a AS __df_update_old_a, t.b AS __df_update_old_b, t.c AS __df_update_old_c, t.d AS __df_update_old_d +# 03)----Filter: t.a = t2.a AND t.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1) +# 04)------Cross Join: +# 05)--------SubqueryAlias: t +# 06)----------TableScan: t1 +# 07)--------TableScan: t2 +# physical_plan +# 01)CooperativeExec +# 02)--DmlResultExec: rows_affected=2 +# +# SKIPPED TODO(19950): enable once the implementation PR lands. 
+# statement ok +# update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; +# +# SKIPPED TODO(19950): enable once the implementation PR lands. +# query ITRI +# select * from t1 order by a; +# ---- +# 1 new_val 1 1 +# 2 new_val2 2 1 +# 3 apple 3.5 15 +# +# SKIPPED TODO(19950): enable once the implementation PR lands. +# statement ok +# update t1 as dst set b = src.b, c = src.a + dst.a, d = src.d from t2 as src where dst.a = src.a and src.c > 1.0; +# +# SKIPPED TODO(19950): enable once the implementation PR lands. +# query ITRI +# select * from t1 order by a; +# ---- +# 1 new_val 2 100 +# 2 new_val2 4 200 +# 3 apple 3.5 15 +# +# SKIPPED TODO(19950): enable once the implementation PR lands. +# statement ok +# insert into t2 values (1, 'duplicate_match', 9.9, 300); +# +# SKIPPED TODO(19950): enable once the implementation PR lands. +# statement error DataFusion error: UPDATE operation on table 't1' +# update t1 as dst set b = src.b, c = src.a + dst.a, d = src.d from t2 as src where dst.a = src.a and dst.a = 1;