Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,7 @@ jobs:
include:
- spark-version: 3.5.0
scala-version: 2.12.18
python-version: 3.9
- spark-version: 3.4.1
scala-version: 2.12.17
python-version: 3.9
- spark-version: 3.3.3
scala-version: 2.12.15
python-version: 3.9
- spark-version: 3.2.4
scala-version: 2.12.15
python-version: 3.9
python-version: 3.9.19
runs-on: ubuntu-22.04
env:
# define Java options for both official sbt and sbt-extras
Expand All @@ -36,7 +27,7 @@ jobs:
~/.ivy2/cache
key: sbt-ivy-cache-spark-${{ matrix.spark-version}}-scala-${{ matrix.scala-version }}
- name: Assembly
run: sbt -v ++${{ matrix.scala-version }} -Dspark.version=${{ matrix.spark-version }} "set test in assembly := {}" assembly
run: build/sbt -v ++${{ matrix.scala-version }} -Dspark.version=${{ matrix.spark-version }} "set test in assembly := {}" assembly
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
Expand Down
14 changes: 1 addition & 13 deletions .github/workflows/scala-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,6 @@ jobs:
scala-version: 2.13.8
- spark-version: 3.5.0
scala-version: 2.12.12
- spark-version: 3.4.1
scala-version: 2.13.8
- spark-version: 3.4.1
scala-version: 2.12.12
- spark-version: 3.3.3
scala-version: 2.13.8
- spark-version: 3.3.3
scala-version: 2.12.12
- spark-version: 3.2.4
scala-version: 2.13.5
- spark-version: 3.2.4
scala-version: 2.12.12
runs-on: ubuntu-22.04
env:
# fixing this error after tests success: sbt.ForkMain failed with exit code 134
Expand All @@ -39,5 +27,5 @@ jobs:
~/.ivy2/cache
key: sbt-ivy-cache-spark-${{ matrix.spark-version}}-scala-${{ matrix.scala-version }}
- name: Build and Test
run: sbt -v ++${{ matrix.scala-version }} -Dspark.version=${{ matrix.spark-version }} coverage test coverageReport
run: build/sbt -v ++${{ matrix.scala-version }} -Dspark.version=${{ matrix.spark-version }} coverage test coverageReport
- uses: codecov/codecov-action@v3
6 changes: 0 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,15 @@

import ReleaseTransformations._

resolvers += "Spark snapshot repository" at "https://repository.apache.org/snapshots/"

val sparkVer = sys.props.getOrElse("spark.version", "3.5.0")
val sparkBranch = sparkVer.substring(0, 3)
val defaultScalaVer = sparkBranch match {
case "3.5" => "2.12.18"
case "3.4" => "2.12.17"
case "3.3" => "2.12.15"
case "3.2" => "2.12.15"
case _ => throw new IllegalArgumentException(s"Unsupported Spark version: $sparkVer.")
}
val scalaVer = sys.props.getOrElse("scala.version", defaultScalaVer)
val defaultScalaTestVer = scalaVer match {
case s if s.startsWith("2.12") || s.startsWith("2.13") => "3.0.8"
case s if s.startsWith("2.11") => "2.2.6" // scalatest_2.11 does not have 2.0 published
}

