
[SPARK-56350][SQL] Skip ColumnarToRow for Arrow-backed input to Python UDFs#55120

Closed
viirya wants to merge 32 commits into apache:master from viirya:arrow-columnar-python-udf-input

Conversation

@viirya
Member

@viirya viirya commented Mar 31, 2026

What changes were proposed in this pull request?

This PR adds a columnar execution path to ArrowEvalPythonExec that allows Arrow-backed ColumnarBatch input (e.g., from DataSource V2 connectors that produce ArrowColumnVector columns) to be serialized directly to Arrow IPC for Python UDF evaluation, bypassing the existing ColumnarToRow → ArrowWriter round-trip.

Specifically:

  • ArrowEvalPythonExec now declares supportsColumnar = child.supportsColumnar and supportsRowBased = true, so Spark's transition rules no longer insert ColumnarToRowExec when the child supports columnar output.
  • A new doExecuteColumnar() method produces RDD[ColumnarBatch] where each output batch combines pass-through columns (kept as ColumnVector references without row conversion) with UDF result columns from Python.
  • doExecute() delegates to doExecuteColumnar().flatMap(_.rowIterator()) when the child supports columnar, and falls back to the existing row-based path otherwise.
  • A new ColumnarArrowPythonInput trait detects ArrowColumnVector columns at runtime and extracts the underlying Arrow FieldVectors directly via VectorSchemaRoot.of() + VectorUnloader for zero-copy IPC serialization. Non-Arrow ColumnarBatch inputs fall back to row-by-row ArrowWriter.
  • When UDF inputs contain complex expressions (not simple column references), the evaluator falls back to row-based projection for UDF input serialization while still keeping pass-through columns in columnar format.
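
The input-path selection in the last bullet can be sketched in plain Python (a conceptual stand-in only: the real logic is Scala inside the evaluator factory, and the dict-of-lists batch, `serialize_udf_input`, and the callable expressions are hypothetical names invented for illustration):

```python
def serialize_udf_input(batch, udf_inputs):
    """Pick the columnar fast path when every UDF input is a simple
    column reference; otherwise fall back to row-based projection.

    batch: dict of column name -> list of values (a stand-in for an
    Arrow-backed ColumnarBatch). udf_inputs: column names (simple
    references) or callables over a row dict (complex expressions).
    """
    if all(isinstance(e, str) for e in udf_inputs):
        # Columnar path: hand the existing column buffers over as-is
        # (zero-copy in the real Arrow case).
        return [batch[name] for name in udf_inputs]
    # Row fallback: materialize rows and project each expression.
    n = len(next(iter(batch.values())))
    rows = [{c: batch[c][i] for c in batch} for i in range(n)]
    return [[e(r) if callable(e) else r[e] for r in rows]
            for e in udf_inputs]
```

For example, `serialize_udf_input(batch, ["data"])` returns the underlying column list unchanged, while a callable such as `lambda r: r["id"] + 1` forces the row-based projection for the UDF inputs only; pass-through columns stay columnar either way.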

Benchmark:

| Scenario | Rows | String Length | UDF | Sink | Arrow Columnar (ms) | Row-based (ms) | Speedup |
|---|---|---|---|---|---|---|---|
| string concat | 2M | 200 chars | name + data | noop | 2086 | 2401 | 1.15x |
| string identity | 1M | 1000 chars | return data | noop | 4506 | 4839 | 1.07x |
| string identity | 2M | 200 chars | return data | noop | 2118 | 2324 | 1.10x |
| string identity | 10M | 100 chars | return data | noop | 6067 | 7124 | 1.17x |
| string identity | 20M | 100 chars | return data | noop | 11917 | 14106 | 1.18x |

Why are the changes needed?

When a Spark operator produces Arrow-backed ColumnarBatch (e.g., connectors that read columnar formats like Parquet into Arrow vectors), the current execution path for Arrow Python UDFs performs a wasteful columnar → row → columnar round-trip: ColumnarToRowExec converts the columnar data to InternalRow, then ArrowWriter converts each row back to Arrow columnar format for IPC serialization to the Python worker.

This round-trip is expensive due to per-row virtual dispatch, type conversion, null checking, and poor cache locality in ArrowWriter.write(). Since the data is already in Arrow format, the conversion is entirely unnecessary.

With this change, Arrow FieldVectors are extracted directly from ArrowColumnVector and serialized to IPC via VectorUnloader — skipping both ColumnarToRowExec and ArrowWriter. Pass-through columns are kept as ColumnVector references throughout, avoiding row materialization entirely in the fully columnar path.
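
As a library-free illustration of the round-trip being removed (plain lists stand in for Arrow vectors; the real conversions are ColumnarToRowExec and ArrowWriter, and both function names here are hypothetical):

```python
def row_based_path(batch):
    """The old path: a columnar -> row -> columnar round-trip."""
    # ColumnarToRow: materialize every value into per-row tuples.
    rows = list(zip(*batch.values()))
    # ArrowWriter: rebuild the columns element by element.
    rebuilt = [list(col) for col in zip(*rows)]
    return dict(zip(batch.keys(), rebuilt))

def columnar_path(batch):
    """The new path: the data is already columnar, pass it through."""
    return batch
```

Both paths produce equal output, but `columnar_path` returns the very same buffers it was given, which is the zero-copy property the PR exploits.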

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.6

Comment on lines +34 to +37
Usage:
cd $SPARK_HOME
python python/pyspark/sql/tests/pandas/bench_arrow_columnar_udf.py \
[--rows N] [--iterations N] [--partitions N]
Member Author


Due to some constraints in my local dev environment, I cannot build Spark locally to run this benchmark for now. I will update the results once the constraints are resolved.

Member Author

@viirya viirya Apr 8, 2026


| Scenario | Rows | String Length | UDF | Sink | Arrow Columnar (ms) | Row-based (ms) | Speedup |
|---|---|---|---|---|---|---|---|
| string concat | 2M | 200 chars | name + data | noop | 2086 | 2401 | 1.15x |
| string identity | 1M | 1000 chars | return data | noop | 4506 | 4839 | 1.07x |
| string identity | 2M | 200 chars | return data | noop | 2118 | 2324 | 1.10x |
| string identity | 10M | 100 chars | return data | noop | 6067 | 7124 | 1.17x |
| string identity | 20M | 100 chars | return data | noop | 11917 | 14106 | 1.18x |

Member Author


Arrow columnar path:

*(1) Project [id#0, name#1, value#2, data#3, pythonUDF0#6 AS identity_udf(data)#5]
+- ArrowEvalPython [identity_udf(data#3)#4], [pythonUDF0#6], 200
   +- BatchScan ArrowBackedTestTable[id#0, name#1, value#2, data#3]

Row-based path:

*(2) Project [id#14, name#15, value#16, data#17, pythonUDF0#20 AS identity_udf(data)#19]
+- ArrowEvalPython [identity_udf(data#17)#18], [pythonUDF0#20], 200
   +- *(1) ColumnarToRow
      +- BatchScan ArrowBackedTestTable[id#14, name#15, value#16, data#17]

@dongjoon-hyun dongjoon-hyun marked this pull request as draft March 31, 2026 22:17
@dongjoon-hyun
Member

Please create and use a JIRA ID in the PR title before converting back from Draft status~

@viirya
Member Author

viirya commented Mar 31, 2026

Please create and use a JIRA ID in the PR title before converting back from Draft status~

Okay. Thank you @dongjoon-hyun

@viirya viirya force-pushed the arrow-columnar-python-udf-input branch 2 times, most recently from 20604d4 to b7586ec Compare April 4, 2026 01:14
@viirya viirya changed the title [SPARK-xxxxx][SQL] Skip ColumnarToRow for Arrow-backed input to Python UDFs [SPARK-56350][SQL] Skip ColumnarToRow for Arrow-backed input to Python UDFs Apr 4, 2026
@viirya viirya marked this pull request as ready for review April 5, 2026 19:17
dongjoon-hyun
dongjoon-hyun previously approved these changes Apr 8, 2026
@dongjoon-hyun
Member

Oh, I didn't notice this is still under development.

@viirya
Member Author

viirya commented Apr 8, 2026

Oh, I didn't notice this is still under development.

I was unable to run the benchmark due to some constraints in my local environment. I found a way to overcome that and ran the benchmark. These changes are adjustments only to benchmark-related code, plus a new config to control the new behavior. They just make the benchmark able to measure the difference correctly.

@HyukjinKwon HyukjinKwon marked this pull request as draft April 8, 2026 22:15
viirya and others added 17 commits April 8, 2026 20:14
When a DSv2 connector produces Arrow-backed ColumnarBatch (with
ArrowColumnVector), ArrowEvalPythonExec now extracts the underlying
FieldVectors directly and serializes them to Arrow IPC, bypassing
the wasteful columnar -> row -> columnar round-trip.

Key changes:
- ArrowEvalPythonExec declares supportsColumnar = child.supportsColumnar
  so the transition rule no longer inserts ColumnarToRowExec.
- New ColumnarArrowPythonInput trait handles the Arrow-direct path
  (VectorSchemaRoot.of + VectorUnloader) with a row-by-row fallback
  for non-Arrow ColumnarBatch.
- New ColumnarArrowEvalPythonEvaluatorFactory resolves UDF input
  columns by index when all inputs are simple AttributeReferences,
  falling back to row-based evaluation for complex expressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Redesign the columnar evaluator factory to stay fully in columnar
format instead of buffering pass-through columns as rows:

- doExecuteColumnar() returns RDD[ColumnarBatch] with combined
  pass-through + UDF result columns.
- doExecute() delegates to doExecuteColumnar().flatMap(_.rowIterator)
  when child supports columnar, mirroring DBR's approach.
- Pass-through columns are kept as ColumnVector references in a queue
  instead of being converted to rows via HybridRowQueue.
- combineResults() directly concatenates pass-through and result
  ColumnVector arrays into a new ColumnarBatch.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
remove internal references from comments

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
A test-only DataSource V2 that produces ColumnarBatch with
ArrowColumnVector columns (backed by Arrow IntVector, VarCharVector,
Float8Vector). Used for testing and benchmarking the columnar Arrow
Python UDF path.

Schema: (id INT, name STRING, value DOUBLE)
Configurable via options: numRows (default 10000), numPartitions (default 2).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
PySpark benchmark script that compares end-to-end execution time of
scalar Arrow UDFs with two data sources:
- ArrowBackedDataSourceV2 (columnar path, direct FieldVector extraction)
- spark.range() (row-based path, ColumnarToRow + ArrowWriter)

Uses a minimal UDF (id + value) to isolate data transfer overhead.
Prints physical plans to verify the different execution paths.

Usage:
    python python/pyspark/sql/tests/pandas/bench_arrow_columnar_udf.py \
        [--rows N] [--iterations N] [--partitions N]

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add SupportsRead and InternalRow imports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Tests using ArrowBackedDataSourceV2 and TestScalarPandasUDF:

- Plan test: no ColumnarToRowExec before ArrowEvalPythonExec when
  reading from Arrow-backed source
- Correctness: scalar pandas UDF produces correct results on
  Arrow-backed columnar input
- Multiple UDF columns: pass-through and multiple UDF results
- Multiple partitions: correct results across partition boundaries
- Baseline: row-based source does not trigger columnar path

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The Python worker does not preserve input batch boundaries for scalar
UDFs -- it may combine multiple input batches into one output batch.
The previous columnar pass-through queue (ArrayDeque of ColumnVector
references) assumed 1:1 batch alignment, causing assertion failures
on all columnar-capable sources (Parquet vectorized reader,
InMemoryTableScan, etc.).

Fix: use HybridRowQueue to buffer pass-through rows (same as the
existing row-based evaluator), which handles arbitrary output batch
sizes correctly. The key optimization -- sending Arrow FieldVectors
directly to Python instead of converting through ArrowWriter -- is
preserved on the input side.

Also remove doExecuteColumnar() since the evaluator now returns
Iterator[InternalRow] (required for row-based result joining).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
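
The boundary mismatch this commit describes can be simulated with stdlib Python (a `deque` stands in for HybridRowQueue; `join_with_row_queue` and the worker function are hypothetical names, and the real code is Scala):

```python
from collections import deque

def join_with_row_queue(input_batches, worker):
    """Join UDF results back to pass-through rows WITHOUT assuming the
    worker emits one output batch per input batch (it may re-batch)."""
    row_queue = deque()                   # buffer individual rows,
    for batch in input_batches:           # not batch references
        row_queue.extend(batch)
    joined = []
    for out_batch in worker(input_batches):   # arbitrary re-batching
        for result in out_batch:
            joined.append((row_queue.popleft(), result))
    return joined

def merge_and_double(batches):
    """A worker that merges all input batches into ONE output batch --
    exactly the behavior that broke the 1:1 aligned reference queue."""
    return [[x * 2 for b in batches for x in b]]
```

Because rows rather than batch references are queued, the join stays correct regardless of how the worker groups its output.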
…wQueue)

The Python scalar UDF worker preserves 1:1 batch correspondence (one
output batch per input batch), so the evalColumnar path can safely
combine pass-through ColumnVector references with UDF result columns
at the columnar level.

However, the evalRowFallback path uses BatchedPythonArrowInput which
re-batches rows by maxRecordsPerBatch, breaking the original batch
boundaries. This path must use HybridRowQueue for row-based joining.

evalColumnar: ColumnarBatch pass-through queue + columnar combining
evalRowFallback: HybridRowQueue + JoinedRow (same as existing path)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
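
When the 1:1 correspondence does hold, the columnar combining step amounts to concatenating two column sets, sketched here with dict-of-lists batches (hypothetical names; the actual combineResults concatenates ColumnVector arrays into a new ColumnarBatch):

```python
def combine_results(passthrough_cols, udf_result_cols):
    """Combine pass-through column references with UDF result columns
    into one output batch, relying on 1:1 batch correspondence."""
    lengths = {len(c) for c in (*passthrough_cols.values(),
                                *udf_result_cols.values())}
    assert len(lengths) == 1, "batch misalignment: worker re-batched"
    # No row materialization: the pass-through columns are reused as-is.
    return {**passthrough_cols, **udf_result_cols}
```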
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Specify returnType = Some(StringType) for TestScalarPandasUDF to
  avoid VOID type inference when called via DataFrame API
- Use selectExpr() with registered UDF names instead of direct
  UDF function calls for proper SQL function resolution
- Fix supportsColumnar test to check for columnar-capable scan nodes
  instead of the top-level plan (which is ColumnarToRowExec)
- Fix plan assertion to check under ArrowEvalPythonExec subtree only

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When supportsColumnar = true, Spark may call executeColumnar() on
this node (e.g., when a parent also supports columnar). Without
doExecuteColumnar(), the default SparkPlan implementation throws
"has column support mismatch".

Add doExecuteColumnar() that delegates to doExecute() and converts
row output to ColumnarBatch via RowToColumnarEvaluatorFactory.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Both evalColumnar and evalRowFallback now maintain 1:1 batch
correspondence, enabling columnar pass-through combining in both paths:

- evalColumnar: ColumnarArrowPythonInput sends one IPC RecordBatch
  per ColumnarBatch (1:1 by design).

- evalRowFallback: uses BasicPythonArrowInput (NOT BatchedPythonArrowInput)
  where each ColumnarBatch's projected rows form one inner iterator
  that becomes one IPC RecordBatch. This preserves 1:1 correspondence
  instead of re-batching by maxRecordsPerBatch.

The evaluator returns Iterator[ColumnarBatch] (pass-through columns
combined with UDF result columns). doExecuteColumnar() delegates
directly to the evaluator; doExecute() calls doExecuteColumnar()
and flattens to rows.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Non-Arrow columnar readers (Parquet vectorized, InMemoryTableScan)
reuse ColumnVector objects across batches. Storing references in the
pass-through queue is unsafe because the Writer may advance to the
next batch before the Reader consumes the previous result, causing
stale data or NPE (nulls array set to null on reset).

Arrow-backed sources allocate independent vectors per batch, so
references are safe.

Fix: peek at the first batch to check if Arrow-backed:
- Arrow: columnar pass-through queue + direct FieldVector IPC (path 1)
- Non-Arrow: HybridRowQueue for pass-through buffering (path 2)
- Complex expressions: HybridRowQueue + MutableProjection (path 3)

The evaluator returns Iterator[InternalRow]. doExecuteColumnar() wraps
via RowToColumnarEvaluatorFactory for upstream columnar consumers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
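
The stale-reference hazard this commit fixes can be demonstrated in a few lines (`ReusingReader` is an invented stand-in for a vectorized reader that recycles its buffers; the real fix buffers rows via HybridRowQueue rather than copying lists):

```python
class ReusingReader:
    """Models a non-Arrow columnar reader (e.g. the Parquet vectorized
    reader) that reuses one buffer object across batches."""
    def __init__(self, batches):
        self._batches = batches
        self._buf = []
    def __iter__(self):
        for data in self._batches:
            self._buf[:] = data   # overwrite in place: same object
            yield self._buf

unsafe, safe = [], []
for batch in ReusingReader([[1, 2], [3, 4]]):
    unsafe.append(batch)        # keep a reference: goes stale
    safe.append(list(batch))    # copy (cf. buffering rows): stays valid
```

After the loop, every entry in `unsafe` aliases the last batch's contents, which is the stale-data failure mode; Arrow-backed sources avoid it by allocating independent vectors per batch.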
viirya and others added 15 commits April 8, 2026 20:14
ArrowEvalPythonExec does not define a numOutputRows metric.
Create local metrics for RowToColumnarEvaluatorFactory instead
of referencing non-existent plan metrics.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When only UDF input columns are selected, the optimizer inserts a
ProjectExec for column pruning. ProjectExec does not support columnar,
so ArrowEvalPythonExec.child.supportsColumnar becomes false and the
optimization does not apply.

Fix: select all source columns (id, name, value) plus the UDF result,
so the scan output matches the child output and no Project is needed.
Also change assertion to check child.supportsColumnar directly instead
of scanning for ColumnarToRowExec in the subtree.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…mnarBatch

Restore the correct design:
- Evaluator: PartitionEvaluatorFactory[ColumnarBatch, ColumnarBatch]
- doExecuteColumnar(): child.executeColumnar() -> evaluator -> ColumnarBatch
- doExecute(): doExecuteColumnar().flatMap(_.rowIterator()) when columnar,
  else super.doExecute()

Path 1 (Arrow): columnar combining produces ColumnarBatch directly.
Paths 2 & 3 (non-Arrow/complex): HybridRowQueue + JoinedRow produces
InternalRow, then RowToColumnConverter wraps back to ColumnarBatch
inside the evaluator via rowsToColumnarBatches().

This removes the RowToColumnarEvaluatorFactory hack from doExecuteColumnar
and the associated numOutputRows metric-not-found issue.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
ColumnarBatch.rowIterator() returns ColumnarBatchRow which is NOT
UnsafeRow. Downstream operators (e.g., outer EvalPythonExec in
nested UDF queries) cast rows to UnsafeRow for HybridRowQueue,
causing ClassCastException.

Fix: apply UnsafeProjection.create(schema) to convert each row
to UnsafeRow before returning from doExecute().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
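
The type mismatch this commit fixes can be modeled minimally (`UnsafeRow` here is a hypothetical tuple subclass; the real fix applies UnsafeProjection.create(schema) in doExecute()):

```python
class UnsafeRow(tuple):
    """Stand-in for Spark's UnsafeRow."""

def downstream_enqueue(row):
    """Models HybridRowQueue, which requires UnsafeRow instances."""
    if not isinstance(row, UnsafeRow):
        raise TypeError("analogue of the ClassCastException")
    return row

def row_iterator(columnar_rows):
    """Convert each ColumnarBatchRow-like tuple to UnsafeRow before
    handing it downstream (the UnsafeProjection analogue)."""
    return (UnsafeRow(r) for r in columnar_rows)
```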
ArrowEvalPythonExec:
- Add spark.sql.execution.arrow.pythonUDF.columnarInput.enabled config
  to toggle the columnar optimization, enabling fair A/B benchmarking
  with the same data source.

ArrowBackedDataSourceV2:
- Add "columnar" option (default true) to control supportColumnarReads,
  allowing row-based output from the same source.
- Add "data" column (StringType) with configurable-length random strings
  to stress ArrowWriter's per-element VarChar serialization.
- Fix Arrow memory lifecycle: use single allocator per partition reader
  instead of per-batch allocators that get closed while pass-through
  ColumnVector references are still held.

Benchmark:
- Use noop sink instead of collect() to avoid driver-side I/O overhead.
- Use identity UDF (return input as-is) to minimize Python-side cost.
- Use same data source for both paths, toggled by config.
- Separate config lifecycle: run arrow benchmark first (config=true),
  then row benchmark (config=false).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add ARROW_PYSPARK_UDF_COLUMNAR_INPUT_ENABLED to SQLConf with proper
buildConf, doc, version, and accessor method. Replace the inline
getConfString in ArrowEvalPythonExec with conf.arrowPySparkUDFColumnarInputEnabled.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
ArrowBackedDataSourceV2 now has 4 columns (added data). The plan test
must select all columns to prevent column pruning from inserting a
ProjectExec.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@viirya viirya force-pushed the arrow-columnar-python-udf-input branch from 65e3b20 to 9882045 Compare April 9, 2026 03:14
@viirya viirya marked this pull request as ready for review April 9, 2026 06:13
@viirya
Member Author

viirya commented Apr 9, 2026

Restored the CI jobs broken by the change to ArrowBackedDataSourceV2, which is used both in the benchmark and in test code.

Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, it looks good to me again. Could you insert the performance result table into the PR description (Up to 20%?), @viirya ?

@viirya
Member Author

viirya commented Apr 9, 2026

Thank you @dongjoon-hyun. I updated the PR description with the benchmark result table.

@dongjoon-hyun
Member

Thank you, @viirya . Merged to master.

@viirya
Member Author

viirya commented Apr 10, 2026

Thank you @dongjoon-hyun

@viirya viirya deleted the arrow-columnar-python-udf-input branch April 10, 2026 17:20