diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml
index 075f9f582..3ec1d94dd 100644
--- a/.github/workflows/python-ci.yml
+++ b/.github/workflows/python-ci.yml
@@ -8,16 +8,7 @@ jobs:
         include:
           - spark-version: 3.5.0
             scala-version: 2.12.18
-            python-version: 3.9
-          - spark-version: 3.4.1
-            scala-version: 2.12.17
-            python-version: 3.9
-          - spark-version: 3.3.3
-            scala-version: 2.12.15
-            python-version: 3.9
-          - spark-version: 3.2.4
-            scala-version: 2.12.15
-            python-version: 3.9
+            python-version: 3.9.19
     runs-on: ubuntu-22.04
     env:
       # define Java options for both official sbt and sbt-extras
@@ -36,7 +27,7 @@ jobs:
           ~/.ivy2/cache
         key: sbt-ivy-cache-spark-${{ matrix.spark-version}}-scala-${{ matrix.scala-version }}
     - name: Assembly
-      run: sbt -v ++${{ matrix.scala-version }} -Dspark.version=${{ matrix.spark-version }} "set test in assembly := {}" assembly
+      run: build/sbt -v ++${{ matrix.scala-version }} -Dspark.version=${{ matrix.spark-version }} "set test in assembly := {}" assembly
     - uses: actions/setup-python@v4
       with:
         python-version: ${{ matrix.python-version }}
diff --git a/.github/workflows/scala-ci.yml b/.github/workflows/scala-ci.yml
index 240f45853..b47dd2491 100644
--- a/.github/workflows/scala-ci.yml
+++ b/.github/workflows/scala-ci.yml
@@ -10,18 +10,6 @@ jobs:
             scala-version: 2.13.8
           - spark-version: 3.5.0
             scala-version: 2.12.12
-          - spark-version: 3.4.1
-            scala-version: 2.13.8
-          - spark-version: 3.4.1
-            scala-version: 2.12.12
-          - spark-version: 3.3.3
-            scala-version: 2.13.8
-          - spark-version: 3.3.3
-            scala-version: 2.12.12
-          - spark-version: 3.2.4
-            scala-version: 2.13.5
-          - spark-version: 3.2.4
-            scala-version: 2.12.12
     runs-on: ubuntu-22.04
     env:
       # fixing this error after tests success: sbt.ForkMain failed with exit code 134
@@ -39,5 +27,5 @@ jobs:
           ~/.ivy2/cache
         key: sbt-ivy-cache-spark-${{ matrix.spark-version}}-scala-${{ matrix.scala-version }}
     - name: Build and Test
-      run: sbt -v ++${{ matrix.scala-version }} -Dspark.version=${{ matrix.spark-version }} coverage test coverageReport
+      run: build/sbt -v ++${{ matrix.scala-version }} -Dspark.version=${{ matrix.spark-version }} coverage test coverageReport
     - uses: codecov/codecov-action@v3
diff --git a/build.sbt b/build.sbt
index df608c00e..108b371c2 100644
--- a/build.sbt
+++ b/build.sbt
@@ -3,21 +3,16 @@
 
 import ReleaseTransformations._
 
-resolvers += "Spark snapshot repository" at "https://repository.apache.org/snapshots/"
-
 val sparkVer = sys.props.getOrElse("spark.version", "3.5.0")
 val sparkBranch = sparkVer.substring(0, 3)
 val defaultScalaVer = sparkBranch match {
   case "3.5" => "2.12.18"
-  case "3.4" => "2.12.17"
-  case "3.3" => "2.12.15"
-  case "3.2" => "2.12.15"
   case _ => throw new IllegalArgumentException(s"Unsupported Spark version: $sparkVer.")
 }
 val scalaVer = sys.props.getOrElse("scala.version", defaultScalaVer)
 val defaultScalaTestVer = scalaVer match {
   case s if s.startsWith("2.12") || s.startsWith("2.13") => "3.0.8"
-  case s if s.startsWith("2.11") => "2.2.6" // scalatest_2.11 does not have 2.0 published
+  case _ => throw new IllegalArgumentException(s"Unsupported Scala version: $scalaVer.")
 }
 
 sparkVersion := sparkVer
