diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala index 2795911da3..042fd9ced3 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala @@ -261,7 +261,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { val subExprsCode = ctx.subexprFunctionsCode val (cls, setup, snippet) = CometBatchKernelCodegenOutput.emitOutputWriter(boundExpr.dataType, ev.value, ctx) - (cls, setup, defaultBody(boundExpr, ev, snippet, subExprsCode)) + (cls, setup, defaultBody(boundExpr, inputSchema, ev, snippet, subExprsCode)) } val typedFieldDecls = CometBatchKernelCodegenInput.emitInputFieldDecls(inputSchema) @@ -343,6 +343,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { */ private def defaultBody( boundExpr: Expression, + inputSchema: Seq[ArrowColumnSpec], ev: ExprCode, writeSnippet: String, subExprsCode: String): String = { @@ -353,9 +354,17 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { // make this incorrect (`coalesce(null, x)` is `x`); `allNullIntolerant` rejects those. val inputOrdinals = boundExpr.collect { case b: BoundReference => b.ordinal }.distinct + // Primitive Arrow vectors are wrapped in `CometPlainVector` at input-cast time, which + // exposes `isNullAt(int)` rather than the raw Arrow `isNull(int)`. Pick the right method + // per ordinal so the short-circuit compiles for timestamp / int / float columns too, + // not just VarChar / Decimal vectors that stay as raw Arrow types. + def nullCheckCall(ord: Int): String = { + val method = CometBatchKernelCodegenInput.nullCheckMethod(inputSchema(ord)) + s"this.col$ord.$method(i)" + } val nullCheck = if (inputOrdinals.isEmpty) "false" - else inputOrdinals.map(ord => s"this.col$ord.isNull(i)").mkString(" || ") + else inputOrdinals.map(nullCheckCall).mkString(" || ") s""" |if ($nullCheck) { | output.setNull(i); diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala index 79a2af6837..74e4881de0 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala @@ -404,8 +404,10 @@ private[codegen] object CometBatchKernelCodegenInput { /** * Java method name for the per-column null check. Primitive scalars wrapped in * [[CometPlainVector]] expose `isNullAt`; Arrow typed fields expose `isNull`. Same semantics. + * Used both by `emitTypedGetters` (for the kernel's `isNullAt` switch) and by + * `CometBatchKernelCodegen.defaultBody` (for the `NullIntolerant` short-circuit). */ - private def nullCheckMethod(spec: ArrowColumnSpec): String = spec match { + def nullCheckMethod(spec: ArrowColumnSpec): String = spec match { case sc: ScalarColumnSpec if wrapsInCometPlainVector(sc.vectorClass) => "isNullAt" case _ => "isNull" } diff --git a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala index bf636f7221..852e80ae44 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala +++ b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala @@ -20,7 +20,7 @@ package org.apache.comet.serde import org.apache.spark.SparkEnv -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Literal, ScalaUDF} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, Literal, ScalaUDF} import org.apache.spark.sql.types.BinaryType import org.apache.comet.CometConf @@ -45,15 +45,35 @@ import org.apache.comet.udf.codegen.CometScalaUDFCodegen * * Gated by [[CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED]]. When disabled, plans containing a * `ScalaUDF` fall back to Spark for the enclosing operator. + * + * [[emitJvmCodegenDispatch]] exposes the same closure-serialize + dispatcher-proto path to other + * serdes that want to keep a built-in Spark expression inside the Comet pipeline when no native + * lowering is viable. See [[CometDateFormat]] for an example. */ object CometScalaUDF extends CometExpressionSerde[ScalaUDF] { - override def convert(expr: ScalaUDF, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { + override def convert(expr: ScalaUDF, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = + emitJvmCodegenDispatch(expr, inputs, binding) + + /** + * Bind `expr`, closure-serialize it, and emit a `JvmScalarUdf` proto routed through + * [[CometScalaUDFCodegen]] so that native execution evaluates the expression inside the + * Arrow-direct codegen dispatcher. The dispatcher will Janino-compile `expr.doGenCode` into a + * batch kernel on first invocation per task. + * + * Returns `None` (with `withInfo` tagging the reason) when the dispatcher is disabled via + * [[CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED]] or when [[CometBatchKernelCodegen.canHandle]] + * refuses the expression tree. Callers should treat `None` as a clean Spark-fallback signal. + */ + def emitJvmCodegenDispatch( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { if (!CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.get()) { withInfo( expr, - s"${CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key}=false; ScalaUDF has no native path " + - "so the plan falls back to Spark") + s"${CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key}=false; expression has no native " + + "path so the plan falls back to Spark") return None } @@ -100,3 +120,21 @@ object CometScalaUDF extends CometExpressionSerde[ScalaUDF] { .build()) } } + +/** + * Convenience base for serdes that route a non-ScalaUDF Spark expression through the codegen + * dispatcher. Delegates `convert` to [[CometScalaUDF.emitJvmCodegenDispatch]] and marks the + * expression `Compatible()` because the dispatcher runs Spark's own `doGenCode` inside the + * kernel: behavior matches Spark exactly when [[CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED]] is + * enabled, and the operator falls back to Spark cleanly when it is not. + */ +class CometCodegenDispatch[T <: Expression] extends CometExpressionSerde[T] { + override def getSupportLevel(expr: T): SupportLevel = Compatible() + // Intentionally no getCompatibleNotes override: the docs generator emits compat notes under + // a heading that promises "no additional configuration required". The dispatcher flag is a + // global concern documented elsewhere; tagging each expression here would contradict the + // heading. When the flag is off, `convert` returns None with a clear withInfo reason that + // shows up in EXPLAIN, which is the right place for that signal. + override def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = + CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding) +} diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 8d48239e76..bbb64133cf 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -219,6 +219,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { private[comet] val temporalExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + classOf[AddMonths] -> CometAddMonths, classOf[ConvertTimezone] -> CometConvertTimezone, classOf[DateAdd] -> CometDateAdd, classOf[DateDiff] -> CometDateDiff, @@ -234,12 +235,20 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[LastDay] -> CometLastDay, classOf[Hour] -> CometHour, classOf[MakeDate] -> CometMakeDate, + classOf[MakeTimestamp] -> CometMakeTimestamp, + classOf[MicrosToTimestamp] -> CometMicrosToTimestamp, + classOf[MillisToTimestamp] -> CometMillisToTimestamp, + classOf[MonthsBetween] -> CometMonthsBetween, classOf[Minute] -> CometMinute, classOf[NextDay] -> CometNextDay, classOf[Second] -> CometSecond, classOf[SecondsToTimestamp] -> CometSecondsToTimestamp, classOf[TruncDate] -> CometTruncDate, classOf[TruncTimestamp] -> CometTruncTimestamp, + classOf[ToUnixTimestamp] -> CometToUnixTimestamp, + classOf[UnixMicros] -> CometUnixMicros, + classOf[UnixMillis] -> CometUnixMillis, + classOf[UnixSeconds] -> CometUnixSeconds, classOf[UnixTimestamp] -> CometUnixTimestamp, classOf[Year] -> CometYear, classOf[Month] -> CometMonth, diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index b57b1e4e56..e2995274ad 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,11 +21,12 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{AddMonths, Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, MakeTimestamp, MicrosToTimestamp, MillisToTimestamp, Minute, Month, MonthsBetween, NextDay, Quarter, Second, SecondsToTimestamp, ToUnixTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixMicros, UnixMillis, UnixSeconds, UnixTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType} import org.apache.spark.unsafe.types.UTF8String +import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} import org.apache.comet.serde.CometGetDateField.CometGetDateField @@ -593,17 +594,23 @@ object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] { } /** - * Converts Spark DateFormatClass expression to DataFusion's to_char function. + * Converts Spark `DateFormatClass` to DataFusion's `to_char` when format and timezone are + * mappable, otherwise routes the expression through the Arrow-direct codegen dispatcher so that + * Spark's own `DateFormatClass.doGenCode` runs inside the Comet pipeline. * - * Spark uses Java SimpleDateFormat patterns while DataFusion uses strftime patterns. This - * implementation supports a whitelist of common format strings that can be reliably mapped - * between the two systems. + * Routing: + * - format is a literal in `supportedFormats` AND timezone is UTC -> native `to_char` + * - format is a literal in `supportedFormats` AND timezone is non-UTC, with the per-expression + * `allowIncompatible` flag set -> native `to_char` (results may differ from Spark) + * - all other cases -> JVM codegen dispatcher ([[CometScalaUDF.emitJvmCodegenDispatch]]), gated + * by [[CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED]]. When that flag is disabled the operator + * falls back to Spark. */ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { /** * Mapping from Spark SimpleDateFormat patterns to strftime patterns. Only formats in this map - * are supported. + * are supported by the native path. */ val supportedFormats: Map[String, String] = Map( // Full date formats @@ -637,66 +644,50 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { // ISO formats "yyyy-MM-dd'T'HH:mm:ss" -> "%Y-%m-%dT%H:%M:%S") - override def getIncompatibleReasons(): Seq[String] = Seq( - "Non-UTC timezones may produce different results than Spark") - - override def getUnsupportedReasons(): Seq[String] = Seq( - "Only the following formats are supported:" + - supportedFormats.keys.toSeq.sorted - .map(k => s"`$k`") - .mkString("\n - ", "\n - ", "")) + // Compatibility is decided inside `convert`: the native path covers a subset, and the codegen + // dispatcher covers everything else when enabled. Plan-time tagging happens via `withInfo` on + // the path that returns None. + override def getSupportLevel(expr: DateFormatClass): SupportLevel = Compatible() - override def getSupportLevel(expr: DateFormatClass): SupportLevel = { - // Check timezone - only UTC is fully compatible - val timezone = expr.timeZoneId.getOrElse("UTC") - val isUtc = timezone == "UTC" || timezone == "Etc/UTC" - - expr.right match { - case Literal(fmt: UTF8String, _) => - val format = fmt.toString - if (supportedFormats.contains(format)) { - if (isUtc) { - Compatible() - } else { - Incompatible(Some(s"Non-UTC timezone '$timezone' may produce different results")) - } - } else { - Unsupported( - Some( - s"Format '$format' is not supported. Supported formats: " + - supportedFormats.keys.mkString(", "))) - } - case _ => - Unsupported(Some("Only literal format strings are supported")) - } - } + override def getCompatibleNotes(): Seq[String] = Seq( + "Format strings in a curated allow-list run natively via DataFusion's `to_char` for UTC " + + "sessions. Other format strings (including non-literal formats), as well as non-UTC " + + "sessions, route through Spark's own `DateFormatClass.doGenCode` via the Arrow-direct " + + "codegen dispatcher when `spark.comet.exec.scalaUDF.codegen.enabled=true`. When the " + + "codegen dispatcher is disabled (default) the operator falls back to Spark in those " + + "cases.") override def convert( expr: DateFormatClass, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - // Get the format string - must be a literal for us to map it - val strftimeFormat = expr.right match { - case Literal(fmt: UTF8String, _) => - supportedFormats.get(fmt.toString) + val timezone = expr.timeZoneId.getOrElse("UTC") + val isUtc = timezone == "UTC" || timezone == "Etc/UTC" + + val nativeFormat: Option[String] = expr.right match { + case Literal(fmt: UTF8String, _) => supportedFormats.get(fmt.toString) case _ => None } - strftimeFormat match { - case Some(format) => - val childExpr = exprToProtoInternal(expr.left, inputs, binding) - val formatExpr = exprToProtoInternal(Literal(format), inputs, binding) - - val optExpr = scalarFunctionExprToProtoWithReturnType( - "to_char", - StringType, - false, - childExpr, - formatExpr) - optExprWithInfo(optExpr, expr, expr.left, expr.right) - case None => - withInfo(expr, expr.left, expr.right) - None + val canUseNative = nativeFormat.isDefined && { + isUtc || CometConf.isExprAllowIncompat(getExprConfigName(expr)) + } + + if (canUseNative) { + val childExpr = exprToProtoInternal(expr.left, inputs, binding) + val formatExpr = exprToProtoInternal(Literal(nativeFormat.get), inputs, binding) + val optExpr = scalarFunctionExprToProtoWithReturnType( + "to_char", + StringType, + false, + childExpr, + formatExpr) + optExprWithInfo(optExpr, expr, expr.left, expr.right) + } else { + // Hand the full `DateFormatClass` (with `timeZoneId` already stamped by `ResolveTimeZone`) + // to the codegen dispatcher. It closure-serializes the bound tree, so non-UTC timezones + // and non-whitelisted / non-literal format strings produce Spark-identical results. + CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding) } } } @@ -780,3 +771,21 @@ object CometDays extends CometExpressionSerde[Days] { optExprWithInfo(optExpr, expr, expr.child) } } + +object CometAddMonths extends CometCodegenDispatch[AddMonths] + +object CometMonthsBetween extends CometCodegenDispatch[MonthsBetween] + +object CometMakeTimestamp extends CometCodegenDispatch[MakeTimestamp] + +object CometMicrosToTimestamp extends CometCodegenDispatch[MicrosToTimestamp] + +object CometMillisToTimestamp extends CometCodegenDispatch[MillisToTimestamp] + +object CometUnixSeconds extends CometCodegenDispatch[UnixSeconds] + +object CometUnixMillis extends CometCodegenDispatch[UnixMillis] + +object CometUnixMicros extends CometCodegenDispatch[UnixMicros] + +object CometToUnixTimestamp extends CometCodegenDispatch[ToUnixTimestamp] diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/add_months.sql b/spark/src/test/resources/sql-tests/expressions/datetime/add_months.sql new file mode 100644 index 0000000000..47c3957812 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/add_months.sql @@ -0,0 +1,43 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Routes add_months through the codegen dispatcher. Spark's own AddMonths.doGenCode +-- runs inside the Janino-compiled kernel. +-- Config: spark.sql.session.timeZone=America/Los_Angeles +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +statement +CREATE TABLE test_add_months(d date, n int) USING parquet + +statement +INSERT INTO test_add_months VALUES + (date('2024-01-15'), 1), + (date('2024-01-31'), 1), + (date('2024-12-15'), -13), + (date('1970-01-01'), 0), + (NULL, 1), + (date('2024-06-15'), NULL) + +query +SELECT add_months(d, n) FROM test_add_months + +query +SELECT add_months(d, 12) FROM test_add_months + +-- literal arguments +query +SELECT add_months(date('2024-02-29'), 12) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/date_format.sql b/spark/src/test/resources/sql-tests/expressions/datetime/date_format.sql index 09333f44d3..dec690cb6a 100644 --- a/spark/src/test/resources/sql-tests/expressions/datetime/date_format.sql +++ b/spark/src/test/resources/sql-tests/expressions/datetime/date_format.sql @@ -15,21 +15,27 @@ -- specific language governing permissions and limitations -- under the License. +-- Pin the session timezone so the test exercises the non-UTC path regardless of the JVM +-- default. Enable the codegen dispatcher so non-UTC and non-whitelisted formats stay inside +-- Comet via Spark's own DateFormatClass.doGenCode instead of falling back to Spark. +-- Config: spark.sql.session.timeZone=America/Los_Angeles +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + statement CREATE TABLE test_date_format(ts timestamp) USING parquet statement INSERT INTO test_date_format VALUES (timestamp('2024-06-15 10:30:45')), (timestamp('1970-01-01 00:00:00')), (NULL) -query expect_fallback(Non-UTC timezone) +query SELECT date_format(ts, 'yyyy-MM-dd') FROM test_date_format -query expect_fallback(Non-UTC timezone) +query SELECT date_format(ts, 'HH:mm:ss') FROM test_date_format -query expect_fallback(Non-UTC timezone) +query SELECT date_format(ts, 'yyyy-MM-dd HH:mm:ss') FROM test_date_format -- literal arguments -query expect_fallback(Non-UTC timezone) +query SELECT date_format(timestamp('2024-06-15 10:30:45'), 'yyyy-MM-dd') diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/make_timestamp.sql b/spark/src/test/resources/sql-tests/expressions/datetime/make_timestamp.sql new file mode 100644 index 0000000000..07f4ffd8c8 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/make_timestamp.sql @@ -0,0 +1,42 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Routes make_timestamp through the codegen dispatcher. +-- Config: spark.sql.session.timeZone=America/Los_Angeles +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +statement +CREATE TABLE test_make_timestamp(y int, mo int, d int, h int, mi int, s decimal(8,6)) USING parquet + +statement +INSERT INTO test_make_timestamp VALUES + (2024, 6, 15, 10, 30, 45.123456), + (1970, 1, 1, 0, 0, 0.0), + (2024, 12, 31, 23, 59, 59.999999), + (NULL, 6, 15, 10, 30, 45.0), + (2024, NULL, 15, 10, 30, 45.0) + +query +SELECT make_timestamp(y, mo, d, h, mi, s) FROM test_make_timestamp + +-- Explicit timezone argument +query +SELECT make_timestamp(y, mo, d, h, mi, s, 'UTC') FROM test_make_timestamp + +-- literal arguments +query +SELECT make_timestamp(2024, 6, 15, 10, 30, 45.000) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/make_timestamp_ansi.sql b/spark/src/test/resources/sql-tests/expressions/datetime/make_timestamp_ansi.sql new file mode 100644 index 0000000000..16ffa09d8b --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/make_timestamp_ansi.sql @@ -0,0 +1,49 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ANSI mode: make_timestamp throws on out-of-range argument values. With the codegen +-- dispatcher enabled, Spark's own MakeTimestamp.doGenCode produces the throw site, so +-- Comet's kernel raises the same exception as Spark. The expect_error substrings target +-- the inner JDK java.time.DateTimeException message text (which is wrapped in a +-- SparkDateTimeException whose getMessage() preserves it). The driver-formatted error +-- class string `DATETIME_FIELD_OUT_OF_BOUNDS` is NOT preserved when the exception is +-- thrown from a task on the executor side (only the wrapped `Job aborted ... Lost task +-- ... SparkDateTimeException: ` form is preserved), so we match the JDK +-- field names which are stable from Spark 3.4 through 4.x. +-- Config: spark.sql.session.timeZone=UTC +-- Config: spark.sql.ansi.enabled=true +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +-- month out of range +query expect_error(MonthOfYear) +SELECT make_timestamp(2024, 13, 1, 0, 0, 0) + +-- day out of range +query expect_error(Invalid date) +SELECT make_timestamp(2024, 2, 30, 0, 0, 0) + +-- hour out of range +query expect_error(HourOfDay) +SELECT make_timestamp(2024, 6, 15, 25, 0, 0) + +-- Sentinel: a valid input must still execute on the Comet codegen path. If the dispatcher +-- silently rejects MakeTimestamp at runtime, the operator falls back to Spark and the +-- error queries above pass vacuously (Spark and fallback throw identical messages). This +-- non-error query uses `checkSparkAnswerAndOperator` which fails if Comet did not run the +-- expression natively, so a regression that breaks the dispatch path surfaces here. +query +SELECT make_timestamp(2024, 6, 15, 10, 30, 45.0) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/months_between.sql b/spark/src/test/resources/sql-tests/expressions/datetime/months_between.sql new file mode 100644 index 0000000000..6086578013 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/months_between.sql @@ -0,0 +1,41 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Routes months_between through the codegen dispatcher. +-- Config: spark.sql.session.timeZone=America/Los_Angeles +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +statement +CREATE TABLE test_months_between(t1 timestamp, t2 timestamp) USING parquet + +statement +INSERT INTO test_months_between VALUES + (timestamp('2024-06-15 10:30:00'), timestamp('2024-01-15 10:30:00')), + (timestamp('2024-01-31 00:00:00'), timestamp('2024-02-29 00:00:00')), + (timestamp('1970-01-01 00:00:00'), timestamp('1970-01-01 00:00:00')), + (NULL, timestamp('2024-01-01 00:00:00')), + (timestamp('2024-01-01 00:00:00'), NULL) + +query +SELECT months_between(t1, t2) FROM test_months_between + +query +SELECT months_between(t1, t2, false) FROM test_months_between + +-- literal arguments +query +SELECT months_between(timestamp('1997-02-28 10:30:00'), timestamp('1996-10-30 00:00:00')) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_micros.sql b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_micros.sql new file mode 100644 index 0000000000..253a97f95f --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_micros.sql @@ -0,0 +1,33 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Routes timestamp_micros through the codegen dispatcher. +-- Config: spark.sql.session.timeZone=America/Los_Angeles +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +statement +CREATE TABLE test_timestamp_micros(v long) USING parquet + +statement +INSERT INTO test_timestamp_micros VALUES (0), (1718451045000000), (-1), (NULL), (86400000000) + +query +SELECT timestamp_micros(v) FROM test_timestamp_micros + +-- literal arguments +query +SELECT timestamp_micros(1718451045000000) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_millis.sql b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_millis.sql new file mode 100644 index 0000000000..1ede954c9e --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_millis.sql @@ -0,0 +1,33 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Routes timestamp_millis through the codegen dispatcher. +-- Config: spark.sql.session.timeZone=America/Los_Angeles +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +statement +CREATE TABLE test_timestamp_millis(v long) USING parquet + +statement +INSERT INTO test_timestamp_millis VALUES (0), (1718451045000), (-1), (NULL), (86400000) + +query +SELECT timestamp_millis(v) FROM test_timestamp_millis + +-- literal arguments +query +SELECT timestamp_millis(1718451045000) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_millis_ansi.sql b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_millis_ansi.sql new file mode 100644 index 0000000000..d3c26a99d2 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_millis_ansi.sql @@ -0,0 +1,34 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ANSI mode: timestamp_millis throws on overflow. The codegen dispatcher inherits the +-- throw from Spark's own MillisToTimestamp.doGenCode. +-- Config: spark.sql.session.timeZone=UTC +-- Config: spark.sql.ansi.enabled=true +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +query expect_error(overflow) +SELECT timestamp_millis(9223372036854775807L) + +query expect_error(overflow) +SELECT timestamp_millis(-9223372036854775808L) + +-- Sentinel: confirms Comet ran the expression natively. If the dispatcher silently rejects +-- MillisToTimestamp, the error queries above pass vacuously via Spark fallback. This valid +-- query uses `checkSparkAnswerAndOperator` and fails if Comet did not execute it natively. +query +SELECT timestamp_millis(1718451045000) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/to_unix_timestamp.sql b/spark/src/test/resources/sql-tests/expressions/datetime/to_unix_timestamp.sql new file mode 100644 index 0000000000..7654d5ff32 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_unix_timestamp.sql @@ -0,0 +1,40 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Routes to_unix_timestamp through the codegen dispatcher. +-- Config: spark.sql.session.timeZone=America/Los_Angeles +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +statement +CREATE TABLE test_to_unix_timestamp(s string) USING parquet + +statement +INSERT INTO test_to_unix_timestamp VALUES + ('2024-06-15 10:30:45'), + ('1970-01-01 00:00:00'), + ('1969-12-31 23:59:59'), + (NULL) + +query +SELECT to_unix_timestamp(s, 'yyyy-MM-dd HH:mm:ss') FROM test_to_unix_timestamp + +query +SELECT to_unix_timestamp(s) FROM test_to_unix_timestamp + +-- literal argument +query +SELECT to_unix_timestamp('2024-06-15 10:30:45', 'yyyy-MM-dd HH:mm:ss') diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/to_unix_timestamp_ansi.sql b/spark/src/test/resources/sql-tests/expressions/datetime/to_unix_timestamp_ansi.sql new file mode 100644 index 0000000000..09250b9b06 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_unix_timestamp_ansi.sql @@ -0,0 +1,41 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ANSI mode: to_unix_timestamp throws on parse failure. The codegen dispatcher inherits +-- the throw from Spark's own ToUnixTimestamp.doGenCode. The time parser policy is pinned +-- to CORRECTED so the JDK java.time formatter (and the CANNOT_PARSE_TIMESTAMP error class) +-- is exercised regardless of the runtime default — LEGACY uses SimpleDateFormat with a +-- different exception text. +-- Config: spark.sql.session.timeZone=UTC +-- Config: spark.sql.ansi.enabled=true +-- Config: spark.sql.legacy.timeParserPolicy=CORRECTED +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true +-- The CANNOT_PARSE_TIMESTAMP error class was standardized in Spark 3.5; earlier versions +-- surface a different SparkUpgradeException / DateTimeParseException wording. +-- MinSparkVersion: 3.5 + +query expect_error(CANNOT_PARSE_TIMESTAMP) +SELECT to_unix_timestamp('not a date', 'yyyy-MM-dd') + +query expect_error(CANNOT_PARSE_TIMESTAMP) +SELECT to_unix_timestamp('2024-13-99', 'yyyy-MM-dd') + +-- Sentinel: confirms Comet ran the expression natively. If the dispatcher silently rejects +-- ToUnixTimestamp, the error queries above pass vacuously via Spark fallback. This valid +-- query uses `checkSparkAnswerAndOperator` and fails if Comet did not execute it natively. +query +SELECT to_unix_timestamp('2024-06-15 10:30:45', 'yyyy-MM-dd HH:mm:ss') diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/to_unix_timestamp_ansi_spark34.sql b/spark/src/test/resources/sql-tests/expressions/datetime/to_unix_timestamp_ansi_spark34.sql new file mode 100644 index 0000000000..4fed3ff654 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_unix_timestamp_ansi_spark34.sql @@ -0,0 +1,39 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ANSI mode: to_unix_timestamp throws on parse failure. Same coverage as +-- to_unix_timestamp_ansi.sql, but the expect_error substring targets the JDK +-- DateTimeParseException message text because Spark 3.4 does not yet wrap these in the +-- CANNOT_PARSE_TIMESTAMP error class (that classification arrived in Spark 3.5). The JDK +-- message "Text 'not a date' could not be parsed" is stable on Spark 3.4. +-- Config: spark.sql.session.timeZone=UTC +-- Config: spark.sql.ansi.enabled=true +-- Config: spark.sql.legacy.timeParserPolicy=CORRECTED +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true +-- MaxSparkVersion: 3.4 + +query expect_error(could not be parsed) +SELECT to_unix_timestamp('not a date', 'yyyy-MM-dd') + +query expect_error(could not be parsed) +SELECT to_unix_timestamp('2024-13-99', 'yyyy-MM-dd') + +-- Sentinel: confirms Comet ran the expression natively. If the dispatcher silently rejects +-- ToUnixTimestamp, the error queries above pass vacuously via Spark fallback. This valid +-- query uses `checkSparkAnswerAndOperator` and fails if Comet did not execute it natively. +query +SELECT to_unix_timestamp('2024-06-15 10:30:45', 'yyyy-MM-dd HH:mm:ss') diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/unix_micros.sql b/spark/src/test/resources/sql-tests/expressions/datetime/unix_micros.sql new file mode 100644 index 0000000000..07c0b1d715 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/unix_micros.sql @@ -0,0 +1,37 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Routes unix_micros through the codegen dispatcher. +-- Config: spark.sql.session.timeZone=America/Los_Angeles +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +statement +CREATE TABLE test_unix_micros(ts timestamp) USING parquet + +statement +INSERT INTO test_unix_micros VALUES + (timestamp('2024-06-15 10:30:45.123456')), + (timestamp('1970-01-01 00:00:00')), + (timestamp('1969-12-31 23:59:59.999999')), + (NULL) + +query +SELECT unix_micros(ts) FROM test_unix_micros + +-- literal argument +query +SELECT unix_micros(timestamp('2024-06-15 10:30:45.123456')) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/unix_millis.sql b/spark/src/test/resources/sql-tests/expressions/datetime/unix_millis.sql new file mode 100644 index 0000000000..4f13ee8bf3 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/unix_millis.sql @@ -0,0 +1,37 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Routes unix_millis through the codegen dispatcher. +-- Config: spark.sql.session.timeZone=America/Los_Angeles +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +statement +CREATE TABLE test_unix_millis(ts timestamp) USING parquet + +statement +INSERT INTO test_unix_millis VALUES + (timestamp('2024-06-15 10:30:45.123')), + (timestamp('1970-01-01 00:00:00')), + (timestamp('1969-12-31 23:59:59.999')), + (NULL) + +query +SELECT unix_millis(ts) FROM test_unix_millis + +-- literal argument +query +SELECT unix_millis(timestamp('2024-06-15 10:30:45.123456')) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/unix_seconds.sql b/spark/src/test/resources/sql-tests/expressions/datetime/unix_seconds.sql new file mode 100644 index 0000000000..a8d5ba92a1 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/unix_seconds.sql @@ -0,0 +1,37 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Routes unix_seconds through the codegen dispatcher. +-- Config: spark.sql.session.timeZone=America/Los_Angeles +-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true + +statement +CREATE TABLE test_unix_seconds(ts timestamp) USING parquet + +statement +INSERT INTO test_unix_seconds VALUES + (timestamp('2024-06-15 10:30:45')), + (timestamp('1970-01-01 00:00:00')), + (timestamp('1969-12-31 23:59:59')), + (NULL) + +query +SELECT unix_seconds(ts) FROM test_unix_seconds + +-- literal argument +query +SELECT unix_seconds(timestamp('2024-06-15 10:30:45.123')) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 7591279786..797020aed0 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -247,7 +247,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp .withColumn("arrUnsupportedArgs", expr("array_insert(arr, idx, 1)")) checkSparkAnswerAndFallbackReasons( df.select("arrUnsupportedArgs"), - Set("ScalaUDF has no native path", "unsupported arguments for ArrayInsert")) + Set("expression has no native path", "unsupported arguments for ArrayInsert")) } } } diff --git a/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala b/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala index 27a5830c6d..e12a0dd147 100644 --- a/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala @@ -22,9 +22,10 @@ package org.apache.comet import org.scalatest.funsuite.AnyFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Add, BoundReference, Coalesce, Concat, CreateArray, CreateMap, ElementAt, Expression, GetStructField, LeafExpression, Length, Literal, Nondeterministic, Rand, Size, Unevaluable, Upper} +import org.apache.spark.sql.catalyst.expressions.{Add, AddMonths, BoundReference, Coalesce, Concat, CreateArray, CreateMap, DateFormatClass, ElementAt, Expression, GetStructField, LeafExpression, Length, Literal, MakeTimestamp, MicrosToTimestamp, MillisToTimestamp, MonthsBetween, Nondeterministic, Rand, Size, ToUnixTimestamp, Unevaluable, UnixMicros, UnixMillis, UnixSeconds, Upper} import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, CodegenFallback, ExprCode} import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.codegen.CometBatchKernelCodegen import org.apache.comet.codegen.CometBatchKernelCodegen.{ArrayColumnSpec, ArrowColumnSpec, MapColumnSpec, ScalarColumnSpec, StructColumnSpec, StructFieldSpec} @@ -61,6 +62,26 @@ class CometCodegenSourceSuite extends AnyFunSuite { specs: ArrowColumnSpec*): String = CometBatchKernelCodegen.generateSource(expr, specs.toIndexedSeq).body + test("NullIntolerant short-circuit uses isNullAt for CometPlainVector-wrapped columns") { + // Primitive Arrow vectors (timestamp / int / float / ...) are wrapped in `CometPlainVector` + // at input-cast time. The short-circuit must call `isNullAt(i)`, not `isNull(i)`, otherwise + // Janino fails to compile the kernel with "method isNull not declared". Verified end-to-end + // by `CometTemporalExpressionSuite` date_format tests over `TimeStampMicroTZVector` inputs. + val tsVec = CometBatchKernelCodegen.vectorClassBySimpleName("TimeStampMicroTZVector") + val spec = ArrowColumnSpec(tsVec, nullable = true) + val expr = DateFormatClass( + BoundReference(0, TimestampType, nullable = true), + Literal(UTF8String.fromString("yyyy-MM-dd EEEE"), StringType), + Some("UTC")) + val src = CometBatchKernelCodegen.generateSource(expr, IndexedSeq(spec)).body + assert( + src.contains("if (this.col0.isNullAt(i))"), + s"expected short-circuit to use isNullAt for CometPlainVector-wrapped col0; got:\n$src") + assert( + !src.contains("if (this.col0.isNull(i))"), + s"expected no raw Arrow isNull on the CometPlainVector-wrapped col0; got:\n$src") + } + test("non-nullable column emits literal-false isNullAt case") { val expr = Length(BoundReference(0, StringType, nullable = false)) val src = gen(expr, nonNullableString) @@ -1033,6 +1054,94 @@ class CometCodegenSourceSuite extends AnyFunSuite { s"expected no null guard on non-nullable map field; got:\n$src") } + // Bucket 4 datetime expressions routed through CometCodegenDispatch. Each entry pairs a + // bound Catalyst expression with the Arrow column specs the kernel would see at runtime. + // The test asserts `generateSource` returns a non-empty body, which means Spark's own + // `doGenCode` succeeded under the codegen context (no NotImplementedError, no rewrite to + // a CodegenFallback path). + test("Bucket 4 datetime expressions produce non-empty generated kernel source") { + val intCol = ArrowColumnSpec( + CometBatchKernelCodegen.vectorClassBySimpleName("IntVector"), + nullable = true) + val longCol = ArrowColumnSpec( + CometBatchKernelCodegen.vectorClassBySimpleName("BigIntVector"), + nullable = true) + val decCol = ArrowColumnSpec( + CometBatchKernelCodegen.vectorClassBySimpleName("DecimalVector"), + nullable = true) + val dateCol = ArrowColumnSpec( + CometBatchKernelCodegen.vectorClassBySimpleName("DateDayVector"), + nullable = true) + val tsCol = ArrowColumnSpec( + CometBatchKernelCodegen.vectorClassBySimpleName("TimeStampMicroTZVector"), + nullable = true) + val strCol = ArrowColumnSpec( + CometBatchKernelCodegen.vectorClassBySimpleName("VarCharVector"), + nullable = true) + + val cases: Seq[(String, Expression, IndexedSeq[ArrowColumnSpec])] = Seq( + ( + "AddMonths", + AddMonths( + BoundReference(0, DateType, nullable = true), + BoundReference(1, IntegerType, nullable = true)), + IndexedSeq(dateCol, intCol)), + ( + "MonthsBetween", + MonthsBetween( + BoundReference(0, TimestampType, nullable = true), + BoundReference(1, TimestampType, nullable = true), + Literal(true), + Some("UTC")), + IndexedSeq(tsCol, tsCol)), + ( + "MakeTimestamp", + MakeTimestamp( + BoundReference(0, IntegerType, nullable = true), + BoundReference(1, IntegerType, nullable = true), + BoundReference(2, IntegerType, nullable = true), + BoundReference(3, IntegerType, nullable = true), + BoundReference(4, IntegerType, nullable = true), + BoundReference(5, DecimalType(8, 6), nullable = true), + timezone = None, + timeZoneId = Some("UTC")), + IndexedSeq(intCol, intCol, intCol, intCol, intCol, decCol)), + ( + "MillisToTimestamp", + MillisToTimestamp(BoundReference(0, LongType, nullable = true)), + IndexedSeq(longCol)), + ( + "MicrosToTimestamp", + MicrosToTimestamp(BoundReference(0, LongType, nullable = true)), + IndexedSeq(longCol)), + ( + "UnixSeconds", + UnixSeconds(BoundReference(0, TimestampType, nullable = true)), + IndexedSeq(tsCol)), + ( + "UnixMillis", + UnixMillis(BoundReference(0, TimestampType, nullable = true)), + IndexedSeq(tsCol)), + ( + "UnixMicros", + UnixMicros(BoundReference(0, TimestampType, nullable = true)), + IndexedSeq(tsCol)), + ( + "ToUnixTimestamp", + ToUnixTimestamp( + BoundReference(0, StringType, nullable = true), + Literal(UTF8String.fromString("yyyy-MM-dd HH:mm:ss"), StringType), + Some("UTC")), + IndexedSeq(strCol))) + cases.foreach { case (name, expr, specs) => + val src = CometBatchKernelCodegen.generateSource(expr, specs).body + assert(src.nonEmpty, s"$name: expected non-empty generated source") + assert( + src.contains("public java.lang.Object generate(Object[] references)"), + s"$name: generated source missing kernel class entry point") + } + } + test("CacheKey discriminates on ArrowColumnSpec.nullable") { // Structural regression: same expression bytes and same Arrow vector class with different // `nullable` must produce non-equal cache keys. The dispatcher today hardcodes `nullable=true` diff --git a/spark/src/test/scala/org/apache/comet/CometSqlFileTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometSqlFileTestSuite.scala index 47642f2357..53f21fe500 100644 --- a/spark/src/test/scala/org/apache/comet/CometSqlFileTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometSqlFileTestSuite.scala @@ -35,6 +35,31 @@ class CometSqlFileTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { (current(0) == required(0) && current(1) >= required(1)) } + /** + * Check if the current Spark version is at or below a maximum version. Used by paired fixtures + * where each version range has its own expected error class or output format. + */ + private def meetsMaxSparkVersion(maxVersion: String): Boolean = { + val current = org.apache.spark.SPARK_VERSION.split("[.-]").take(2).map(_.toInt) + val ceiling = maxVersion.split("[.-]").take(2).map(_.toInt) + (current(0) < ceiling(0)) || + (current(0) == ceiling(0) && current(1) <= ceiling(1)) + } + + /** + * Build a human-readable reason string describing why a fixture is skipped on the current Spark + * version. Returns None when both constraints are satisfied. + */ + private def skipReason(parsed: SqlTestFile): Option[String] = { + val minViolation = parsed.minSparkVersion.filter(!meetsMinSparkVersion(_)) + val maxViolation = parsed.maxSparkVersion.filter(!meetsMaxSparkVersion(_)) + (minViolation, maxViolation) match { + case (Some(m), _) => Some(s"requires Spark >= $m") + case (_, Some(m)) => Some(s"requires Spark <= $m") + case _ => None + } + } + private val testResourceDir = { val url = getClass.getClassLoader.getResource("sql-tests") assert(url != null, "Could not find sql-tests resource directory") @@ -133,17 +158,18 @@ class CometSqlFileTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { val parsed = SqlFileTestParser.parse(file) val combinations = configMatrix(parsed.configMatrix) - // Skip tests that require a newer Spark version - val skip = parsed.minSparkVersion.exists(!meetsMinSparkVersion(_)) + // Skip tests that fall outside the file's declared Spark version range. + val skip = skipReason(parsed) if (combinations.size <= 1) { // No matrix or single combination test(s"sql-file: $relativePath") { - if (skip) { - logInfo(s"SKIPPED (requires Spark ${parsed.minSparkVersion.get}): $relativePath") - } else { - val effectiveConfigs = parsed.configs ++ combinations.headOption.getOrElse(Seq.empty) - runTestFile(relativePath, parsed.copy(configs = effectiveConfigs)) + skip match { + case Some(reason) => + logInfo(s"SKIPPED ($reason): $relativePath") + case None => + val effectiveConfigs = parsed.configs ++ combinations.headOption.getOrElse(Seq.empty) + runTestFile(relativePath, parsed.copy(configs = effectiveConfigs)) } } } else { @@ -151,10 +177,11 @@ class CometSqlFileTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { combinations.foreach { matrixConfigs => val label = matrixConfigs.map { case (k, v) => s"$k=$v" }.mkString(", ") test(s"sql-file: $relativePath [$label]") { - if (skip) { - logInfo(s"SKIPPED (requires Spark ${parsed.minSparkVersion.get}): $relativePath") - } else { - runTestFile(relativePath, parsed.copy(configs = parsed.configs ++ matrixConfigs)) + skip match { + case Some(reason) => + logInfo(s"SKIPPED ($reason): $relativePath") + case None => + runTestFile(relativePath, parsed.copy(configs = parsed.configs ++ matrixConfigs)) } } } diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index a8147089d9..20ad90a91c 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -214,26 +214,21 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } test("date_format - timestamp_ntz input") { - // TimestampNTZ is timezone-independent, so date_format should produce the same - // formatted string regardless of session timezone. Comet currently only runs this - // natively for UTC; for non-UTC it falls back to Spark. We verify correctness - // (matching Spark's output) in all cases. + // TimestampNTZ is timezone-independent, so date_format must produce the same string + // regardless of session timezone. With the codegen dispatcher enabled, non-UTC sessions + // stay in Comet by running Spark's own `DateFormatClass.doGenCode` via the dispatcher. val r = new Random(42) val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions()) ntzDF.createOrReplaceTempView("ntz_tbl") val supportedFormats = CometDateFormat.supportedFormats.keys.toSeq.filterNot(_.contains("'")) - for (tz <- crossTimezones) { - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { - for (format <- supportedFormats) { - if (tz == "UTC") { + withSQLConf(CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "true") { + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + for (format <- supportedFormats) { checkSparkAnswerAndOperator( s"SELECT ts_ntz, date_format(ts_ntz, '$format') from ntz_tbl order by ts_ntz") - } else { - // Non-UTC falls back to Spark but should still produce correct results - checkSparkAnswer( - s"SELECT ts_ntz, date_format(ts_ntz, '$format') from ntz_tbl order by ts_ntz") } } } @@ -476,34 +471,62 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } - test("date_format unsupported format falls back to Spark") { + test("date_format unsupported format routes via codegen dispatcher") { createTimestampTestData.createOrReplaceTempView("tbl") - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { - // Unsupported format string + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC", + CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "true") { + checkSparkAnswerAndOperator( + "SELECT c0, date_format(c0, 'yyyy-MM-dd EEEE') from tbl order by c0") + } + } + + test("date_format unsupported format falls back when codegen dispatcher disabled") { + createTimestampTestData.createOrReplaceTempView("tbl") + + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC", + CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "false") { checkSparkAnswerAndFallbackReason( "SELECT c0, date_format(c0, 'yyyy-MM-dd EEEE') from tbl order by c0", - "Format 'yyyy-MM-dd EEEE' is not supported") + CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key) } } - test("date_format with non-UTC timezone falls back to Spark") { + test("date_format with non-UTC timezone routes via codegen dispatcher") { createTimestampTestData.createOrReplaceTempView("tbl") val nonUtcTimezones = Seq("America/New_York", "America/Los_Angeles", "Europe/London", "Asia/Tokyo") for (tz <- nonUtcTimezones) { - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { - // Non-UTC timezones should fall back to Spark as Incompatible + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz, + CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "true") { + checkSparkAnswerAndOperator( + "SELECT c0, date_format(c0, 'yyyy-MM-dd HH:mm:ss') from tbl order by c0") + } + } + } + + test("date_format with non-UTC timezone falls back when codegen dispatcher disabled") { + createTimestampTestData.createOrReplaceTempView("tbl") + + val nonUtcTimezones = Seq("America/New_York", "Europe/London") + + for (tz <- nonUtcTimezones) { + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz, + CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "false") { checkSparkAnswerAndFallbackReason( "SELECT c0, date_format(c0, 'yyyy-MM-dd HH:mm:ss') from tbl order by c0", - s"Non-UTC timezone '$tz' may produce different results") + CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key) } } } - test("date_format with non-UTC timezone works when allowIncompatible is enabled") { + test("date_format with non-UTC timezone takes native path when allowIncompatible is enabled") { createTimestampTestData.createOrReplaceTempView("tbl") val nonUtcTimezones = Seq("America/New_York", "Europe/London", "Asia/Tokyo") @@ -511,10 +534,13 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH for (tz <- nonUtcTimezones) { withSQLConf( SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz, - "spark.comet.expr.DateFormatClass.allowIncompatible" -> "true") { - // With allowIncompatible enabled, Comet will execute the expression - // Results may differ from Spark but should not throw errors - checkSparkAnswer("SELECT c0, date_format(c0, 'yyyy-MM-dd') from tbl order by c0") + "spark.comet.expression.DateFormatClass.allowIncompatible" -> "true") { + // Native to_char results may diverge from Spark for non-UTC timezones (the reason the + // JVM UDF is the default), so we only check that execution stays inside Comet. ORDER BY + // is omitted to keep the plan free of AQEShuffleRead. + val df = sql("SELECT c0, date_format(c0, 'yyyy-MM-dd') from tbl") + df.collect() + checkCometOperators(stripAQEPlan(df.queryExecution.executedPlan)) } } } diff --git a/spark/src/test/scala/org/apache/comet/SqlFileTestParser.scala b/spark/src/test/scala/org/apache/comet/SqlFileTestParser.scala index 45198ed176..db01bc166f 100644 --- a/spark/src/test/scala/org/apache/comet/SqlFileTestParser.scala +++ b/spark/src/test/scala/org/apache/comet/SqlFileTestParser.scala @@ -69,20 +69,27 @@ case class ExpectError(pattern: String) extends QueryAssertionMode * @param tables * Table names extracted from CREATE TABLE statements (for cleanup). * @param minSparkVersion - * Optional minimum Spark version required to run this test (e.g. "3.5"). + * Optional minimum Spark version required to run this test (e.g. "3.5"). The test is skipped on + * older versions. + * @param maxSparkVersion + * Optional maximum Spark version this test applies to (e.g. "3.4"). The test is skipped on + * newer versions. Useful for paired fixtures where each version range has its own expected + * error class or output format. */ case class SqlTestFile( configs: Seq[(String, String)], configMatrix: Seq[(String, Seq[String])], records: Seq[SqlTestRecord], tables: Seq[String], - minSparkVersion: Option[String] = None) + minSparkVersion: Option[String] = None, + maxSparkVersion: Option[String] = None) object SqlFileTestParser { private val ConfigPattern = """--\s*Config:\s*(.+)=(.+)""".r private val ConfigMatrixPattern = """--\s*ConfigMatrix:\s*(.+)=(.+)""".r private val MinSparkVersionPattern = """--\s*MinSparkVersion:\s*(.+)""".r + private val MaxSparkVersionPattern = """--\s*MaxSparkVersion:\s*(.+)""".r private val CreateTablePattern = """(?i)CREATE\s+TABLE\s+(\w+)""".r.unanchored def parse(file: File): SqlTestFile = { @@ -98,6 +105,7 @@ object SqlFileTestParser { var configs = Seq.empty[(String, String)] var configMatrix = Seq.empty[(String, Seq[String])] var minSparkVersion: Option[String] = None + var maxSparkVersion: Option[String] = None val records = Seq.newBuilder[SqlTestRecord] val tables = Seq.newBuilder[String] @@ -118,6 +126,10 @@ object SqlFileTestParser { minSparkVersion = Some(version.trim) lineIdx += 1 + case MaxSparkVersionPattern(version) => + maxSparkVersion = Some(version.trim) + lineIdx += 1 + case "statement" => lineIdx += 1 val startLine = lineIdx + 1 @@ -141,7 +153,13 @@ object SqlFileTestParser { } } - SqlTestFile(configs, configMatrix, records.result(), tables.result(), minSparkVersion) + SqlTestFile( + configs, + configMatrix, + records.result(), + tables.result(), + minSparkVersion, + maxSparkVersion) } private val FallbackPattern = """query\s+expect_fallback\((.+)\)""".r