[Python] Expand SDF by default in PortableRunner#37965

Merged
damccorm merged 5 commits into apache:master from
Eliaaazzz:users/qingfengrumeng/issue-24422-expand-sdf-default-portable-runner
Apr 20, 2026

Conversation

@Eliaaazzz
Contributor

Fixes #24422.

Python iobase.Read uses a Splittable DoFn internally. PortableRunner's default batch pre-optimization path did not include translations.expand_sdf, so portable runners that do not support SDFs natively can receive a single SDF ParDo for reads. In Spark, that means Python reads such as ReadFromParquet may execute on a single partition without parallelization.

This change:

  • adds translations.expand_sdf to PortableRunner's default optimization phases
  • allows --experiments=pre_optimize=expand_sdf as an explicit custom phase
  • adds optimizer coverage for:
    • default SDF expansion
    • explicit pre_optimize=expand_sdf
    • beam.io.Read(BoundedSource) expansion, including the expected RESHUFFLE
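With the opt-in path in place, the phase is requested via the real flag syntax `--experiments=pre_optimize=expand_sdf`. As a standalone sketch (the parsing helper below is illustrative for this discussion, not Beam's actual options code), extracting that setting from an experiments list looks like:

```python
# Illustrative parsing of the --experiments=pre_optimize=... opt-in.
# The flag value mirrors the real Beam experiment syntax, but this
# helper is a sketch, not Beam's implementation.
def pre_optimize_value(experiments):
    """Return the pre_optimize setting from a list of experiment strings."""
    for exp in experiments:
        if exp.startswith('pre_optimize='):
            return exp[len('pre_optimize='):]
    return 'default'  # fall back to the default phase set

print(pre_optimize_value(['pre_optimize=expand_sdf']))  # expand_sdf
```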

The bounded read test covers the issue scenario behind ReadFromParquet and similar Python Read transforms by verifying that the optimized pipeline contains the SDF component stages:

  • PAIR_WITH_RESTRICTION
  • SPLIT_AND_SIZE_RESTRICTIONS
  • PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS
  • RESHUFFLE
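These stage names correspond to Beam's standard SDF component URNs. The kind of assertion the test makes can be sketched as follows (the URN strings are the standard identifiers as I understand them and should be verified against `apache_beam.portability.common_urns`; the helper itself is illustrative):

```python
# Standard Beam SDF component URNs (from Beam's portability protos;
# verify against apache_beam.portability.common_urns if in doubt).
SDF_COMPONENT_URNS = (
    'beam:transform:sdf_pair_with_restriction:v1',
    'beam:transform:sdf_split_and_size_restrictions:v1',
    'beam:transform:sdf_process_sized_element_and_restrictions:v1',
)

def contains_sdf_expansion(transform_urns):
    """True if every expected SDF component stage appears in the set of URNs."""
    return all(urn in transform_urns for urn in SDF_COMPONENT_URNS)

# A pipeline still carrying only a single unexpanded SDF ParDo fails the check.
print(contains_sdf_expansion({'beam:transform:pardo:v1'}))  # False
print(contains_sdf_expansion(set(SDF_COMPONENT_URNS)))      # True
```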

Local verification:

  • git diff --check
  • python -m compileall sdks/python/apache_beam/runners/portability/portable_runner.py sdks/python/apache_beam/runners/portability/portable_runner_test.py

Enable translations.expand_sdf in PortableRunner's default pre-optimization path so Python Read transforms are expanded for portable runners like Spark. Also add optimizer coverage for default SDF expansion, explicit pre_optimize=expand_sdf, and bounded Read expansion.

Refs apache#24422.
@gemini-code-assist
Contributor

Summary of Changes


This pull request resolves a critical issue in Apache Beam's PortableRunner where Python iobase.Read operations, which internally utilize Splittable DoFns (SDFs), were not being properly parallelized on certain portable runners like Spark. The fix involves integrating the translations.expand_sdf optimization phase into the PortableRunner's default pipeline optimization process. This ensures that SDFs are correctly expanded into their constituent stages, thereby enabling parallel execution for read transforms and preventing performance bottlenecks, particularly for operations like ReadFromParquet.

Highlights

  • Default SDF Expansion: The expand_sdf optimization phase was added to the PortableRunner's default pre-optimization path to ensure proper parallelization of iobase.Read operations.
  • Explicit SDF Expansion: Users can now explicitly enable SDF expansion using the --experiments=pre_optimize=expand_sdf flag.
  • Enhanced Test Coverage: New tests were introduced to verify the correct expansion of SDFs in both default and explicit optimization scenarios, including beam.io.Read(BoundedSource) with expected RESHUFFLE for parallelization.


@Eliaaazzz force-pushed the users/qingfengrumeng/issue-24422-expand-sdf-default-portable-runner branch from 60a831e to 169645d on March 26, 2026 at 14:26
@github-actions
Contributor

Assigning reviewers:

R: @damccorm for label python.

Note: If you would like to opt out of this review, comment "assign to next reviewer".

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@Eliaaazzz force-pushed the users/qingfengrumeng/issue-24422-expand-sdf-default-portable-runner branch 4 times, most recently from e54f3d3 to 2641d1f on March 27, 2026 at 12:25
@github-actions
Contributor

github-actions Bot commented Apr 3, 2026

Reminder, please take a look at this pr: @damccorm

@damccorm
Contributor

damccorm commented Apr 3, 2026

I'm not sure we can do this safely without potentially impacting performance of portable runners which do implement SDF; so I'm a bit worried about taking this

@Abacn
Contributor

Abacn commented Apr 3, 2026

Thanks for sharing the finding. Would you mind triggering the following PostCommit:

Make any change to

  • .github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json

  • .github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json

  • .github/trigger_files/beam_PostCommit_XVR_Spark3.json

  • .github/trigger_files/beam_PostCommit_XVR_Flink.json

and push the commit

@@ -332,7 +336,7 @@ def _optimize_pipeline(
       phases = []
       for phase_name in pre_optimize.split(','):
         # For now, these are all we allow.
-        if phase_name in ('pack_combiners', 'lift_combiners'):
+        if phase_name in ('pack_combiners', 'lift_combiners', 'expand_sdf'):
Contributor


If there are compatibility/performance worries, this one is safer (opt in)
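The validation loop in the diff above can be rendered as a standalone sketch (this mirrors the logic of the hunk, but is not Beam's actual _optimize_pipeline code):

```python
# Standalone sketch of the phase-name validation shown in the diff above;
# ALLOWED_PHASES and parse_phases are illustrative names, not Beam's.
ALLOWED_PHASES = ('pack_combiners', 'lift_combiners', 'expand_sdf')

def parse_phases(pre_optimize):
    """Split a comma-separated pre_optimize value and validate each phase."""
    phases = []
    for phase_name in pre_optimize.split(','):
        # For now, these are all we allow.
        if phase_name in ALLOWED_PHASES:
            phases.append(phase_name)
        else:
            raise ValueError('Unknown pre_optimize phase: %r' % phase_name)
    return phases

print(parse_phases('lift_combiners,expand_sdf'))  # ['lift_combiners', 'expand_sdf']
```

With this change, 'expand_sdf' passes validation instead of raising, which is what makes the opt-in experiment usable.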

@Eliaaazzz
Contributor Author

> I'm not sure we can do this safely without potentially impacting performance of portable runners which do implement SDF; so I'm a bit worried about taking this

Thanks, that makes sense. I agree that enabling this in the default pre-optimization path is too broad if it may impact runners that already support SDF well.

I’ll scope this down to the opt-in path only by dropping the default expand_sdf change and keeping only explicit pre_optimize=expand_sdf support together with the related tests.

@Eliaaazzz
Contributor Author

> Thanks for sharing the finding. Would you mind triggering the following PostCommit:
>
> Make any change to
>
>   • .github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json
>   • .github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json
>   • .github/trigger_files/beam_PostCommit_XVR_Spark3.json
>   • .github/trigger_files/beam_PostCommit_XVR_Flink.json
>
> and push the commit

Thanks, happy to do that.

I’ll make a no-op change to those trigger files to kick off the requested PostCommits and push the commit.

@Eliaaazzz
Contributor Author

All PostCommit tests passed.

Contributor

@Abacn Abacn left a comment


Thanks!

@Abacn
Contributor

Abacn commented Apr 8, 2026

test_async_computation_result_cancel (apache_beam.runners.interactive.interactive_beam_test.InteractiveBeamComputeTest) failed

>     for p in self._pid_to_pipelines.values():
E     RuntimeError: dictionary changed size during iteration

test failure not related to the change. Wait until PreCommit YAML Xlang Direct becomes green on HEAD to merge

@Eliaaazzz
Contributor Author

> test_async_computation_result_cancel (apache_beam.runners.interactive.interactive_beam_test.InteractiveBeamComputeTest) failed
>
> >     for p in self._pid_to_pipelines.values():
> E     RuntimeError: dictionary changed size during iteration
>
> test failure not related to the change. Wait until PreCommit YAML Xlang Direct becomes green on HEAD to merge

Thanks a lot for taking the time to review this, I really appreciate it!

@github-actions
Contributor

Reminder, please take a look at this pr: @damccorm

@damccorm
Contributor

> test_async_computation_result_cancel (apache_beam.runners.interactive.interactive_beam_test.InteractiveBeamComputeTest) failed
>
> >     for p in self._pid_to_pipelines.values():
> E     RuntimeError: dictionary changed size during iteration
>
> test failure not related to the change. Wait until PreCommit YAML Xlang Direct becomes green on HEAD to merge

This has happened - @Eliaaazzz would you mind pulling in the latest changes from the master branch and committing to retrigger the test suite?

…g/issue-24422-expand-sdf-default-portable-runner
@Eliaaazzz
Contributor Author

> test_async_computation_result_cancel (apache_beam.runners.interactive.interactive_beam_test.InteractiveBeamComputeTest) failed
>
> >     for p in self._pid_to_pipelines.values():
> E     RuntimeError: dictionary changed size during iteration
>
> test failure not related to the change. Wait until PreCommit YAML Xlang Direct becomes green on HEAD to merge
>
> This has happened - @Eliaaazzz would you mind pulling in the latest changes from the master branch and committing to retrigger the test suite?

Thanks, Done. I have pulled in the latest from master and pushed to retrigger CI.

@codecov

codecov Bot commented Apr 17, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 56.74%. Comparing base (5a65f6e) to head (6fac01e).
⚠️ Report is 186 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #37965      +/-   ##
============================================
- Coverage     56.85%   56.74%   -0.11%     
  Complexity     3424     3424              
============================================
  Files          1178     1182       +4     
  Lines        187910   188752     +842     
  Branches       3586     3586              
============================================
+ Hits         106828   107111     +283     
- Misses        77694    78253     +559     
  Partials       3388     3388              
Flag Coverage Δ
python 79.59% <100.00%> (-0.50%) ⬇️

Flags with carried forward coverage won't be shown.


@Abacn
Contributor

Abacn commented Apr 17, 2026

there is a known GHA issue that slowest matrix workflows get cancelled randomly. Otherwise all tests passed, thanks!

@Eliaaazzz
Contributor Author

> there is a known GHA issue that slowest matrix workflows get cancelled randomly. Otherwise all tests passed, thanks!

Thank you for letting me know that, appreciate it!

@damccorm damccorm merged commit 79fb360 into apache:master Apr 20, 2026
133 of 137 checks passed

Development

Successfully merging this pull request may close these issues.

[Bug]: No parallelization for ReadFromParquet (or any Python Read transforms) in Spark RDD Runner

3 participants