Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
28fb854
fix(codegen): use isNullAt on CometPlainVector-wrapped columns in nul…
andygrove May 23, 2026
d344ec0
feat: route date_format JVM fallback through codegen dispatcher
andygrove May 23, 2026
4823f33
Merge remote-tracking branch 'apache/main' into feat/date-format-jvm-udf
andygrove May 24, 2026
7cbb8e4
test: update ArrayInsertUnsupportedArgs fallback reason wording
andygrove May 24, 2026
7f781e1
feat(serde): add CometCodegenDispatch helper for codegen-routed expre…
andygrove May 24, 2026
34de47c
feat(datetime): route AddMonths through codegen dispatcher [skip ci]
andygrove May 24, 2026
15b4193
feat(datetime): route MonthsBetween through codegen dispatcher [skip ci]
andygrove May 24, 2026
4d352fb
feat(datetime): route MakeTimestamp through codegen dispatcher [skip ci]
andygrove May 24, 2026
6de2d2d
feat(datetime): route MillisToTimestamp through codegen dispatcher [s…
andygrove May 24, 2026
d22e355
feat(datetime): route MicrosToTimestamp through codegen dispatcher [s…
andygrove May 24, 2026
2d0a117
feat(datetime): route UnixSeconds through codegen dispatcher [skip ci]
andygrove May 24, 2026
2826f15
feat(datetime): route UnixMillis through codegen dispatcher [skip ci]
andygrove May 24, 2026
f4e1c0e
feat(datetime): route UnixMicros through codegen dispatcher [skip ci]
andygrove May 24, 2026
eebeef8
feat(datetime): route ToUnixTimestamp through codegen dispatcher [ski…
andygrove May 24, 2026
771f367
test(datetime): ANSI fixtures for codegen-routed throw-capable expres…
andygrove May 24, 2026
5871339
test(codegen): unit coverage for Bucket 4 datetime kernel source [ski…
andygrove May 24, 2026
fadcbfe
style: apply spotless formatting
andygrove May 24, 2026
dd1723f
fix: address code-review feedback for codegen-routed datetime fixtures
andygrove May 24, 2026
6a22b63
test: add MaxSparkVersion annotation and Spark 3.4 ANSI fixtures
andygrove May 24, 2026
1b952e7
style: apply spotless to MaxSparkVersion javadoc
andygrove May 24, 2026
9789272
fix: use JDK java.time field names in make_timestamp_ansi expect_error
andygrove May 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
val subExprsCode = ctx.subexprFunctionsCode
val (cls, setup, snippet) =
CometBatchKernelCodegenOutput.emitOutputWriter(boundExpr.dataType, ev.value, ctx)
(cls, setup, defaultBody(boundExpr, ev, snippet, subExprsCode))
(cls, setup, defaultBody(boundExpr, inputSchema, ev, snippet, subExprsCode))
}

val typedFieldDecls = CometBatchKernelCodegenInput.emitInputFieldDecls(inputSchema)
Expand Down Expand Up @@ -343,6 +343,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
*/
private def defaultBody(
boundExpr: Expression,
inputSchema: Seq[ArrowColumnSpec],
ev: ExprCode,
writeSnippet: String,
subExprsCode: String): String = {
Expand All @@ -353,9 +354,17 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
// make this incorrect (`coalesce(null, x)` is `x`); `allNullIntolerant` rejects those.
val inputOrdinals =
boundExpr.collect { case b: BoundReference => b.ordinal }.distinct
// Primitive Arrow vectors are wrapped in `CometPlainVector` at input-cast time, which
// exposes `isNullAt(int)` rather than the raw Arrow `isNull(int)`. Pick the right method
// per ordinal so the short-circuit compiles for timestamp / int / float columns too,
// not just VarChar / Decimal vectors that stay as raw Arrow types.
def nullCheckCall(ord: Int): String = {
val method = CometBatchKernelCodegenInput.nullCheckMethod(inputSchema(ord))
s"this.col$ord.$method(i)"
}
val nullCheck =
if (inputOrdinals.isEmpty) "false"
else inputOrdinals.map(ord => s"this.col$ord.isNull(i)").mkString(" || ")
else inputOrdinals.map(nullCheckCall).mkString(" || ")
s"""
|if ($nullCheck) {
| output.setNull(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,10 @@ private[codegen] object CometBatchKernelCodegenInput {
/**
* Java method name for the per-column null check. Primitive scalars wrapped in
* [[CometPlainVector]] expose `isNullAt`; Arrow typed fields expose `isNull`. Same semantics.
* Used both by `emitTypedGetters` (for the kernel's `isNullAt` switch) and by
* `CometBatchKernelCodegen.defaultBody` (for the `NullIntolerant` short-circuit).
*/
private def nullCheckMethod(spec: ArrowColumnSpec): String = spec match {
def nullCheckMethod(spec: ArrowColumnSpec): String = spec match {
case sc: ScalarColumnSpec if wrapsInCometPlainVector(sc.vectorClass) => "isNullAt"
case _ => "isNull"
}
Expand Down
46 changes: 42 additions & 4 deletions spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.comet.serde

import org.apache.spark.SparkEnv
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Literal, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, Literal, ScalaUDF}
import org.apache.spark.sql.types.BinaryType

import org.apache.comet.CometConf
Expand All @@ -45,15 +45,35 @@ import org.apache.comet.udf.codegen.CometScalaUDFCodegen
*
* Gated by [[CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED]]. When disabled, plans containing a
* `ScalaUDF` fall back to Spark for the enclosing operator.
*
* [[emitJvmCodegenDispatch]] exposes the same closure-serialize + dispatcher-proto path to other
* serdes that want to keep a built-in Spark expression inside the Comet pipeline when no native
* lowering is viable. See [[CometDateFormat]] for an example.
*/
object CometScalaUDF extends CometExpressionSerde[ScalaUDF] {

override def convert(expr: ScalaUDF, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
override def convert(expr: ScalaUDF, inputs: Seq[Attribute], binding: Boolean): Option[Expr] =
emitJvmCodegenDispatch(expr, inputs, binding)

/**
* Bind `expr`, closure-serialize it, and emit a `JvmScalarUdf` proto routed through
* [[CometScalaUDFCodegen]] so that native execution evaluates the expression inside the
* Arrow-direct codegen dispatcher. The dispatcher will Janino-compile `expr.doGenCode` into a
* batch kernel on first invocation per task.
*
* Returns `None` (with `withInfo` tagging the reason) when the dispatcher is disabled via
* [[CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED]] or when [[CometBatchKernelCodegen.canHandle]]
* refuses the expression tree. Callers should treat `None` as a clean Spark-fallback signal.
*/
def emitJvmCodegenDispatch(
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[Expr] = {
if (!CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.get()) {
withInfo(
expr,
s"${CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key}=false; ScalaUDF has no native path " +
"so the plan falls back to Spark")
s"${CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key}=false; expression has no native " +
"path so the plan falls back to Spark")
return None
}

Expand Down Expand Up @@ -100,3 +120,21 @@ object CometScalaUDF extends CometExpressionSerde[ScalaUDF] {
.build())
}
}

/**
* Convenience base for serdes that route a non-ScalaUDF Spark expression through the codegen
* dispatcher. Delegates `convert` to [[CometScalaUDF.emitJvmCodegenDispatch]] and marks the
* expression `Compatible()` because the dispatcher runs Spark's own `doGenCode` inside the
* kernel: behavior matches Spark exactly when [[CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED]] is
* enabled, and the operator falls back to Spark cleanly when it is not.
*/
class CometCodegenDispatch[T <: Expression] extends CometExpressionSerde[T] {
override def getSupportLevel(expr: T): SupportLevel = Compatible()
// Intentionally no getCompatibleNotes override: the docs generator emits compat notes under
// a heading that promises "no additional configuration required". The dispatcher flag is a
// global concern documented elsewhere; tagging each expression here would contradict the
// heading. When the flag is off, `convert` returns None with a clear withInfo reason that
// shows up in EXPLAIN, which is the right place for that signal.
override def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr] =
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {

private[comet] val temporalExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
Map(
classOf[AddMonths] -> CometAddMonths,
classOf[ConvertTimezone] -> CometConvertTimezone,
classOf[DateAdd] -> CometDateAdd,
classOf[DateDiff] -> CometDateDiff,
Expand All @@ -234,12 +235,20 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
classOf[LastDay] -> CometLastDay,
classOf[Hour] -> CometHour,
classOf[MakeDate] -> CometMakeDate,
classOf[MakeTimestamp] -> CometMakeTimestamp,
classOf[MicrosToTimestamp] -> CometMicrosToTimestamp,
classOf[MillisToTimestamp] -> CometMillisToTimestamp,
classOf[MonthsBetween] -> CometMonthsBetween,
classOf[Minute] -> CometMinute,
classOf[NextDay] -> CometNextDay,
classOf[Second] -> CometSecond,
classOf[SecondsToTimestamp] -> CometSecondsToTimestamp,
classOf[TruncDate] -> CometTruncDate,
classOf[TruncTimestamp] -> CometTruncTimestamp,
classOf[ToUnixTimestamp] -> CometToUnixTimestamp,
classOf[UnixMicros] -> CometUnixMicros,
classOf[UnixMillis] -> CometUnixMillis,
classOf[UnixSeconds] -> CometUnixSeconds,
classOf[UnixTimestamp] -> CometUnixTimestamp,
classOf[Year] -> CometYear,
classOf[Month] -> CometMonth,
Expand Down
123 changes: 66 additions & 57 deletions spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ package org.apache.comet.serde

import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.catalyst.expressions.{AddMonths, Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, MakeTimestamp, MicrosToTimestamp, MillisToTimestamp, Minute, Month, MonthsBetween, NextDay, Quarter, Second, SecondsToTimestamp, ToUnixTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixMicros, UnixMillis, UnixSeconds, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.CometGetDateField.CometGetDateField
Expand Down Expand Up @@ -593,17 +594,23 @@ object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] {
}

/**
* Converts Spark DateFormatClass expression to DataFusion's to_char function.
* Converts Spark `DateFormatClass` to DataFusion's `to_char` when format and timezone are
* mappable, otherwise routes the expression through the Arrow-direct codegen dispatcher so that
* Spark's own `DateFormatClass.doGenCode` runs inside the Comet pipeline.
*
* Spark uses Java SimpleDateFormat patterns while DataFusion uses strftime patterns. This
* implementation supports a whitelist of common format strings that can be reliably mapped
* between the two systems.
* Routing:
* - format is a literal in `supportedFormats` AND timezone is UTC -> native `to_char`
* - format is a literal in `supportedFormats` AND timezone is non-UTC, with the per-expression
* `allowIncompatible` flag set -> native `to_char` (results may differ from Spark)
* - all other cases -> JVM codegen dispatcher ([[CometScalaUDF.emitJvmCodegenDispatch]]), gated
* by [[CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED]]. When that flag is disabled the operator
* falls back to Spark.
*/
object CometDateFormat extends CometExpressionSerde[DateFormatClass] {

/**
* Mapping from Spark SimpleDateFormat patterns to strftime patterns. Only formats in this map
* are supported.
* are supported by the native path.
*/
val supportedFormats: Map[String, String] = Map(
// Full date formats
Expand Down Expand Up @@ -637,66 +644,50 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
// ISO formats
"yyyy-MM-dd'T'HH:mm:ss" -> "%Y-%m-%dT%H:%M:%S")

override def getIncompatibleReasons(): Seq[String] = Seq(
"Non-UTC timezones may produce different results than Spark")

override def getUnsupportedReasons(): Seq[String] = Seq(
"Only the following formats are supported:" +
supportedFormats.keys.toSeq.sorted
.map(k => s"`$k`")
.mkString("\n - ", "\n - ", ""))
// Compatibility is decided inside `convert`: the native path covers a subset, and the codegen
// dispatcher covers everything else when enabled. Plan-time tagging happens via `withInfo` on
// the path that returns None.
override def getSupportLevel(expr: DateFormatClass): SupportLevel = Compatible()

override def getSupportLevel(expr: DateFormatClass): SupportLevel = {
// Check timezone - only UTC is fully compatible
val timezone = expr.timeZoneId.getOrElse("UTC")
val isUtc = timezone == "UTC" || timezone == "Etc/UTC"

expr.right match {
case Literal(fmt: UTF8String, _) =>
val format = fmt.toString
if (supportedFormats.contains(format)) {
if (isUtc) {
Compatible()
} else {
Incompatible(Some(s"Non-UTC timezone '$timezone' may produce different results"))
}
} else {
Unsupported(
Some(
s"Format '$format' is not supported. Supported formats: " +
supportedFormats.keys.mkString(", ")))
}
case _ =>
Unsupported(Some("Only literal format strings are supported"))
}
}
override def getCompatibleNotes(): Seq[String] = Seq(
"Format strings in a curated allow-list run natively via DataFusion's `to_char` for UTC " +
"sessions. Other format strings (including non-literal formats), as well as non-UTC " +
"sessions, route through Spark's own `DateFormatClass.doGenCode` via the Arrow-direct " +
"codegen dispatcher when `spark.comet.exec.scalaUDF.codegen.enabled=true`. When the " +
"codegen dispatcher is disabled (default) the operator falls back to Spark in those " +
"cases.")

override def convert(
expr: DateFormatClass,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
// Get the format string - must be a literal for us to map it
val strftimeFormat = expr.right match {
case Literal(fmt: UTF8String, _) =>
supportedFormats.get(fmt.toString)
val timezone = expr.timeZoneId.getOrElse("UTC")
val isUtc = timezone == "UTC" || timezone == "Etc/UTC"

val nativeFormat: Option[String] = expr.right match {
case Literal(fmt: UTF8String, _) => supportedFormats.get(fmt.toString)
case _ => None
}

strftimeFormat match {
case Some(format) =>
val childExpr = exprToProtoInternal(expr.left, inputs, binding)
val formatExpr = exprToProtoInternal(Literal(format), inputs, binding)

val optExpr = scalarFunctionExprToProtoWithReturnType(
"to_char",
StringType,
false,
childExpr,
formatExpr)
optExprWithInfo(optExpr, expr, expr.left, expr.right)
case None =>
withInfo(expr, expr.left, expr.right)
None
val canUseNative = nativeFormat.isDefined && {
isUtc || CometConf.isExprAllowIncompat(getExprConfigName(expr))
}

if (canUseNative) {
val childExpr = exprToProtoInternal(expr.left, inputs, binding)
val formatExpr = exprToProtoInternal(Literal(nativeFormat.get), inputs, binding)
val optExpr = scalarFunctionExprToProtoWithReturnType(
"to_char",
StringType,
false,
childExpr,
formatExpr)
optExprWithInfo(optExpr, expr, expr.left, expr.right)
} else {
// Hand the full `DateFormatClass` (with `timeZoneId` already stamped by `ResolveTimeZone`)
// to the codegen dispatcher. It closure-serializes the bound tree, so non-UTC timezones
// and non-whitelisted / non-literal format strings produce Spark-identical results.
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
}
}
}
Expand Down Expand Up @@ -780,3 +771,21 @@ object CometDays extends CometExpressionSerde[Days] {
optExprWithInfo(optExpr, expr, expr.child)
}
}

object CometAddMonths extends CometCodegenDispatch[AddMonths]

object CometMonthsBetween extends CometCodegenDispatch[MonthsBetween]

object CometMakeTimestamp extends CometCodegenDispatch[MakeTimestamp]

object CometMicrosToTimestamp extends CometCodegenDispatch[MicrosToTimestamp]

object CometMillisToTimestamp extends CometCodegenDispatch[MillisToTimestamp]

object CometUnixSeconds extends CometCodegenDispatch[UnixSeconds]

object CometUnixMillis extends CometCodegenDispatch[UnixMillis]

object CometUnixMicros extends CometCodegenDispatch[UnixMicros]

object CometToUnixTimestamp extends CometCodegenDispatch[ToUnixTimestamp]
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Routes add_months through the codegen dispatcher. Spark's own AddMonths.doGenCode
-- runs inside the Janino-compiled kernel.
-- Config: spark.sql.session.timeZone=America/Los_Angeles
-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true

statement
CREATE TABLE test_add_months(d date, n int) USING parquet

statement
INSERT INTO test_add_months VALUES
(date('2024-01-15'), 1),
(date('2024-01-31'), 1),
(date('2024-12-15'), -13),
(date('1970-01-01'), 0),
(NULL, 1),
(date('2024-06-15'), NULL)

query
SELECT add_months(d, n) FROM test_add_months

query
SELECT add_months(d, 12) FROM test_add_months

-- literal arguments
query
SELECT add_months(date('2024-02-29'), 12)
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,27 @@
-- specific language governing permissions and limitations
-- under the License.

-- Pin the session timezone so the test exercises the non-UTC path regardless of the JVM
-- default. Enable the codegen dispatcher so non-UTC and non-whitelisted formats stay inside
-- Comet via Spark's own DateFormatClass.doGenCode instead of falling back to Spark.
-- Config: spark.sql.session.timeZone=America/Los_Angeles
-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true

statement
CREATE TABLE test_date_format(ts timestamp) USING parquet

statement
INSERT INTO test_date_format VALUES (timestamp('2024-06-15 10:30:45')), (timestamp('1970-01-01 00:00:00')), (NULL)

query expect_fallback(Non-UTC timezone)
query
SELECT date_format(ts, 'yyyy-MM-dd') FROM test_date_format

query expect_fallback(Non-UTC timezone)
query
SELECT date_format(ts, 'HH:mm:ss') FROM test_date_format

query expect_fallback(Non-UTC timezone)
query
SELECT date_format(ts, 'yyyy-MM-dd HH:mm:ss') FROM test_date_format

-- literal arguments
query expect_fallback(Non-UTC timezone)
query
SELECT date_format(timestamp('2024-06-15 10:30:45'), 'yyyy-MM-dd')
Loading
Loading