[SPARK-56350][SQL] Skip ColumnarToRow for Arrow-backed input to Python UDFs #55120
viirya wants to merge 32 commits into apache:master
Conversation
Usage:
  cd $SPARK_HOME
  python python/pyspark/sql/tests/pandas/bench_arrow_columnar_udf.py \
    [--rows N] [--iterations N] [--partitions N]
Due to some constraints on my local dev environment, I cannot build Spark locally to run this benchmark for now. I will update the result once the constraints are removed.
| Scenario | Rows | String Length | UDF | Sink | Arrow Columnar (ms) | Row-based (ms) | Speedup |
|---|---|---|---|---|---|---|---|
| string concat | 2M | 200 chars | name + data | noop | 2086 | 2401 | 1.15x |
| string identity | 1M | 1000 chars | return data | noop | 4506 | 4839 | 1.07x |
| string identity | 2M | 200 chars | return data | noop | 2118 | 2324 | 1.10x |
| string identity | 10M | 100 chars | return data | noop | 6067 | 7124 | 1.17x |
| string identity | 20M | 100 chars | return data | noop | 11917 | 14106 | 1.18x |
Arrow columnar path:
*(1) Project [id#0, name#1, value#2, data#3, pythonUDF0#6 AS identity_udf(data)#5]
+- ArrowEvalPython [identity_udf(data#3)#4], [pythonUDF0#6], 200
+- BatchScan ArrowBackedTestTable[id#0, name#1, value#2, data#3]
Row-based path:
*(2) Project [id#14, name#15, value#16, data#17, pythonUDF0#20 AS identity_udf(data)#19]
+- ArrowEvalPython [identity_udf(data#17)#18], [pythonUDF0#20], 200
+- *(1) ColumnarToRow
+- BatchScan ArrowBackedTestTable[id#14, name#15, value#16, data#17]
Please create and use a JIRA ID in the PR title before converting back from
Okay. Thank you @dongjoon-hyun
Force-pushed 20604d4 to b7586ec
Oh, I didn't notice this is still under development.
I was unable to run the benchmark due to some constraints on my local environment. I found a way to overcome them and ran the benchmark. These changes are adjustments to benchmark-related stuff only, plus a new config to control this new behavior. They are just to make the benchmark measure the difference correctly.
When a DSv2 connector produces an Arrow-backed ColumnarBatch (with ArrowColumnVector), ArrowEvalPythonExec now extracts the underlying FieldVectors directly and serializes them to Arrow IPC, bypassing the wasteful columnar -> row -> columnar round-trip.

Key changes:
- ArrowEvalPythonExec declares supportsColumnar = child.supportsColumnar so the transition rule no longer inserts ColumnarToRowExec.
- New ColumnarArrowPythonInput trait handles the Arrow-direct path (VectorSchemaRoot.of + VectorUnloader) with a row-by-row fallback for non-Arrow ColumnarBatch.
- New ColumnarArrowEvalPythonEvaluatorFactory resolves UDF input columns by index when all inputs are simple AttributeReferences, falling back to row-based evaluation for complex expressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Redesign the columnar evaluator factory to stay fully in columnar format instead of buffering pass-through columns as rows:
- doExecuteColumnar() returns RDD[ColumnarBatch] with combined pass-through + UDF result columns.
- doExecute() delegates to doExecuteColumnar().flatMap(_.rowIterator) when the child supports columnar, mirroring DBR's approach.
- Pass-through columns are kept as ColumnVector references in a queue instead of being converted to rows via HybridRowQueue.
- combineResults() directly concatenates pass-through and result ColumnVector arrays into a new ColumnarBatch.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove internal references from comments.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
A test-only DataSource V2 that produces ColumnarBatch with ArrowColumnVector columns (backed by Arrow IntVector, VarCharVector, Float8Vector). Used for testing and benchmarking the columnar Arrow Python UDF path.

Schema: (id INT, name STRING, value DOUBLE)
Configurable via options: numRows (default 10000), numPartitions (default 2).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
PySpark benchmark script that compares end-to-end execution time of
scalar Arrow UDFs with two data sources:
- ArrowBackedDataSourceV2 (columnar path, direct FieldVector extraction)
- spark.range() (row-based path, ColumnarToRow + ArrowWriter)
Uses a minimal UDF (id + value) to isolate data transfer overhead.
Prints physical plans to verify the different execution paths.
Usage:
python python/pyspark/sql/tests/pandas/bench_arrow_columnar_udf.py \
[--rows N] [--iterations N] [--partitions N]
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add SupportsRead and InternalRow imports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Tests using ArrowBackedDataSourceV2 and TestScalarPandasUDF:
- Plan test: no ColumnarToRowExec before ArrowEvalPythonExec when reading from an Arrow-backed source.
- Correctness: scalar pandas UDF produces correct results on Arrow-backed columnar input.
- Multiple UDF columns: pass-through and multiple UDF results.
- Multiple partitions: correct results across partition boundaries.
- Baseline: row-based source does not trigger the columnar path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The Python worker does not preserve input batch boundaries for scalar UDFs -- it may combine multiple input batches into one output batch. The previous columnar pass-through queue (ArrayDeque of ColumnVector references) assumed 1:1 batch alignment, causing assertion failures on all columnar-capable sources (Parquet vectorized reader, InMemoryTableScan, etc.).

Fix: use HybridRowQueue to buffer pass-through rows (same as the existing row-based evaluator), which handles arbitrary output batch sizes correctly. The key optimization -- sending Arrow FieldVectors directly to Python instead of converting through ArrowWriter -- is preserved on the input side.

Also remove doExecuteColumnar() since the evaluator now returns Iterator[InternalRow] (required for row-based result joining).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
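The batch-alignment problem described above can be shown without Spark. A toy pure-Python sketch (hypothetical helper name, not the PR's code): once a worker re-cuts rows at a max-records boundary, output batch sizes no longer line up with a queue that holds one pass-through entry per input batch.

```python
from itertools import islice

def rebatch(batches, max_records):
    """Flatten input batches and re-cut them every max_records rows,
    the way a maxRecordsPerBatch-style worker would."""
    flat = (row for batch in batches for row in batch)
    while True:
        out = list(islice(flat, max_records))
        if not out:
            return
        yield out

inputs = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]   # three input batches
outputs = list(rebatch(inputs, max_records=4))

# Output sizes (4, 4, 1) no longer match input sizes (3, 2, 4), so a
# pass-through queue assuming 1:1 alignment cannot be zipped with them.
assert [len(b) for b in outputs] == [4, 4, 1]
assert [len(b) for b in inputs] != [len(b) for b in outputs]
```

This is exactly why the fix falls back to a row queue, which tolerates arbitrary output batch sizes.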
…wQueue)

The Python scalar UDF worker preserves 1:1 batch correspondence (one output batch per input batch), so the evalColumnar path can safely combine pass-through ColumnVector references with UDF result columns at the columnar level. However, the evalRowFallback path uses BatchedPythonArrowInput, which re-batches rows by maxRecordsPerBatch and breaks the original batch boundaries. This path must use HybridRowQueue for row-based joining.

evalColumnar: ColumnarBatch pass-through queue + columnar combining
evalRowFallback: HybridRowQueue + JoinedRow (same as the existing path)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Specify returnType = Some(StringType) for TestScalarPandasUDF to avoid VOID type inference when called via the DataFrame API.
- Use selectExpr() with registered UDF names instead of direct UDF function calls for proper SQL function resolution.
- Fix the supportsColumnar test to check for columnar-capable scan nodes instead of the top-level plan (which is ColumnarToRowExec).
- Fix the plan assertion to check under the ArrowEvalPythonExec subtree only.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When supportsColumnar = true, Spark may call executeColumnar() on this node (e.g., when a parent also supports columnar). Without doExecuteColumnar(), the default SparkPlan implementation throws "has column support mismatch".

Add doExecuteColumnar() that delegates to doExecute() and converts row output to ColumnarBatch via RowToColumnarEvaluatorFactory.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Both evalColumnar and evalRowFallback now maintain 1:1 batch correspondence, enabling columnar pass-through combining in both paths:
- evalColumnar: ColumnarArrowPythonInput sends one IPC RecordBatch per ColumnarBatch (1:1 by design).
- evalRowFallback: uses BasicPythonArrowInput (NOT BatchedPythonArrowInput), where each ColumnarBatch's projected rows form one inner iterator that becomes one IPC RecordBatch. This preserves 1:1 correspondence instead of re-batching by maxRecordsPerBatch.

The evaluator returns Iterator[ColumnarBatch] (pass-through columns combined with UDF result columns). doExecuteColumnar() delegates directly to the evaluator; doExecute() calls doExecuteColumnar() and flattens to rows.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Non-Arrow columnar readers (Parquet vectorized, InMemoryTableScan) reuse ColumnVector objects across batches. Storing references in the pass-through queue is unsafe because the writer may advance to the next batch before the reader consumes the previous result, causing stale data or an NPE (the nulls array is set to null on reset). Arrow-backed sources allocate independent vectors per batch, so references are safe.

Fix: peek at the first batch to check whether it is Arrow-backed:
- Arrow: columnar pass-through queue + direct FieldVector IPC (path 1)
- Non-Arrow: HybridRowQueue for pass-through buffering (path 2)
- Complex expressions: HybridRowQueue + MutableProjection (path 3)

The evaluator returns Iterator[InternalRow]. doExecuteColumnar() wraps via RowToColumnarEvaluatorFactory for upstream columnar consumers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
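The vector-reuse hazard above is an aliasing bug, and it reproduces in a few lines of plain Python (hypothetical class, not Spark code): a reader that refills one buffer in place hands out references that all end up pointing at the last batch.

```python
class ReusingReader:
    """Mimics a vectorized reader that reuses one buffer across batches,
    like Parquet's vectorized ColumnVectors."""
    def __init__(self, batches):
        self._batches = batches
        self._buf = []          # the single, reused "ColumnVector"

    def __iter__(self):
        for b in self._batches:
            self._buf[:] = b    # overwrite in place, like reset() + refill
            yield self._buf     # caller receives a reference, not a copy

queued = []
for vec in ReusingReader([[1, 2], [3, 4]]):
    queued.append(vec)          # unsafe: queue the reference

# Both queue entries alias the same buffer and now show the last batch.
assert queued[0] is queued[1]
assert queued[0] == [3, 4]
```

Arrow-backed sources that allocate a fresh vector per batch do not have this problem, which is why the reference-queue fast path is gated on the batch being Arrow-backed.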
ArrowEvalPythonExec does not define a numOutputRows metric. Create local metrics for RowToColumnarEvaluatorFactory instead of referencing non-existent plan metrics.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When only UDF input columns are selected, the optimizer inserts a ProjectExec for column pruning. ProjectExec does not support columnar, so ArrowEvalPythonExec.child.supportsColumnar becomes false and the optimization does not apply.

Fix: select all source columns (id, name, value) plus the UDF result, so the scan output matches the child output and no Project is needed. Also change the assertion to check child.supportsColumnar directly instead of scanning for ColumnarToRowExec in the subtree.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…mnarBatch

Restore the correct design:
- Evaluator: PartitionEvaluatorFactory[ColumnarBatch, ColumnarBatch]
- doExecuteColumnar(): child.executeColumnar() -> evaluator -> ColumnarBatch
- doExecute(): doExecuteColumnar().flatMap(_.rowIterator()) when columnar, else super.doExecute()

Path 1 (Arrow): columnar combining produces ColumnarBatch directly. Paths 2 & 3 (non-Arrow/complex): HybridRowQueue + JoinedRow produce InternalRow, then RowToColumnConverter wraps back to ColumnarBatch inside the evaluator via rowsToColumnarBatches().

This removes the RowToColumnarEvaluatorFactory hack from doExecuteColumnar and the associated numOutputRows metric-not-found issue.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
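The re-columnarizing step on paths 2 and 3 is conceptually a transpose plus batching. A toy pure-Python stand-in (hypothetical function name echoing rowsToColumnarBatches; the real code uses RowToColumnConverter on InternalRows):

```python
def rows_to_columnar_batches(rows, batch_size):
    """Cut a row iterator into batches and transpose each batch into
    per-column lists -- a toy analogue of re-columnarizing buffered rows."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield [list(col) for col in zip(*batch)]
            batch = []
    if batch:                    # emit the final, possibly short batch
        yield [list(col) for col in zip(*batch)]

rows = [(1, "a"), (2, "b"), (3, "c")]
batches = list(rows_to_columnar_batches(rows, batch_size=2))
# Two rows become two column lists; the leftover row forms a final batch.
assert batches == [[[1, 2], ["a", "b"]], [[3], ["c"]]]
```

The point of the design above is that only the non-Arrow/complex paths pay this transpose cost; the Arrow path combines existing column vectors and never sees rows at all.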
ColumnarBatch.rowIterator() returns ColumnarBatchRow, which is NOT UnsafeRow. Downstream operators (e.g., an outer EvalPythonExec in nested UDF queries) cast rows to UnsafeRow for HybridRowQueue, causing a ClassCastException.

Fix: apply UnsafeProjection.create(schema) to convert each row to UnsafeRow before returning from doExecute().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
ArrowEvalPythonExec:
- Add the spark.sql.execution.arrow.pythonUDF.columnarInput.enabled config to toggle the columnar optimization, enabling fair A/B benchmarking with the same data source.

ArrowBackedDataSourceV2:
- Add a "columnar" option (default true) to control supportColumnarReads, allowing row-based output from the same source.
- Add a "data" column (StringType) with configurable-length random strings to stress ArrowWriter's per-element VarChar serialization.
- Fix the Arrow memory lifecycle: use a single allocator per partition reader instead of per-batch allocators that get closed while pass-through ColumnVector references are still held.

Benchmark:
- Use the noop sink instead of collect() to avoid driver-side I/O overhead.
- Use an identity UDF (return the input as-is) to minimize Python-side cost.
- Use the same data source for both paths, toggled by the config.
- Separate config lifecycle: run the arrow benchmark first (config=true), then the row benchmark (config=false).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add ARROW_PYSPARK_UDF_COLUMNAR_INPUT_ENABLED to SQLConf with proper buildConf, doc, version, and accessor method. Replace the inline getConfString in ArrowEvalPythonExec with conf.arrowPySparkUDFColumnarInputEnabled.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
ArrowBackedDataSourceV2 now has 4 columns (added data). The plan test must select all columns to prevent column pruning from inserting a ProjectExec.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed 65e3b20 to 9882045
Restored the CIs broken by the change to ArrowBackedDataSourceV2, which is used both in the benchmark and in test code.
dongjoon-hyun left a comment
+1, it looks good to me again. Could you insert the performance result table into the PR description (Up to 20%?), @viirya ?
Thank you @dongjoon-hyun. I updated the PR description with the benchmark result table.
Thank you, @viirya. Merged to master.
Thank you @dongjoon-hyun |
What changes were proposed in this pull request?
This PR adds a columnar execution path to ArrowEvalPythonExec that allows Arrow-backed ColumnarBatch input (e.g., from DataSource V2 connectors that produce ArrowColumnVector columns) to be serialized directly to Arrow IPC for Python UDF evaluation, bypassing the existing ColumnarToRow → ArrowWriter round-trip.
Specifically:

- ArrowEvalPythonExec declares supportsColumnar = child.supportsColumnar, so the columnar transition rule no longer inserts ColumnarToRowExec.
- A new ColumnarArrowPythonInput handles the Arrow-direct path (VectorSchemaRoot.of + VectorUnloader), with a row-based fallback for non-Arrow ColumnarBatch and complex input expressions.
- A new config, spark.sql.execution.arrow.pythonUDF.columnarInput.enabled, toggles the optimization.
Benchmark:

| Scenario | Rows | String Length | UDF | Sink | Arrow Columnar (ms) | Row-based (ms) | Speedup |
|---|---|---|---|---|---|---|---|
| string concat | 2M | 200 chars | name + data | noop | 2086 | 2401 | 1.15x |
| string identity | 1M | 1000 chars | return data | noop | 4506 | 4839 | 1.07x |
| string identity | 2M | 200 chars | return data | noop | 2118 | 2324 | 1.10x |
| string identity | 10M | 100 chars | return data | noop | 6067 | 7124 | 1.17x |
| string identity | 20M | 100 chars | return data | noop | 11917 | 14106 | 1.18x |

Why are the changes needed?
When a Spark operator produces Arrow-backed ColumnarBatch (e.g., connectors that read columnar formats like Parquet into Arrow vectors), the current execution path for Arrow Python UDFs performs a wasteful columnar → row → columnar round-trip: ColumnarToRowExec converts the columnar data to InternalRow, then ArrowWriter converts each row back to Arrow columnar format for IPC serialization to the Python worker.
This round-trip is expensive due to per-row virtual dispatch, type conversion, null checking, and poor cache locality in ArrowWriter.write(). Since the data is already in Arrow format, the conversion is entirely unnecessary.
With this change, Arrow FieldVectors are extracted directly from ArrowColumnVector and serialized to IPC via VectorUnloader — skipping both ColumnarToRowExec and ArrowWriter. Pass-through columns are kept as ColumnVector references throughout, avoiding row materialization entirely in the fully columnar path.
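The overhead the PR removes is per-element materialization. A toy pure-Python contrast (hypothetical helper names, not Spark's code): the round-trip path visits every element twice, while the direct path simply passes the existing column buffers along.

```python
# Toy "columnar batch": a dict of column name -> list of values.
batch = {"id": list(range(5)), "data": ["x" * 4] * 5}

def columnar_to_rows(b):
    """Per-row materialization, analogous to ColumnarToRowExec."""
    n = len(next(iter(b.values())))
    return [tuple(b[name][i] for name in b) for i in range(n)]

def rows_to_columnar(rows, names):
    """Rebuild columns from rows, analogous to ArrowWriter.write()."""
    return {name: [r[i] for r in rows] for i, name in enumerate(names)}

# Round-trip path: columnar -> rows -> columnar, touching every element.
roundtrip = rows_to_columnar(columnar_to_rows(batch), ["id", "data"])

# Direct path: the existing column buffers are reused untouched.
direct = batch

assert roundtrip == batch        # same values, but rebuilt element by element
assert direct["id"] is batch["id"]  # same buffers, zero copies
```

On the JVM the per-element cost is worse than in this sketch (virtual dispatch, null checks, UTF-8 re-encoding in ArrowWriter), which is where the measured 1.07x-1.18x speedups come from.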
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.6