sparkVersion := sparkVer
Expand Down
67 changes: 46 additions & 21 deletions src/main/scala/org/graphframes/lib/ConnectedComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -313,22 +313,49 @@ object ConnectedComponents extends Logging {

var converged = false
var iteration = 1
var prevSum: BigDecimal = null

// Computes the sum of the MIN_NBR column of `minNbrsDF`. The caller compares this sum
// between consecutive iterations to decide whether the algorithm has converged.
//
// Returns the sum as a decimal. The result is null when `minNbrsDF` has no rows and the
// aggregate is null; callers rely on that to treat an empty input as converged.
//
// Throws ArithmeticException when `minNbrsDF` has rows but the sum came back as null
// (presumably because the decimal sum overflowed its precision -- TODO confirm this is
// the only way Spark yields a null sum here).
def _calcMinNbrSum(minNbrsDF: DataFrame): BigDecimal = {
  // Taking the sum in DecimalType to preserve precision.
  // We use 20 digits for long values and Spark SQL will add 10 digits for the sum.
  // It should be able to handle 200 billion edges without overflow.
  val aggregated = minNbrsDF.select(sum(col(MIN_NBR).cast(DecimalType(20, 0))), count("*"))
  val (minNbrSum, cnt) = aggregated.rdd
    .map { r =>
      (r.getAs[BigDecimal](0), r.getLong(1))
    }
    .first()
  // A null sum together with a non-zero row count is reported as an error instead of
  // being returned, so it is never mistaken for the "no rows" case.
  if (cnt != 0L && minNbrSum == null) {
    throw new ArithmeticException(
      s"""
         |The total sum of minimum neighbor IDs is used to determine convergence during iterations.
         |However, the total sum at iteration $iteration exceeded 30 digits (1e30),
         |which should happen only if the graph contains more than 200 billion edges.
         |If not, please file a bug report at https://github.com/graphframes/graphframes/issues.
         |""".stripMargin)
  }
  minNbrSum
}
// compute min neighbors (including self-min)
var minNbrs1: DataFrame = minNbrs(ee) // src >= min_nbr
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to do the first minimization (min-neighbor) reduction outside of the while loop. +1

.persist(intermediateStorageLevel)

var prevSum: BigDecimal = _calcMinNbrSum(minNbrs1)

var lastRoundPersistedDFs = Seq[DataFrame](ee, minNbrs1)
while (!converged) {
var currRoundPersistedDFs = Seq[DataFrame]()
// large-star step
// compute min neighbors (including self-min)
val minNbrs1 = minNbrs(ee) // src >= min_nbr
.persist(intermediateStorageLevel)
// connect all strictly larger neighbors to the min neighbor (including self)
ee = skewedJoin(ee, minNbrs1, broadcastThreshold, logPrefix)
.select(col(DST).as(SRC), col(MIN_NBR).as(DST)) // src > dst
.distinct()
.persist(intermediateStorageLevel)
currRoundPersistedDFs = currRoundPersistedDFs :+ ee

// small-star step
// compute min neighbors (excluding self-min)
val minNbrs2 = ee.groupBy(col(SRC)).agg(min(col(DST)).as(MIN_NBR), count("*").as(CNT)) // src > min_nbr
.persist(intermediateStorageLevel)
currRoundPersistedDFs = currRoundPersistedDFs :+ minNbrs2

// connect all smaller neighbors to the min neighbor
ee = skewedJoin(ee, minNbrs2, broadcastThreshold, logPrefix)
.select(col(MIN_NBR).as(SRC), col(DST)) // src <= dst
Expand All @@ -355,25 +382,14 @@ object ConnectedComponents extends Logging {
}

ee.persist(intermediateStorageLevel)
currRoundPersistedDFs = currRoundPersistedDFs :+ ee

// test convergence
minNbrs1 = minNbrs(ee) // src >= min_nbr
.persist(intermediateStorageLevel)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the end of the while loop, should we also explicitly call unpersist() to release the cached data referenced by these pseudo-mutable vars, each of which represents an immutable cached state?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will revisit and fix the persist/unpersist operations.

currRoundPersistedDFs = currRoundPersistedDFs :+ minNbrs1

// Taking the sum in DecimalType to preserve precision.
// We use 20 digits for long values and Spark SQL will add 10 digits for the sum.
// It should be able to handle 200 billion edges without overflow.
val (currSum, cnt) = ee.select(sum(col(SRC).cast(DecimalType(20, 0))), count("*")).rdd
.map { r =>
(r.getAs[BigDecimal](0), r.getLong(1))
}.first()
if (cnt != 0L && currSum == null) {
throw new ArithmeticException(
s"""
|The total sum of edge src IDs is used to determine convergence during iterations.
|However, the total sum at iteration $iteration exceeded 30 digits (1e30),
|which should happen only if the graph contains more than 200 billion edges.
|If not, please file a bug report at https://github.com/graphframes/graphframes/issues.
""".stripMargin)
}
// test convergence
val currSum = _calcMinNbrSum(minNbrs1)
logInfo(s"$logPrefix Sum of assigned components in iteration $iteration: $currSum.")
if (currSum == prevSum) {
// This also covers the case when cnt = 0 and currSum is null, which means no edges.
Expand All @@ -382,6 +398,15 @@ object ConnectedComponents extends Logging {
prevSum = currSum
}

// materialize all persisted DataFrames in current round,
// then we can unpersist last round persisted DataFrames.
for (persisted_df <- currRoundPersistedDFs) {
persisted_df.count() // materialize it.
}
for (persisted_df <- lastRoundPersistedDFs) {
persisted_df.unpersist()
}
lastRoundPersistedDFs = currRoundPersistedDFs
iteration += 1
}

Expand Down