Skip to content

feat: Add experimental support for accelerated PyArrow UDFs#4234

Open
andygrove wants to merge 44 commits into
apache:mainfrom
andygrove:pyarrow-udf
Open

feat: Add experimental support for accelerated PyArrow UDFs#4234
andygrove wants to merge 44 commits into
apache:mainfrom
andygrove:pyarrow-udf

Conversation

@andygrove
Copy link
Copy Markdown
Member

@andygrove andygrove commented May 6, 2026

Which issue does this PR close?

Closes #957
Closes #4240

Rationale for this change

Spark's mapInArrow / mapInPandas plans insert ColumnarToRow between a Comet scan and the Python operator, and MapInBatchExec then projects the Python output back to rows. Both projections copy data per row even though the upstream Comet plan and the Python runner both speak Arrow. Inside the Python runner, ArrowPythonRunner then re-encodes those rows into Arrow IPC, so the data round-trips Arrow → Row → Arrow on the JVM side before ever reaching Python.

This PR replaces the row-based plumbing with a custom CometArrowPythonRunner that consumes Iterator[ColumnarBatch] directly. The destination VectorSchemaRoot (in Spark's allocator, used for the IPC stream) is populated per batch by a single Unsafe.copyMemory per buffer per column, copying straight from Comet's vectors into the runner's root. No row materialization, no ArrowWriter.write(InternalRow) loop.

Not zero-copy in the strict sense: Comet's Parquet readers each construct their own RootAllocator, separate from ArrowUtils.rootAllocator, so Arrow's TransferPair cannot share buffers across the boundary. Bulk per-buffer memcpy is the next-best alternative and is materially faster than the per-row write loop, especially on wide rows where the row path is dominated by the InternalRow accessor overhead. A future PR that unifies the allocator parent would unlock true zero-copy via TransferPair.

I/O asymmetry: why memcpy on input but not on output

The input path memcpys; the output path does not, and that is structural rather than an oversight.

  • Input (JVM → Python). Two Arrow vector trees already exist in different allocators. The source is Comet's ColumnarBatch, allocated from each Parquet reader's private RootAllocator. The destination is Spark's persistent IPC root, allocated from ArrowUtils.rootAllocator. Because the two roots are unrelated, Arrow's TransferPair / VectorLoader.load cannot rebind buffers across the boundary. CometColumnarPythonInput.copyVector walks the trees in lockstep and setBytes-copies each buffer. This is the bridge between the two allocator trees.
  • Output (Python → JVM). There is no second tree to bridge to. Spark's BasicPythonArrowOutput constructs an ArrowStreamReader against a per-iterator BufferAllocator; loadNextBatch decodes the IPC frames coming back from the Python worker directly into a VectorSchemaRoot allocated from that allocator. Each FieldVector in the root is wrapped in an ArrowColumnVector and emitted as a ColumnarBatch. CometMapInBatchExec consumes those vectors as-is - it just unwraps the single struct child and forwards them. Nothing else in the JVM owns those buffers, so there is no boundary to cross.

In short, the input has two pre-existing trees in different allocators (memcpy required); the output has one tree, decoded straight into the right allocator (no copy possible or needed). The future allocator-unification work that would unlock TransferPair on input does not apply to output - the output is already as good as it can get.

Plan (Spark 4.0)

Baseline (spark.comet.exec.pyarrowUdf.enabled=false):

*(1) Filter (isnotnull(id#6L) AND (id#6L < 100))
+- MapInArrow transform(id#3L, value#4)#5, [id#6L, value#7], false
   +- CometNativeColumnarToRow
      +- CometNativeScan parquet [id#3L,value#4] Batched: true, DataFilters: [],
         Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/...],
         PartitionFilters: [], PushedFilters: [],
         ReadSchema: struct<id:bigint,value:double>

Optimized (spark.comet.exec.pyarrowUdf.enabled=true):

*(1) Filter (isnotnull(id#18L) AND (id#18L < 100))
+- CometMapInBatch transform(id#15L, value#16)#17, [id#18L, value#19], false, 207
   +- CometNativeScan parquet [id#15L,value#16] Batched: true, DataFilters: [],
      Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/...],
      PartitionFilters: [], PushedFilters: [],
      ReadSchema: struct<id:bigint,value:double>

What changes are included in this PR?

  • New input trait CometColumnarPythonInput under spark/src/main/spark-4.x/.../execution/python/. Extends Spark's private[python] PythonArrowInput[Iterator[ColumnarBatch]]; implements writeNextBatchToArrowStream by walking the destination struct's children, allocating each sized to match the corresponding Comet column, collecting buffer addresses, and issuing one bulk copy for the whole tree.
  • New CometArrowPythonRunner per Spark minor (4.0, 4.1, 4.2) under the same package. 4.0 extends BasePythonRunner directly because Spark 4.0's BaseArrowPythonRunner is bound to Iterator[InternalRow]; 4.1/4.2 extend the generic BaseArrowPythonRunner[IN, OUT]. All three mix in CometColumnarPythonInput plus Spark's BasicPythonArrowOutput.
  • New helper CometVectorIpcCopier in comet-common crosses the shading boundary using only long primitives. comet-common shades org.apache.arrow.* into org.apache.comet.shaded.arrow.*; comet-spark references the unshaded Spark Arrow classes. The helper does the byte-level walk over the source vector tree inside common, exposing bufferReadableBytes(): long[], valueCounts(): int[], and copyBuffersToAddresses(addresses: long[]): Unit. No shaded type crosses the API.
  • CometMapInBatchExec loses all row plumbing (rowIterator, BatchIterator, ContextAwareIterator, InternalRow(_) wrap). Feeds Iterator[ColumnarBatch] straight to the runner; on output, unwraps the single struct column into the user's output columns as before. isBarrier is propagated through RDD.barrier() so mapInArrow(..., barrier=True) keeps its gang-scheduling semantics.
  • EliminateRedundantTransitions rewrite unchanged in shape: matches ColumnarToRow + (PythonMapInArrowExec | MapInPandasExec | MapInArrowExec) over a columnar Comet child.
  • Per-Spark-version shim for the differing ArrowPythonRunner constructors and the 4.0+ MapInArrowExec rename. Shared 4.x bits live in spark-4.x/.../Spark4xMapInBatchSupport.scala; per-minor shims provide only the runner factory.
  • Spark 3.4 and 3.5 are no-ops. 3.5's PythonArrowInput trait has a different contract (writeIteratorToArrowStream one-shot vs 4.x's writeNextBatchToArrowStream batch-at-a-time) and a separate implementation has not been written. The matchers in the 3.4 / 3.5 shims return None; vanilla Spark handles the operation. 3.5 support can be added later if there is user demand.
  • New conf spark.comet.exec.pyarrowUdf.enabled, default false while experimental.
  • User guide page documenting usage, plan flow, barrier semantics, the buffer-copy boundary, and limitations.

Limitations

  • The optimization currently applies only to mapInArrow and mapInPandas. Scalar pandas UDFs (@pandas_udf) and grouped operations (applyInPandas) are not yet supported.
  • Spark 4.0 or newer is required. Spark 3.4 and 3.5 are documented no-ops.
  • The optimization fires across a shuffle only with Comet's columnar shuffle. A vanilla Spark Exchange outputs rows and breaks the precondition.
  • Not zero-copy; bulk per-buffer memcpy. Comet's Parquet readers each construct their own RootAllocator, so cross-root TransferPair cannot be used. A future PR that has the readers allocate from ArrowUtils.rootAllocator would unlock zero-copy.

How are these changes tested?

  • CometMapInBatchSuite under spark/src/test/spark-4.x/ covers the JVM-side rule and an end-to-end check: constructs a MapInArrowExec over a stub CometPlan leaf and asserts EliminateRedundantTransitions rewrites it to CometMapInBatchExec; runs a Parquet → mapInArrow query with primitives + nullable varchar and asserts row-equivalence with the un-rewritten output.
  • pytest module test_pyarrow_udf.py covers mapInArrow and mapInPandas end-to-end against a real Python worker: nulls, empty input, Python exception propagation, decimal / date / timestamp / array / struct types, post-shuffle correctness, and barrier=True gang scheduling. 24 / 24 cases pass locally on Spark 4.0.
  • Dedicated pyarrow_udf_test.yml workflow runs the pytest module on every PR against Spark 4.0 (covers the 4.x shim path; paths allowlist scoped to the feature files).

Wall-clock benchmark (benchmark_pyarrow_udf.py, 1M rows, 5 iters, local[2], Spark 4.0.2 / PySpark 4.0.1):

api workload vanilla median optimized median rows/s speedup
mapInArrow narrow primitives 0.268 s 0.140 s 7.5M 1.92x
mapInPandas narrow primitives 0.269 s 0.141 s 7.4M 1.91x
mapInArrow mixed with strings 0.489 s 0.273 s 3.8M 1.79x
mapInPandas mixed with strings 0.504 s 0.283 s 3.7M 1.78x
mapInArrow wide rows (50 cols) 2.542 s 1.795 s 584K 1.42x
mapInPandas wide rows (50 cols) 2.588 s 1.846 s 568K 1.40x

The benchmark UDF is a pure passthrough on local[2], so most of the wall time is Spark's Python fork / IPC overhead. Real UDFs (PyArrow compute, pandas ops, model inference) increase the per-row Python cost and shrink the speedup ratio. The bulk-copy path improves the wide-row case the most because that's where the row path's per-InternalRow overhead is most concentrated.

andygrove added 5 commits May 5, 2026 18:06
When Comet operators produce Arrow columnar data and the next operator
is a Python UDF (mapInArrow/mapInPandas), Spark currently inserts an
unnecessary ColumnarToRow transition. The Python runner then converts
those rows back to Arrow to send to Python, creating a wasteful
Arrow->Row->Arrow round-trip.

This adds CometPythonMapInArrowExec which:
- Accepts columnar input directly from Comet operators
- Uses lightweight batch.rowIterator() instead of UnsafeProjection
- Keeps the Python output as ColumnarBatch (no output row conversion)

The optimization is detected in EliminateRedundantTransitions and
controlled by spark.comet.exec.pythonMapInArrow.enabled (default: true).
Documents the CometPythonMapInArrowExec optimization, including
supported APIs, configuration, usage example, and how to verify
the optimization is active in query plans.
…ions

Fix three issues that prevented test_pyarrow_udf.py from running:

1. mapInArrow callbacks must accept Iterator[pa.RecordBatch] and yield
   batches. The previous single-batch signatures crashed with
   "'map' object has no attribute 'to_pandas'".
2. PySpark DataFrame has no `queryExecution` attribute. Use
   `_jdf.queryExecution().executedPlan().toString()` instead.
3. Replace soft plan-string heuristics with assertions that fail loudly
   if the optimization regresses. Match on `CometPythonMapInArrow` (no
   `Exec` suffix in the plan toString) and assert no `ColumnarToRow`
   transition is present.
- Rewrite test_pyarrow_udf.py as a pytest module. A session-scoped
  SparkSession fixture builds the Comet-enabled session once and a
  parametrized `accelerated` fixture toggles
  spark.comet.exec.pythonMapInArrow.enabled per test, so each case runs
  under both the optimized and fallback paths and asserts the expected
  plan operator (`CometPythonMapInArrow` vs vanilla `PythonMapInArrow`).
  The jar is auto-discovered from spark/target by matching the installed
  pyspark version, or taken from the COMET_JAR env var.
- Add a dedicated `PyArrow UDF Tests` workflow that builds Comet against
  Spark 3.5 / Scala 2.12, installs pyspark/pyarrow/pandas/pytest, and
  runs the new pytest module.
- Add CometPythonMapInArrowSuite to the `exec` suite list in both
  pr_build_linux.yml and pr_build_macos.yml so the JVM-side suite is
  exercised on every PR.
Comment thread .github/workflows/pyarrow_udf_test.yml Fixed
andygrove added 6 commits May 5, 2026 18:46
Replace the narrow paths allowlist with the same paths-ignore list used
by pr_build_linux.yml so the workflow runs on any source change that
could affect Comet's PyArrow UDF execution path, not just the few files
explicitly named.
The PR's `CometPythonMapInArrowExec` and `EliminateRedundantTransitions`
rule directly reference Spark 3.5 APIs that differ across supported
Spark versions: the `ArrowPythonRunner` constructor (4 distinct
signatures across 3.4/3.5/4.0/4.1+/4.2), `arrowUseLargeVarTypes`,
`JobArtifactSet`, `MapInBatchExec.isBarrier`, and the `PythonMapInArrowExec`
type itself (renamed to `MapInArrowExec` in 4.0+). This breaks compile
on every profile other than 3.5.

Introduce a per-version `ShimCometPythonMapInArrow` trait under
`org.apache.spark.sql.comet.shims` (placed in the spark namespace so
it can reach `private[spark]` members) that:

* matches the Spark-version-specific MapInArrow / MapInPandas exec types
  and exposes their `(func, output, child, isBarrier, evalType)` tuple,
* constructs the right `ArrowPythonRunner` for the version,
* hides `arrowUseLargeVarTypes` / `JobArtifactSet` / `getPythonRunnerConfMap`
  behind helper methods.

Spark 3.4 lacks the prerequisite APIs (no `isBarrier`, no `JobArtifactSet`,
no `arrowUseLargeVarTypes`), so its shim returns `None` from the matchers
and the optimization is a no-op there.
The default `amd64/rust` image is Debian 13 (trixie), where the system
`python3` is 3.13 and there is no `python3.11` apt package. The workflow
installed `python3.11` explicitly, which fails on trixie with `Unable to
locate package python3.11`.

Switching to `rust:bookworm` gives a Debian 12 base where `python3` is
3.11, matching the job name and pyspark 3.5.x's supported runtime.
Spark launches Python workers in fresh subprocesses that look up python3
on PATH. Without PYSPARK_PYTHON, workers use the system python (no pyarrow
installed) and UDF execution fails with ModuleNotFoundError. Point both
PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON at /tmp/venv/bin/python so workers
inherit the same interpreter that pytest uses.
@andygrove andygrove changed the title feat: Add support for accelerated PyArrow UDFs [experimental] feat: Add experimental support for accelerated PyArrow UDFs May 6, 2026
andygrove added 4 commits May 6, 2026 07:44
Flip spark.comet.exec.pythonMapInArrow.enabled default from true to false
and prefix the config doc with "Experimental:" so the default matches the
"[experimental]" label on the feature. Update the user guide to instruct
users to opt in explicitly.
Add coverage for cases that the original pytest module did not exercise:

- mapInPandas (claimed supported, previously zero coverage)
- Null preservation across long and string columns via Arrow passthrough
- Empty input from a CometScan via filter pushdown
- Python exception propagation (sentinel must surface in driver-side error)
- DecimalType(18,6), DateType, TimestampType round-trip with nulls
- ArrayType<Int> and nested StructType, including null arrays/structs and
  arrays containing null elements
- repartition between scan and UDF (correctness only; the optimization
  itself does not fire across a vanilla Exchange and is documented as
  such in the test)

Generalize _assert_plan_matches_mode to take the vanilla node name so the
fallback assertion can match either PythonMapInArrow or MapInPandas.
Expand the user guide with the limitations a user should know before
enabling the experimental optimization:

- The remaining row-to-Arrow round-trip inside the Python runner is
  documented more precisely (the input goes through ColumnarBatch.rowIterator
  to feed ArrowPythonRunner, which re-encodes to Arrow IPC).
- A vanilla Spark Exchange between the Comet scan and the UDF prevents
  the optimization from firing. Users must configure Comet's native
  shuffle manager at session startup to keep the data columnar.
- Spark 3.4 lacks the prerequisite APIs and the feature is a no-op there.
- isBarrier is captured by the operator constructor but not yet
  propagated to the Python runner.

Also explain the AQE display quirk: with AQE on and a shuffle present,
the pre-execution plan shows the unoptimized form because the rule
only sees the materialized subplan after stage execution. Running an
action and re-inspecting explain() reveals the optimized plan.
Standalone Python script that times df.mapInArrow(passthrough).count()
and the equivalent mapInPandas query with the optimization toggled on
and off. Numbers are wall-clock seconds, so they include the Python
worker, Arrow IPC, and downstream count() costs. That is the right
unit for a feature whose user surface is Python: it shows what
fraction of end-to-end time the optimization shaves off, not just the
JVM-side delta in isolation.

Three workloads exercise the dimension where the optimization helps
most:

- narrow primitives (long, int, double)
- mixed with strings (variable-length encoding)
- wide rows (50 columns, projection cost scales with column count)

Local smoke run with 200k rows shows 1.17x to 1.45x speedup across
mapInArrow and mapInPandas, narrow/wide schemas. The script is
configurable via BENCHMARK_ROWS / BENCHMARK_WARMUP / BENCHMARK_ITERS
env vars for users who want longer or shorter runs.
The operator captured isBarrier in its constructor but always called
inputRDD.mapPartitionsInternal, dropping the barrier execution mode
semantics that mapInArrow(..., barrier=True) requests. Stages running
under the optimization lost gang scheduling and the BarrierTaskContext
APIs the UDF expects.

Branch on isBarrier and route through inputRDD.barrier().mapPartitions
in the barrier case, matching what Spark's MapInBatchExec.doExecute
does. Add a pytest case that calls BarrierTaskContext.get() inside the
UDF, which raises if the task is not running in a barrier stage; runs
in both vanilla and optimized modes. Drop the isBarrier limitation
note from the user guide.
@comphead
Copy link
Copy Markdown
Contributor

comphead commented May 6, 2026

I assume not only speed is improved, would also be interesting to check memory metrics

@andygrove andygrove moved this from Todo to In progress in Comet Development May 6, 2026
Copy link
Copy Markdown
Contributor

@parthchandra parthchandra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor comments. Otherwise great PR!

* Optimized flow: CometNativeExec (Arrow) -> CometPythonMapInArrowExec (batch.rowIterator() ->
* Arrow -> Python -> Arrow columnar output)
*
* This eliminates:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 !

child: SparkPlan,
isBarrier: Boolean,
pythonEvalType: Int)
extends UnaryExecNode
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other Comet operators extend CometPlan. The idea was that all Comet specific common behavior (say some Comet specific metrics, for example) can be in a single place.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Comment thread .github/workflows/pyarrow_udf_test.yml Outdated

jobs:
pyarrow-udf:
name: PyArrow UDF (Spark 3.5, JDK 17, Python 3.11)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be Spark 4 now?
I'm assuming that this is enabled for only one version of Spark because it is experimental?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the latest changes, this feature is now only supported for Spark 4.x. If there is user demand, we can create a separate PR to support for 3.x

.booleanConf
.createWithDefault(false)

val COMET_PYTHON_MAP_IN_ARROW_ENABLED: ConfigEntry[Boolean] =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAP is confusing IMO, so many things related to map in Spark

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe COMET_PYARROW_SUPPORT or something like that?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated docs to clarify.

Mirror the pr_build_linux fix: the macos workflow also still referenced the old CometPythonMapInArrowSuite FQN.
@andygrove
Copy link
Copy Markdown
Member Author

@mbutrovich thanks for the thorough pass — full mapping below.

Framing. PR description updated to "drop two UnsafeProjection copies and keep the stage columnar". #4240 still tracks full Arrow-to-Arrow round-trip elimination.

Bigger items

  1. Conf + class renamed to spark.comet.exec.pyarrowUdf.enabled and CometMapInBatchExec.
  2. Shared 4.x bits live in spark-4.x/.../Spark4xMapInBatchSupport.scala (matchers, RunnerInputs, runnerInputs helper). Each minor's ShimCometMapInBatch is just the per-version ArrowPythonRunner factory.
  3. Scala suite replaced with per-version CometMapInBatchSuite (spark/src/test/spark-{3.5,4.x}) that constructs a PythonMapInArrowExec / MapInArrowExec over a stub CometPlan leaf and asserts EliminateRedundantTransitions rewrites it (plus the disabled-conf negative).
  4. CometPlan mixed into CometMapInBatchExec.

Smaller things

  1. 5-tuple replaced with MapInBatchInfo case class.
  2. Single match on the matcher result in EliminateRedundantTransitions.
  3. Unreachable raw ColumnarToRowExec branch removed from extractColumnarChild; reasoning documented in the scaladoc above it.
  4. Why-comment added on the doExecute fallback explaining the row transition reintroduction.
  5. computeArrowPython parameter count cut by deriving timeZoneId / largeVarTypes / pythonRunnerConf / jobArtifactUUID inside the shim — but that introduced the NPE @wForget caught on the executor (SQLConf.ConfigReader is driver-only). The latest commit (6f5aca3b3) moves resolution back to the driver in doExecuteColumnar and passes a serializable RunnerInputs case class into computeArrowPython, so the param count is still small without dereferencing SQLConf in the task closure.
  6. local* prefixes dropped.
  7. Code-restating comments dropped; the struct-wrap and barrier notes kept.

Test infra

  1. Jar resolution consolidated into spark/src/test/resources/pyspark/conftest.py; pytest and the benchmark script both import resolve_comet_jar from there. The workflow no longer needs the inline ls | grep.
  2. pyarrow_udf_test.yml switched from paths-ignore to a feature-file allowlist; build flavor switched to debug (no -Prelease, no cargo --release).
  3. CI workflow pinned to Spark 4.0 (covers the 4.x shim path). Spark 3.5 still gets matcher coverage through the existing CometMapInBatchSuite runs in pr_build_linux / pr_build_macos. Compile-only 4.1/4.2 isn't wired up here yet — happy to add a matrix if you want it; my read was that the existing 4.1 build job already provides that, but let me know.
  4. Benchmark caveat (passthrough UDF + local[2] measures Python fork/IPC, real UDFs shrink the delta further) included in the PR description.

@andygrove andygrove marked this pull request as draft May 12, 2026 01:55
@andygrove
Copy link
Copy Markdown
Member Author

Moving this to draft while I implement #4240 as part of this PR. I don't think it makes sense to ship this feature without it.

andygrove added 15 commits May 11, 2026 20:34
…er" [skip ci]

Probe's cross-allocator transfer invariant turned out false: Comet's Parquet readers each construct their own RootAllocator, separate from ArrowUtils.rootAllocator. The original probe also had a silent-pass bug (AtomicReference doesn't cross Spark task boundaries). The redesigned trait uses per-buffer byte copy instead of TransferPair, so the probe is no longer load-bearing.
…p ci]

Also add arrow-vector as provided scope to spark/pom.xml so Scala can
resolve org.apache.arrow.vector during compilation; the partial
org/apache/arrow/c/ tree in common/target/classes otherwise masks the
package and causes "object vector is not a member of package
org.apache.arrow" errors.

The unloader is created inline per-batch rather than as a class field to
stay compatible across Spark 4.0/4.1/4.2, which differ in whether
PythonArrowInput declares unloader as abstract.
…ddress [skip ci]

Adds a helper class in comet-common that copies Arrow buffer bytes from a
CometDecodedVector to caller-supplied memory addresses without exposing shaded
Arrow types across the module boundary. Uses getFieldBuffers()/getChildrenFromFields()
traversal consistent with VectorUnloader so buffer ordering matches between source
(shaded) and destination (unshaded) sides.
…[skip ci]

Rewrites CometColumnarPythonInput to copy Comet vector bytes via
CometVectorIpcCopier (long-address API) rather than casting shaded FieldVector
to unshaded FieldVector, which caused ClassCastException at runtime.

Additional fixes for correct Arrow IPC semantics:
- Fill struct validity buffer with 0xFF so Python sees non-null struct rows
- Set lastSet before setValueCount on variable-width and list vectors to prevent
  fillHoles from overwriting correctly copied offset buffers
- Process nodes bottom-up so parent setValueCount cascade does not clobber
  children that have not yet had lastSet updated
@andygrove andygrove marked this pull request as ready for review May 12, 2026 13:47
@andygrove
Copy link
Copy Markdown
Member Author

@comphead @parthchandra @mbutrovich @wForget this is ready for another round of reviews whenever you have time

@andygrove
Copy link
Copy Markdown
Member Author

@comphead @parthchandra @mbutrovich @wForget this is ready for another round of reviews whenever you have time

The PR description is updated with the new scope

mbutrovich and others added 4 commits May 12, 2026 13:38
# Conflicts:
#	spark/pom.xml
#	spark/src/main/java/org/apache/comet/vector/CometVectorIpcCopier.java
The long[]-address indirection through CometVectorIpcCopier existed because
comet-common shaded org.apache.arrow.* into org.apache.comet.shaded.arrow.*,
making source vectors and Spark's IPC root different JVM types. After apache#4325
moved most JVM code into comet-spark and dropped the shading, both sides see
the same Arrow classes — the helper is no longer needed.

Replace with a direct walk of the source/destination FieldVector trees using
ArrowBuf.setBytes for the buffer copy. Same per-buffer memcpy semantics; the
cross-RootAllocator constraint that blocks true zero-copy is independent of
shading and still tracked in apache#4294.
Adds pytest cases for the data-type branches in CometColumnarPythonInput
that were previously unexercised: numeric scalars (boolean/byte/short/float),
binary, timestamp NTZ, map, and a deeply nested array/struct combination.

Falls back to vanilla Spark when spark.sql.execution.arrow.useLargeVarTypes
is enabled. With that conf on, Spark widens StringType/BinaryType to
8-byte-offset variants in the destination IPC root while Comet's source
vectors keep 4-byte offsets, so the per-buffer memcpy in copyVector would
corrupt the offset buffer.

While narrowing the rule to gate on largeVarTypes, also fix a pre-existing
greedy match: the MapInBatch case used `p: SparkPlan` with the pyarrow conf
as a guard, which matched every plan when the conf was on and consumed the
later CometShuffleExchangeExec arm. The case now gates on a structural check
via eligibleMapInBatchInfo so unrelated plans flow through.
Copy link
Copy Markdown
Contributor

@mbutrovich mbutrovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for continuing to refine this, @andygrove! I know this is an experimental feature, but I've got a few more changes and clarifications before we merge this:

Allocator framing

The PR description states:

Comet's Parquet readers each construct their own RootAllocator, separate from ArrowUtils.rootAllocator, so Arrow's TransferPair cannot share buffers across the boundary.

The in-code header on CometColumnarPythonInput (lines 1335-1340 of the diff) says the source and destination "live in different RootAllocator trees", and pyarrow-udfs.md (lines 376-379) blames "Comet's Parquet readers allocating from ArrowUtils.rootAllocator (rather than each reader constructing its own independent RootAllocator)".

The actual situation in main looks different from all three:

  • spark/src/main/scala/org/apache/comet/package.scala:36 declares one RootAllocator for the whole process. The doc on lines 29-35 says "we use a single allocator for the whole execution process".
  • grep -rn 'new RootAllocator' over the repo (excluding tests/target) returns only that one line.
  • For native scan, the Parquet reader is on the Rust side. native/core/src/parquet/mod.rs:310-311 produces an arrow-rs RecordBatch and calls move_to_spark, which writes FFI_ArrowArray::new(self) into the caller-provided struct (native/core/src/execution/utils.rs:91). The buffers are Rust-allocated and refcounted via Arrow C Data Interface release callbacks.
  • JVM side imports those via ArrowImporter.importVector (NativeUtil.scala:264). CometArrowAllocator is passed to the importer for tracking, but the underlying data buffers are not allocated against it in the Arrow Java sense.

So:

  • There is no per-reader JVM RootAllocator to point at as the blocker.
  • The thing that blocks TransferPair on imported vectors is not allocator separation, it is that imported buffers carry a different ReferenceManager whose release routes through FFI.
  • The destination root in PythonArrowInput is a child of ArrowUtils.rootAllocator, which is genuinely a different JVM root, but that asymmetry is downstream of "the source isn't in a JVM allocator tree at all" rather than the framing in the PR.

Could the description and the two doc blocks be rewritten so the follow-up issue (#4294) is grounded in what the code actually does?

PythonArrowInput.allocator visibility differs across Spark versions

Verified by reading sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala at each tag in apache/spark:

  • 3.4.x / 3.5.x: allocator is a local var inside writeIteratorToArrowStream, not a field. Different trait shape.
  • 4.0.0 / 4.0.1: private val allocator = ArrowUtils.rootAllocator.newChildAllocator(...). Not overridable.
  • 4.1.1 / 4.1.2: protected val allocator = .... Overridable.
  • 4.2.0-preview5: protected lazy val allocator: BufferAllocator = .... Overridable.

This partly defends the per-buffer copy: on 4.0 the visibility blocks any subclass from sharing a parent allocator with the destination root. On 4.1+ it does not.

Open questions:

  • Was the design fixed against 4.0 first, with 4.1/4.2 carried along on the same path for symmetry?
  • Is a version-conditional fast path on 4.1+ in scope, or a follow-up?

"Next-best alternative to zero-copy" framing

Across the JVM/Python boundary, Spark's transport is fork plus pipe plus Arrow IPC. The buffer bytes have to be written to a pipe at least once, so true zero-copy isn't on the table without a transport-level change in PySpark. The accurate floor is one copy per batch, the IPC serialize step in MessageSerializer.serialize.

The current path does it twice:

  1. Comet vector to Spark IPC root vector via copyVector (diff line 1393-1395).
  2. Spark IPC root to pipe via VectorUnloader then MessageSerializer.serialize (diff line 1405-1410).

A path built on Utils.serializeBatches (spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:209-233) does step 2 directly off Comet's vectors and skips step 1. That utility already builds a VectorSchemaRoot from existing Comet FieldVectors via getBatchFieldVectors (lines 360-388). The Python worker reads the resulting bytes via ArrowStreamReader either way.

Because the source buffers on the native scan path are foreign-imported (see allocator section above), the IPC writer reads bytes through the ArrowBuf memory addresses without any JVM allocator involvement, which removes the apparent allocator asymmetry from the picture entirely.

Is "two copies, dropped to one" a more accurate framing than "next-best to zero-copy"?

allocateNew per batch

copyVector (diff line 1429-1438) calls dst.allocateNew(...) on every destination child every batch. BaseFixedWidthVector.allocateNew(int) in Arrow Java calls clear() first and re-allocates buffers. Spark's vanilla BasicPythonArrowInput reuses the persistent root and lets ArrowWriter.reset() keep buffers attached.

  • Has this been measured at high batch rates?
  • Would setValueCount(0) plus a grow-on-demand check on existing buffer capacity buy back the persistent-root behavior?

Per-batch new VectorUnloader

Diff line 1405-1406 constructs new VectorUnloader(root, true, cometCodec, true) per batch. Spark's PythonArrowInput.scala:96 constructs the unloader once as a field and reuses it. Is the per-batch construction load-bearing or hoistable?

cometCodec reimplementation

CometColumnarPythonInput resolves the Arrow compression codec via SQLConf.getConfString("spark.sql.execution.arrow.compression.codec", ...) (diff line 1350-1364) and reimplements the none / lz4 / zstd switch.

Spark's PythonArrowInput.scala:80-95 does the same resolution. The diff comment says the reimplementation is needed because 4.0 "lacks SQLConf.arrowCompressionCodec". SQLConf.arrowCompressionCodec appears in 4.0.0, so it'd be worth double-checking whether the actual blocker was the Lz4CompressionCodec / ZstdCompressionCodec import path from arrow-compression. If so, a ShimSQLConf accessor would let the rest collapse to Spark's idiom.

Three near-duplicate CometArrowPythonRunner files

spark/src/main/spark-{4.0,4.1,4.2}/.../CometArrowPythonRunner.scala, sizes 72/64/63 lines.

  • 4.0 is structurally different (extends BasePythonRunner directly because 4.0's BaseArrowPythonRunner is bound to Iterator[InternalRow]).
  • 4.1 and 4.2 both extend BaseArrowPythonRunner[Iterator[ColumnarBatch], ColumnarBatch], differing in runnerConf override (4.2 dropped the workerConf constructor arg) and writeUDFs arity.

Would a small CometArrowPythonRunnerBase in spark-4.x/ covering 4.1 and 4.2 read more like the existing Spark4xMapInBatchSupport pattern?

EliminateRedundantTransitions arm

Diff line 450-461. The pattern guard calls eligibleMapInBatchInfo(p).isDefined and the body calls eligibleMapInBatchInfo(p).get, so the matchers and the conf reads run twice. The other arms in this rule are 1-3 line plain pattern matches. An extractor object with unapply returning Option[(MapInBatchInfo, SparkPlan)] would collapse this to a single arm.

Separately, getConfString("spark.sql.execution.arrow.useLargeVarTypes", "false") at diff line ~496 reads stringly. The repo already has ShimSQLConf per Spark version, where an arrowUseLargeVarTypes(conf): Boolean accessor would fit.

Output / input class compatibility

CometMapInBatchExec.processPartition (diff line 618-621) emits a ColumnarBatch whose columns are plain ArrowColumnVector instances pulled from the struct vector returned by the Python IPC reader. Not CometVector wrappers.

Two places downstream that require CometVector:

  1. CometColumnarPythonInput.writeNextBatchToArrowStream (diff line 1386-1391) on the input side of the same operator class:
    val src = cometBatch.column(i).asInstanceOf[CometDecodedVector].getValueVector.asInstanceOf[FieldVector]
  2. NativeUtil.exportBatch (spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala:117-169), the JVM-to-Rust FFI handoff used by every native Comet operator. Throws on anything that is not CometSelectionVector or CometVector.

Filters and projections that only reference UDF inputs get pushed past the UDF, so they don't reach this position. Plan shapes that do put ArrowColumnVector into one of those two consumers:

  • Two sequential pyarrow UDFs. df.mapInArrow(udf1, s1).mapInArrow(udf2, s2). Spark does not fuse consecutive mapInArrow / mapInPandas calls, each is its own MapInArrowExec. With this PR, both rewrite to CometMapInBatchExec, and the second one's asInstanceOf[CometDecodedVector] on the first input column throws ClassCastException on the first batch.
  • A filter referencing a UDF output column (mapInArrow(udf, schema).filter("output_col > 0")). Nothing to push past.
  • A partial aggregate fed by the UDF output. CometHashAggregate on UDF output would invoke exportBatch and hit the third branch.
  • The UDF as the probe side of a join with a Comet build side.

Questions:

  • Is the sequential-UDF case caught by a planner guard, or does it reach runtime as ClassCastException? If the latter, this looks like a correctness bug, not a perf concern.
  • For the other three shapes, does exportBatch throw, or is there a wrap step that re-imports ArrowColumnVector into CometVector? If a wrap exists, what does it cost (rewrap of the same FieldVector is cheap, full copy is not).
  • None of these arrangements appear in test_pyarrow_udf.py. Sequential mapInArrow.mapInArrow, filter-on-UDF-output, and mapInArrow.groupBy.agg would pin the contract.

Spark 3.4 / 3.5 stub shims

The two ShimCometMapInBatch.scala stubs under spark-3.4/ and spark-3.5/ are byte-identical. spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ already exists and houses several shared 3.x shims (ShimDataSourceRDDPartition.scala, ShimCometShuffleWriteProcessor.scala, ShimCometBatchScanExec.scala). The stub fits there and would drop one of the two copies.

Tests

The pytest module covers happy paths well. Compared with CometCodegenFuzzSuite (introduced in #4267), the gaps that look most relevant given this PR's mechanism (recursive vector-tree memcpy, validity-bit handling, per-batch buffer reallocation):

  1. No fuzz harness. CometCodegenFuzzSuite generates random schemas and runs identity ScalaUDF over every column. The same harness translates: random schema, write parquet, mapInArrow(passthrough), assert row-equivalence. The bulk-copy in copyVector is exactly the kind of "walk a vector tree and memcpy buffers" code where structured fuzz catches what hand-written cases miss.
  2. Decimal coverage is narrow. Only DecimalType(18, 6) is tested. BaseFixedWidthVector handles short decimals (8 bytes, long-backed) and long decimals (16-byte FixedSizeBinary) on different paths. The MAX_LONG_DIGITS=18 boundary is where bugs hide. Sweeping precision over {1, 9, 17, 18, 19, 28, 38} and scale over {0, half, max} would close it.
  3. Null density. Validity-buffer memcpy is historically where Arrow Java vector copies break. None of the cases cover all-null batches or single-non-null-in-batch. Fuzz over null fraction in {0.0, 0.01, 0.5, 0.99, 1.0} would close this.
  4. Multi-batch in one partition. Every test fits in a single Arrow IPC batch (max ~100 rows). The IPC root is reused across batches with allocateNew / setValueCount / setBytes each call, and variable-width data-buffer growth is a separate path. Forcing batch_size << row_count and asserting correctness across N batches would cover that.
  5. Transforming UDFs for complex types. array_and_struct, deeply_nested, map_type, binary_type all use Python-side passthrough. A symmetric encode/decode mistake (wrong offset arithmetic that the inverse undoes) hides there. One mutating UDF per complex type (reverse(array), swap struct fields, drop one map entry) would surface that class of bug.
  6. Wide schemas. Benchmark tests 50 cols but no correctness test does. The bulk-copy walks an addresses[] array indexed across the whole tree. Off-by-one in flattening logic surfaces at depth times width. A 50-col mixed-type correctness case would close that.
  7. Empty batches mid-stream. test_map_in_arrow_empty_input filters everything at source so the operator sees zero batches. The non-empty stream containing a 0-row batch (e.g. from a selective filter that keeps later rows) is the more interesting case and isn't currently exercised.

Borrowing the CometCodegenFuzzSuite harness would close 1, 3, and 4 in one move. 2, 5, 6, 7 are addable as targeted hand-written cases if the harness is too heavy for this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: In progress

Development

Successfully merging this pull request may close these issues.

Eliminate remaining row<->Arrow round-trip in CometPythonMapInArrowExec Is it possible to support PyArrow backed UDFs in Comet natively?

6 participants