fix: reject Parquet TimestampLTZ as TimestampNTZ on Spark 3.x for native_datafusion#4356
Closed
andygrove wants to merge 2 commits into
Closed
fix: reject Parquet TimestampLTZ as TimestampNTZ on Spark 3.x for native_datafusion#4356andygrove wants to merge 2 commits into
andygrove wants to merge 2 commits into
Conversation
…ive_datafusion Pre-Spark-4 (SPARK-36182) rejects reading a Parquet TimestampLTZ column as TimestampNTZ; native_datafusion previously did not, and silently returned the UTC instant. Plumb a per-Spark-version flag from ShimCometConf through the NativeScan proto into SparkParquetOptions, and gate a new rejection arm in the schema adapter on it. INT96 remains a gap because DataFusion's coerce_int96 strips the source timezone before the schema adapter runs, so it is indistinguishable from a true TIMESTAMP_NTZ source. Compatibility guide updated to describe the correctness implications.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Partial fix for #4219 (INT96 case tracked in the same issue, see below).
Rationale for this change
Pre-Spark-4 (SPARK-36182) rejects reading a Parquet TimestampLTZ column as TimestampNTZ because LTZ encodes UTC-adjusted instants that cannot be safely reinterpreted as timezone-free values. The
native_datafusionscan did not enforce this, and silently returned the UTC instant as the NTZ value. That is a correctness divergence on Spark 3.x: queries Spark would have failed instead return values, and downstream filters / joins / aggregations on the column may produce different results than running the same query without Comet. Spark 4.0 (SPARK-47447) lifted the restriction.The
native_iceberg_compatpath was already correct because its JVM-sideTypeUtil.checkParquetTyperejects the read before reaching native code.What changes are included in this PR?
COMET_ALLOW_TIMESTAMP_LTZ_AS_NTZinShimCometConf(false on 3.x, true on 4.x).allow_timestamp_ltz_to_ntzfield on theNativeScanCommonproto, set from the shim constant inCometNativeScan, plumbed intoSparkParquetOptionsviainit_datasource_exec/get_options.SparkPhysicalExprAdapter(schema_adapter.rs) that emitsreject_on_non_empty_exprwhen an ArrowTimestamp(_, Some(_))column is read asTimestamp(_, None)and the flag forbids it. Deferred to runtime so empty files (SPARK-26709) continue to pass.iceberg-compatJNI path passestruebecauseTypeUtil.checkParquetTypehas already validated.docs/source/user-guide/latest/compatibility/scans.md) to narrow the documented gap to INT96 and describe the correctness implications.Known gap
INT96 → TimestampNTZ on Spark 3.x is still not rejected by
native_datafusion. DataFusion'scoerce_int96producesTimestamp(unit, None)for INT96 columns, stripping the source timezone before Comet's schema adapter sees the column. At that point INT96 is indistinguishable from a true TIMESTAMP_NTZ source, so the new pattern does not fire. The annotated LTZ encodings (TIMESTAMP_MICROS/TIMESTAMP_MILLISwithisAdjustedToUTC=true) are covered. The INT96 +native_datafusiontest case in the suite stays skipped with a link back to #4219.How are these changes tested?
ParquetTimestampLtzAsNtzSuitealready covered the pre-Spark-4 case fornative_iceberg_compatand the Spark 4.0+ positive case. It has been extended to parametrize the pre-Spark-4 case across both scan implementations, so the threenative_datafusionvariants (INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS) now run instead of being globally skipped. On Spark 3.5, the two annotated-encoding variants pass; the INT96 variant skips with a link to #4219. On Spark 4.0 the existing positive tests continue to pass.Verified locally:
make(default Spark profile)./mvnw test -Pspark-3.5 -Dsuites=org.apache.comet.parquet.ParquetTimestampLtzAsNtzSuite./mvnw test -Pspark-4.0 -Dsuites=org.apache.comet.parquet.ParquetTimestampLtzAsNtzSuitecargo clippy --all-targets --workspace -- -D warnings