[Python] Expand SDF by default in PortableRunner#37965
Enable translations.expand_sdf in PortableRunner's default pre-optimization path so Python Read transforms are expanded for portable runners like Spark. Also add optimizer coverage for default SDF expansion, explicit pre_optimize=expand_sdf, and bounded Read expansion.

Refs apache#24422.
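Why does expansion matter for a runner without native SDF support? The toy model below is plain Python, not Beam's implementation, and every name in it is hypothetical. It contrasts one opaque read, where every record lands in a single partition, with an expanded read whose restrictions are split and then redistributed. Real Reshuffle redistributes by random key; round-robin is used here only to keep the output deterministic.

```python
# Toy model only: NOT Beam's implementation. It contrasts an unexpanded read
# (one opaque SDF ParDo, processed in one place by a runner without native
# SDF support) with an expanded read (restrictions split, then redistributed).


def unexpanded_read(num_records, num_partitions):
  """All records land in a single partition; the others stay empty."""
  partitions = [[] for _ in range(num_partitions)]
  partitions[0].extend(range(num_records))
  return partitions


def expanded_read(num_records, num_partitions, split_size):
  """Split [0, num_records) into sub-ranges, then spread them out."""
  # Stand-in for pair-with-restriction + split: one offset range per
  # split_size records.
  restrictions = [
      (start, min(start + split_size, num_records))
      for start in range(0, num_records, split_size)
  ]
  # Stand-in for reshuffle: round-robin here (Beam uses random keys).
  partitions = [[] for _ in range(num_partitions)]
  for i, restriction in enumerate(restrictions):
    partitions[i % num_partitions].append(restriction)
  # Stand-in for process: each partition reads only its own restrictions.
  return [
      [record for (start, stop) in parts for record in range(start, stop)]
      for parts in partitions
  ]


if __name__ == '__main__':
  print([len(p) for p in unexpanded_read(1000, 4)])     # [1000, 0, 0, 0]
  print([len(p) for p in expanded_read(1000, 4, 100)])  # [300, 300, 200, 200]
```

With 1,000 records, 4 partitions, and 100-record splits, the unexpanded read leaves three partitions idle, while the expanded read gives every partition work.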
Force-pushed from 60a831e to 169645d.
Assigning reviewers: R: @damccorm for label python.
Force-pushed from e54f3d3 to 2641d1f.
Reminder, please take a look at this PR: @damccorm
I'm not sure we can do this safely without potentially affecting the performance of portable runners that do implement SDF, so I'm a bit worried about taking this.
Thanks for sharing the finding. Would you mind triggering the following PostCommits? Make any change to … and push the commit.
@@ -332,7 +336,7 @@ def _optimize_pipeline(
   phases = []
   for phase_name in pre_optimize.split(','):
     # For now, these are all we allow.
-    if phase_name in ('pack_combiners', 'lift_combiners'):
+    if phase_name in ('pack_combiners', 'lift_combiners', 'expand_sdf'):
If there are compatibility or performance worries, this one is safer (opt-in).
Thanks, that makes sense. I agree that enabling this in the default pre-optimization path is too broad if it may affect runners that already support SDF well. I'll scope this down to the opt-in path only by dropping the default …
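To make the opt-in path concrete, here is a minimal sketch, assuming experiments arrive as a list of strings such as `pre_optimize=expand_sdf` (the assumed result of passing `--experiments=pre_optimize=expand_sdf`). `custom_pre_optimize_phases`, `ALLOWED_CUSTOM_PHASES`, and the error message are hypothetical; only the three phase names come from the diff. The sketch models only the comma-separated custom-phase branch, so any value the real runner may treat specially is rejected here.

```python
# Hypothetical sketch of the opt-in pre_optimize check; not Beam's code.
# The allowed phase names are the ones listed in the diff.
ALLOWED_CUSTOM_PHASES = ('pack_combiners', 'lift_combiners', 'expand_sdf')


def custom_pre_optimize_phases(experiments):
  """Return phase names from a 'pre_optimize=a,b' experiment, or None."""
  for experiment in experiments:
    key, _, value = experiment.partition('=')
    if key != 'pre_optimize':
      continue  # Unrelated experiment; ignore it.
    phases = []
    for phase_name in value.split(','):
      # Only explicitly allowed phases may be requested.
      if phase_name not in ALLOWED_CUSTOM_PHASES:
        raise ValueError('Unsupported pre_optimize phase: %r' % phase_name)
      phases.append(phase_name)
    return phases
  return None  # No pre_optimize experiment was given.


if __name__ == '__main__':
  print(custom_pre_optimize_phases(['pre_optimize=expand_sdf']))
  # ['expand_sdf']
```

Keeping `expand_sdf` behind an explicit allow-list means pipelines that never set the experiment are unaffected, which is the compatibility property the review asked for.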
Thanks, happy to do that. I'll make a no-op change to those trigger files to kick off the requested PostCommits and push the commit.
All PostCommit tests passed.
The test failure is not related to the change. Waiting until PreCommit YAML Xlang Direct is green on HEAD before merging.
Thanks a lot for taking the time to review this, I really appreciate it! |
This has happened. @Eliaaazzz, would you mind pulling in the latest changes from the master branch and committing to retrigger the test suite?
Thanks, done. I've pulled in the latest changes from master and pushed to retrigger CI.
Codecov Report: ✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## master #37965 +/- ##
============================================
- Coverage 56.85% 56.74% -0.11%
Complexity 3424 3424
============================================
Files 1178 1182 +4
Lines 187910 188752 +842
Branches 3586 3586
============================================
+ Hits 106828 107111 +283
- Misses 77694 78253 +559
Partials 3388 3388
Flags with carried forward coverage won't be shown.
There is a known GitHub Actions issue where the slowest matrix workflows get cancelled randomly. Otherwise all tests passed, thanks!
Thanks for letting me know, I appreciate it!
Fixes #24422.
Python `iobase.Read` uses a Splittable DoFn (SDF) internally. PortableRunner's default batch pre-optimization path did not include `translations.expand_sdf`, so portable runners that do not support SDFs natively can receive a single SDF `ParDo` for reads. In Spark, that means Python reads such as `ReadFromParquet` may execute on a single partition without parallelization.

This change:
- Adds `translations.expand_sdf` to PortableRunner's default optimization phases.
- Allows `--experiments=pre_optimize=expand_sdf` as an explicit custom phase.
- Adds optimizer test coverage for default SDF expansion and explicit `pre_optimize=expand_sdf`.
- Adds a test for `beam.io.Read(BoundedSource)` expansion, including the expected `RESHUFFLE`.

The bounded read test covers the issue scenario behind `ReadFromParquet` and similar Python `Read` transforms by verifying that the optimized pipeline contains the SDF component stages:

- `PAIR_WITH_RESTRICTION`
- `SPLIT_AND_SIZE_RESTRICTIONS`
- `PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS`
- `RESHUFFLE`

Local verification:
- `git diff --check`
- `python -m compileall sdks/python/apache_beam/runners/portability/portable_runner.py sdks/python/apache_beam/runners/portability/portable_runner_test.py`
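The shape of the bounded-read check can be sketched in plain Python. This is a hypothetical model, not the PR's test code: it represents an optimized pipeline as a dict mapping made-up stage names to the component kinds they contain, whereas the real test inspects Beam's pipeline proto. Only the four component names come from the description.

```python
# Hypothetical sketch of the bounded-read assertion; not the PR's test code.
# A pipeline is modeled as {stage_name: [component_kind, ...]}.
EXPECTED_READ_COMPONENTS = frozenset({
    'PAIR_WITH_RESTRICTION',
    'SPLIT_AND_SIZE_RESTRICTIONS',
    'PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS',
    'RESHUFFLE',
})


def missing_read_components(stages):
  """Return expected components that no stage contains (empty set = pass)."""
  present = {kind for kinds in stages.values() for kind in kinds}
  return set(EXPECTED_READ_COMPONENTS - present)


if __name__ == '__main__':
  # Illustrative stage layouts; real stage names and fusion will differ.
  expanded = {
      'Read/Impulse': ['IMPULSE'],
      'Read/Pair': ['PAIR_WITH_RESTRICTION', 'SPLIT_AND_SIZE_RESTRICTIONS'],
      'Read/Reshuffle': ['RESHUFFLE'],
      'Read/Process': ['PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS'],
  }
  unexpanded = {'Read': ['PAR_DO']}
  print(missing_read_components(expanded))  # set()
  print(sorted(missing_read_components(unexpanded)))
```

Checking set membership rather than exact stage layout keeps the assertion stable if fusion groups the components into different stages.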