diff --git a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala
index 96d8669c5..33c6b1ab5 100644
--- a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala
+++ b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala
@@ -313,22 +313,53 @@ object ConnectedComponents extends Logging {
 
     var converged = false
     var iteration = 1
-    var prevSum: BigDecimal = null
+
+    // Sums the MIN_NBR column of `minNbrsDF` as a decimal; the loop below compares
+    // consecutive sums to detect convergence. The result is null when the DataFrame
+    // has no rows. Throws ArithmeticException when a non-empty DataFrame yields a
+    // null sum, which is treated as an overflow of the 30-digit decimal.
+    def calcMinNbrSum(minNbrsDF: DataFrame): BigDecimal = {
+      // Taking the sum in DecimalType to preserve precision.
+      // We use 20 digits for long values and Spark SQL will add 10 digits for the sum.
+      // It should be able to handle 200 billion edges without overflow.
+      val (minNbrSum, cnt) = minNbrsDF.select(sum(col(MIN_NBR).cast(DecimalType(20, 0))), count("*")).rdd
+        .map { r =>
+          (r.getAs[BigDecimal](0), r.getLong(1))
+        }.first()
+      if (cnt != 0L && minNbrSum == null) {
+        throw new ArithmeticException(
+          s"""
+             |The total sum of min neighbor IDs is used to determine convergence during iterations.
+             |However, the total sum at iteration $iteration exceeded 30 digits (1e30),
+             |which should happen only if the graph contains more than 200 billion edges.
+             |If not, please file a bug report at https://github.com/graphframes/graphframes/issues.
+           """.stripMargin)
+      }
+      minNbrSum
+    }
+    // compute min neighbors (including self-min)
+    var minNbrs1: DataFrame = minNbrs(ee) // src >= min_nbr
+      .persist(intermediateStorageLevel)
+
+    var prevSum: BigDecimal = calcMinNbrSum(minNbrs1)
+
+    var lastRoundPersistedDFs = Seq[DataFrame](ee, minNbrs1)
     while (!converged) {
+      var currRoundPersistedDFs = Seq[DataFrame]()
       // large-star step
-      // compute min neighbors (including self-min)
-      val minNbrs1 = minNbrs(ee) // src >= min_nbr
-        .persist(intermediateStorageLevel)
       // connect all strictly larger neighbors to the min neighbor (including self)
       ee = skewedJoin(ee, minNbrs1, broadcastThreshold, logPrefix)
         .select(col(DST).as(SRC), col(MIN_NBR).as(DST)) // src > dst
         .distinct()
         .persist(intermediateStorageLevel)
+      currRoundPersistedDFs = currRoundPersistedDFs :+ ee
 
       // small-star step
       // compute min neighbors (excluding self-min)
       val minNbrs2 = ee.groupBy(col(SRC)).agg(min(col(DST)).as(MIN_NBR), count("*").as(CNT)) // src > min_nbr
         .persist(intermediateStorageLevel)
+      currRoundPersistedDFs = currRoundPersistedDFs :+ minNbrs2
+
       // connect all smaller neighbors to the min neighbor
       ee = skewedJoin(ee, minNbrs2, broadcastThreshold, logPrefix)
         .select(col(MIN_NBR).as(SRC), col(DST)) // src <= dst
@@ -355,25 +386,14 @@ object ConnectedComponents extends Logging {
       }
 
       ee.persist(intermediateStorageLevel)
+      currRoundPersistedDFs = currRoundPersistedDFs :+ ee
 
-      // test convergence
+      minNbrs1 = minNbrs(ee) // src >= min_nbr
+        .persist(intermediateStorageLevel)
+      currRoundPersistedDFs = currRoundPersistedDFs :+ minNbrs1
 
-      // Taking the sum in DecimalType to preserve precision.
-      // We use 20 digits for long values and Spark SQL will add 10 digits for the sum.
-      // It should be able to handle 200 billion edges without overflow.
-      val (currSum, cnt) = ee.select(sum(col(SRC).cast(DecimalType(20, 0))), count("*")).rdd
-        .map { r =>
-          (r.getAs[BigDecimal](0), r.getLong(1))
-        }.first()
-      if (cnt != 0L && currSum == null) {
-        throw new ArithmeticException(
-          s"""
-             |The total sum of edge src IDs is used to determine convergence during iterations.
-             |However, the total sum at iteration $iteration exceeded 30 digits (1e30),
-             |which should happen only if the graph contains more than 200 billion edges.
-             |If not, please file a bug report at https://github.com/graphframes/graphframes/issues.
-           """.stripMargin)
-      }
+      // test convergence
+      val currSum = calcMinNbrSum(minNbrs1)
       logInfo(s"$logPrefix Sum of assigned components in iteration $iteration: $currSum.")
       if (currSum == prevSum) {
         // This also covers the case when cnt = 0 and currSum is null, which means no edges.
@@ -382,6 +402,11 @@ object ConnectedComponents extends Logging {
         prevSum = currSum
       }
 
+      // Materialize every DataFrame persisted in this round first; only then can the
+      // DataFrames persisted in the previous round be unpersisted.
+      currRoundPersistedDFs.foreach(_.count()) // count() forces materialization
+      lastRoundPersistedDFs.foreach(_.unpersist())
+      lastRoundPersistedDFs = currRoundPersistedDFs
       iteration += 1
     }
 