Skip to content

Commit bb8fb73

Browse files
JustinRush80 and ion-elgreco
authored and committed
sane check for when_not_matched_insert().when_matched_update()
Signed-off-by: JustinRush80 <[email protected]>
1 parent 437a15c commit bb8fb73

File tree

2 files changed

+73
-0
lines changed

2 files changed

+73
-0
lines changed

crates/core/src/operations/merge/mod.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2352,6 +2352,78 @@ mod tests {
23522352
);
23532353
assert_batches_sorted_eq!(&expected, &actual);
23542354
}
2355+
#[tokio::test]
2356+
async fn test_merge_schema_evolution_simple_insert_with_simple_update() {
2357+
let (table, _) = setup().await;
2358+
2359+
let schema = Arc::new(ArrowSchema::new(vec![
2360+
Field::new("id", ArrowDataType::Utf8, true),
2361+
Field::new("value", ArrowDataType::Int32, true),
2362+
Field::new("modified", ArrowDataType::Utf8, true),
2363+
Field::new("inserted_by", ArrowDataType::Utf8, true),
2364+
]));
2365+
let ctx = SessionContext::new();
2366+
let batch = RecordBatch::try_new(
2367+
Arc::clone(&schema),
2368+
vec![
2369+
Arc::new(arrow::array::StringArray::from(vec!["B", "C", "X"])),
2370+
Arc::new(arrow::array::Int32Array::from(vec![50, 200, 30])),
2371+
Arc::new(arrow::array::StringArray::from(vec![
2372+
"2021-02-02",
2373+
"2023-07-04",
2374+
"2023-07-04",
2375+
])),
2376+
Arc::new(arrow::array::StringArray::from(vec!["B1", "C1", "X1"])),
2377+
],
2378+
)
2379+
.unwrap();
2380+
let source = ctx.read_batch(batch).unwrap();
2381+
2382+
let (table, _) = DeltaOps(table)
2383+
.merge(source, col("target.id").eq(col("source.id")))
2384+
.with_source_alias("source")
2385+
.with_target_alias("target")
2386+
.with_merge_schema(true)
2387+
.when_not_matched_insert(|insert| {
2388+
insert
2389+
.set("id", col("source.id"))
2390+
.set("value", col("source.value"))
2391+
.set("modified", col("source.modified"))
2392+
.set("inserted_by", "source.inserted_by")
2393+
})
2394+
.unwrap()
2395+
.when_matched_update(|update| {
2396+
update
2397+
.update("value", col("source.value").add(lit(1)))
2398+
.update("modified", col("source.modified"))
2399+
.update("inserted_by", col("source.inserted_by"))
2400+
})
2401+
.unwrap()
2402+
.await
2403+
.unwrap();
2404+
2405+
let last_commit = table.last_commit().await.unwrap();
2406+
let parameters = last_commit.operation_parameters.clone().unwrap();
2407+
assert_eq!(parameters["mergePredicate"], json!("target.id = source.id"));
2408+
let expected = vec![
2409+
"+----+-------+------------+-------------+",
2410+
"| id | value | modified | inserted_by |",
2411+
"+----+-------+------------+-------------+",
2412+
"| A | 1 | 2021-02-01 | |",
2413+
"| B | 51 | 2021-02-02 | B1 |",
2414+
"| C | 201 | 2023-07-04 | C1 |",
2415+
"| D | 100 | 2021-02-02 | |",
2416+
"| X | 30 | 2023-07-04 | X1 |",
2417+
"+----+-------+------------+-------------+",
2418+
];
2419+
let actual = get_data(&table).await;
2420+
let expected_schema_struct: StructType = Arc::clone(&schema).try_into_kernel().unwrap();
2421+
assert_eq!(
2422+
&expected_schema_struct,
2423+
table.snapshot().unwrap().schema().as_ref()
2424+
);
2425+
assert_batches_sorted_eq!(&expected, &actual);
2426+
}
23552427

23562428
#[tokio::test]
23572429
async fn test_merge_schema_evolution_simple_insert() {

python/tests/test_merge.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ def test_merge_when_matched_update_wo_predicate_with_schema_evolution(
298298
assert result.schema == expected.schema
299299
assert result == expected
300300

301+
301302
def test_merge_when_matched_update_wo_predicate_and_insert_with_schema_evolution(
302303
tmp_path: pathlib.Path, sample_table: Table
303304
):

0 commit comments

Comments
 (0)