Skip to content

Optimization: Add pruning for connected component 2 phase algorithm#846

Open
WeichenXu123 wants to merge 5 commits into
graphframes:mainfrom
WeichenXu123:connected-component-2phase-pruning
Open

Optimization: Add pruning for connected component 2 phase algorithm#846
WeichenXu123 wants to merge 5 commits into
graphframes:mainfrom
WeichenXu123:connected-component-2phase-pruning

Conversation

@WeichenXu123
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Optimization: Add pruning for connected component 2 phase algorithm.

Why are the changes needed?

boost performance

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Copilot AI review requested due to automatic review settings June 8, 2026 14:19
@WeichenXu123
Copy link
Copy Markdown
Contributor Author

This is an optimization that has been implemented in Databricks internal graphframe lib. Now we decide to contribute it to OSS. CC @SemyonSinchenko

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a pruning-based optimization to the two-phase connected components implementation, aiming to speed up convergence on sparse graphs by temporarily shrinking the graph (removing certain “leaf” nodes) and then joining results back to the original graph.

Changes:

  • Add leaf-node pruning (pruneLeafNodes) and reconstruction logic (joinBack) to TwoPhase.
  • Add heuristic gating for when to attempt pruning during iterations, plus a checkpoint-retention tweak to keep required checkpoint data around.
  • Add unit tests covering pruning behavior, shrinkage gating, and join-back reconstruction.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File Description
core/src/main/scala/org/graphframes/lib/TwoPhase.scala Implements pruning/join-back optimization and related convergence/iteration logic changes.
core/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala Adds tests for pruning, shrinkage condition, and join-back behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/scala/org/graphframes/lib/TwoPhase.scala
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Jun 8, 2026

⚠️ Please install the 'codecov app svg image' to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 81.92%. Comparing base (a28a4e8) to head (0390ea6).
⚠️ Report is 13 commits behind head on main.
❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #846      +/-   ##
==========================================
+ Coverage   80.75%   81.92%   +1.16%     
==========================================
  Files          78       78              
  Lines        4421     4447      +26     
  Branches      543      559      +16     
==========================================
+ Hits         3570     3643      +73     
+ Misses        851      804      -47     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@SemyonSinchenko
Copy link
Copy Markdown
Collaborator

First of all, thanks a lot for the contribution @WeichenXu123 !

I run local benchmarks on one of small LDBC' graphs. The first results are very promising ❤️‍🔥❤️‍🔥❤️‍🔥

Wiki-Talk, 2M/5M -- the same is run in CI and published on website.
Spark 4.0.2

This PR

[info] Result "org.graphframes.benchmarks.ConnectedComponentsBenchmark.benchmarkConnectedComponents":
[info]   87.427 ±(99.9%) 38.537 s/op [Average]
[info]   (min, avg, max) = (85.203, 87.427, 89.406), stdev = 2.112
[info]   CI (99.9%): [48.890, 125.964] (assumes normal distribution)

Main

[info] Result "org.graphframes.benchmarks.ConnectedComponentsBenchmark.benchmarkConnectedComponents":
[info]   129.323 ±(99.9%) 32.521 s/op [Average]
[info]   (min, avg, max) = (127.966, 129.323, 131.341), stdev = 1.783
[info]   CI (99.9%): [96.802, 161.843] (assumes normal distribution)

On the defaults in GraphFrames

GraphFrames is very old project. And ConnectedComponents is a widely used algorithm. So, we are trying not to change defaults in the minor-version updates and we are considering changing defaults as a breaking changes. At the same time, a couple of versions ago I was experimenting with skewedJoin in CC and found that even on the graphs with gigantic component AQE handles skew better than manual collect + broadcast: that is the note in documentation "AQE-mode".

So, we have actually two different implementations of the two_phase: run and runAQE. Can I ask you to put the same leaf-nodes optimization to the runAQE as well? Because at the moment it is still 4x time faster than run with leaf-node optimization (and 6x faster than master)?

[info] Result "org.graphframes.benchmarks.ConnectedComponentsBenchmark.benchmarkConnectedComponents":
[info]   20.206 ±(99.9%) 12.791 s/op [Average]
[info]   (min, avg, max) = (19.401, 20.206, 20.685), stdev = 0.701
[info]   CI (99.9%): [7.415, 32.998] (assumes normal distribution)

P.S.
To reproduce:

sbt:graphframes> benchmarks/Jmh/run -p graphName=wiki-Talk -p algorithm=two_phase -p broadcastThreshold="10000" org.graphframes.benchmarks.ConnectedComponentsBenchmark

and to run in "AQE-mode":

sbt:graphframes> benchmarks/Jmh/run -p graphName=wiki-Talk -p algorithm=two_phase -p broadcastThreshold="-1" org.graphframes.benchmarks.ConnectedComponentsBenchmark

And thanks a again for this contribution! The idea is brilliant and the results look very promising!

if ((edgeCnt < sparsityThreshold * numNodes) && (edgeCnt > 0)
&& (iteration >= optStartIter) && (!triedToOptimize)) {
edgesBeforePruning = ee
pruneLeafNodes(ee, intermediateStorageLevel, numNodes, shrinkageThreshold) match {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update the currSum here? I mean if we shrink the graph and convergence should happen next iteration we won't catch that algorithm is converged because the preSum at the next iteration will be currSum from the iteration when shrinking happened but that sum was computed before shrinking and it is always be bigger.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants