From ac208bdbf25ad0296e3f0ad894ab7ec1b6f5b791 Mon Sep 17 00:00:00 2001 From: ericsun95 Date: Sun, 6 Apr 2025 09:31:22 -0700 Subject: [PATCH 1/7] use the dataframe checkpoint in connected component computation use the dataframe checkpoint in connected component computation --- .../org/graphframes/lib/ConnectedComponents.scala | 15 +-------------- src/test/resources/log4j.properties | 4 ++-- .../lib/ConnectedComponentsSuite.scala | 12 +++--------- 3 files changed, 6 insertions(+), 25 deletions(-) diff --git a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala index dd5f59647..1547a548f 100644 --- a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala +++ b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala @@ -355,22 +355,9 @@ object ConnectedComponents extends Logging { // checkpointing if (shouldCheckpoint && (iteration % checkpointInterval == 0)) { - // TODO: remove this after DataFrame.checkpoint is implemented - val out = s"${checkpointDir.get}/$iteration" - ee.write.parquet(out) - // may hit S3 eventually consistent issue - ee = spark.read.parquet(out) - - // remove previous checkpoint - if (iteration > checkpointInterval) { - val path = new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}") - path.getFileSystem(sc.hadoopConfiguration).delete(path, true) - } - - System.gc() // hint Spark to clean shuffle directories + ee = ee.checkpoint(eager = true) } - ee.persist(intermediateStorageLevel) currRoundPersistedDFs = currRoundPersistedDFs :+ ee minNbrs1 = minNbrs(ee) // src >= min_nbr diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties index eb67747a6..c8639d9fd 100644 --- a/src/test/resources/log4j.properties +++ b/src/test/resources/log4j.properties @@ -6,7 +6,7 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark=WARN -log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO -log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN +log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN # Hide many "WARN CacheManager: Asked to cache already cached data" warnings. log4j.logger.org.apache.spark.sql.execution.CacheManager=ERROR diff --git a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala index 3b3fcf4ea..657a78b5f 100644 --- a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala +++ b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala @@ -194,16 +194,10 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkCon } } - // Checks whether the input DataFrame is from some checkpoint data. - // TODO: The implemetnation is a little hacky. - def isFromCheckpoint(df: DataFrame): Boolean = { - df.queryExecution.logical.toString().toLowerCase.contains("parquet") - } - val components0 = cc.setCheckpointInterval(0).run() assertComponents(components0, expected) assert( - !isFromCheckpoint(components0), + !components0.rdd.isCheckpointed, "The result shouldn't depend on checkpoint data if checkpointing is disabled.") sc.setCheckpointDir(checkpointDir.get) @@ -211,13 +205,13 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkCon val components1 = cc.setCheckpointInterval(1).run() assertComponents(components1, expected) assert( - isFromCheckpoint(components1), + components1.rdd.isCheckpointed, "The result should depend on checkpoint data if checkpoint interval is 1.") val components10 = cc.setCheckpointInterval(10).run() assertComponents(components10, expected) assert( - !isFromCheckpoint(components10), + !components10.rdd.isCheckpointed, "The result shouldn't depend on checkpoint data if converged before first checkpoint.") } From ad88256337f30f7df02d662d0ca7c19153c54476 Mon Sep 17 00:00:00 2001 From: ericsun95 Date: Sun, 6 Apr 2025 09:31:22 -0700 Subject: [PATCH 2/7] use the dataframe checkpoint in connected component computation use the dataframe checkpoint in connected component computation --- .../org/graphframes/lib/ConnectedComponents.scala | 15 +-------------- .../lib/ConnectedComponentsSuite.scala | 12 +++--------- 2 files changed, 4 insertions(+), 23 deletions(-) diff --git a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala index dd5f59647..1547a548f 100644 --- a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala +++ b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala @@ -355,22 +355,9 @@ object ConnectedComponents extends Logging { // checkpointing if (shouldCheckpoint && (iteration % checkpointInterval == 0)) { - // TODO: remove this after DataFrame.checkpoint is implemented - val out = s"${checkpointDir.get}/$iteration" - ee.write.parquet(out) - // may hit S3 eventually consistent issue - ee = spark.read.parquet(out) - - // remove previous checkpoint - if (iteration > checkpointInterval) { - val path = new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}") - path.getFileSystem(sc.hadoopConfiguration).delete(path, true) - } - - System.gc() // hint Spark to clean shuffle directories + ee = ee.checkpoint(eager = true) } - ee.persist(intermediateStorageLevel) currRoundPersistedDFs = currRoundPersistedDFs :+ ee minNbrs1 = minNbrs(ee) // src >= min_nbr diff --git a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala index 3b3fcf4ea..657a78b5f 100644 --- a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala +++ b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala @@ -194,16 +194,10 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkCon } } - // Checks whether the input DataFrame is from some checkpoint data. - // TODO: The implemetnation is a little hacky. - def isFromCheckpoint(df: DataFrame): Boolean = { - df.queryExecution.logical.toString().toLowerCase.contains("parquet") - } - val components0 = cc.setCheckpointInterval(0).run() assertComponents(components0, expected) assert( - !isFromCheckpoint(components0), + !components0.rdd.isCheckpointed, "The result shouldn't depend on checkpoint data if checkpointing is disabled.") sc.setCheckpointDir(checkpointDir.get) @@ -211,13 +205,13 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkCon val components1 = cc.setCheckpointInterval(1).run() assertComponents(components1, expected) assert( - isFromCheckpoint(components1), + components1.rdd.isCheckpointed, "The result should depend on checkpoint data if checkpoint interval is 1.") val components10 = cc.setCheckpointInterval(10).run() assertComponents(components10, expected) assert( - !isFromCheckpoint(components10), + !components10.rdd.isCheckpointed, "The result shouldn't depend on checkpoint data if converged before first checkpoint.") } From ea8cac7d60ca51b29fdcaebde7d1d49632d7a1f5 Mon Sep 17 00:00:00 2001 From: ericsun95 Date: Wed, 23 Apr 2025 00:12:55 -0700 Subject: [PATCH 3/7] Update the test to correctly reflect the checkpoint behavior --- .../graphframes/lib/ConnectedComponents.scala | 10 +++++ .../lib/ConnectedComponentsSuite.scala | 43 ++++++++++++++----- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala index 439cb9623..2b0976be9 100644 --- a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala +++ b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala @@ -327,7 +327,17 @@ object ConnectedComponents extends Logging { // checkpointing if (shouldCheckpoint && (iteration % checkpointInterval == 0)) { + // enable checkpointing if not yet done + if (spark.sparkContext.getCheckpointDir.isEmpty) { + spark.sparkContext.setCheckpointDir(checkpointDir.get) + } ee = ee.checkpoint(eager = true) + // remove previous checkpoint manually if needed + if (iteration > checkpointInterval) { + val oldCheckpointPath = new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}") + oldCheckpointPath.getFileSystem(sc.hadoopConfiguration).delete(oldCheckpointPath, true) + } + System.gc() // hint Spark to clean shuffle directories } currRoundPersistedDFs = currRoundPersistedDFs :+ ee diff --git a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala index 886cad04c..66bf7c418 100644 --- a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala +++ b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala @@ -17,17 +17,17 @@ package org.graphframes.lib -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.Row -import org.apache.spark.sql.functions.col -import org.apache.spark.sql.functions.lit +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.types.DataTypes +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.storage.StorageLevel import org.graphframes.GraphFrame._ import org.graphframes._ import org.graphframes.examples.Graphs import java.io.IOException +import java.net.URI import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -173,6 +173,18 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkCon assert(components.groupBy("component").count().count() === 1L) } + def listCheckpointFiles(path: String): Set[String] = { + val fs = FileSystem.get(new URI(path), spark.sparkContext.hadoopConfiguration) + val checkpointPath = new Path(path) + if (!fs.exists(checkpointPath)) return Set.empty + fs.listStatus(checkpointPath) + .flatMap { + case status if status.isDirectory => + fs.listStatus(status.getPath).map(_.getPath.toString) + case status => Seq(status.getPath.toString) + }.toSet + } + test("checkpoint interval") { val friends = Graphs.friends val expected = Set(Set("a", "b", "c", "d", "e", "f"), Set("g")) @@ -182,8 +194,12 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkCon cc.getCheckpointInterval === 2, s"Default checkpoint interval should be 2, but got ${cc.getCheckpointInterval}.") - val checkpointDir = sc.getCheckpointDir - assert(checkpointDir.nonEmpty) + val checkpointDirOpt = sc.getCheckpointDir + assert(checkpointDirOpt.nonEmpty) + val checkpointDir = checkpointDirOpt.get + // clean up the checkpoint dir first to avoid test conflicts + val checkpointPath = new Path(s"${checkpointDir}/") + checkpointPath.getFileSystem(sc.hadoopConfiguration).delete(checkpointPath, true) sc.setCheckpointDir(null) withClue( @@ -194,24 +210,31 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkCon } } + // The `dataframe.rdd.isCheckpointed` behavior changed and can't be used to verify whether + // a dataframe is checkpointed, so we have check the files instead val components0 = cc.setCheckpointInterval(0).run() + val after0 = listCheckpointFiles(checkpointDir) assertComponents(components0, expected) assert( - !components0.rdd.isCheckpointed, + after0.isEmpty, "The result shouldn't depend on checkpoint data if checkpointing is disabled.") - sc.setCheckpointDir(checkpointDir.get) + sc.setCheckpointDir(checkpointDir) + val before1 = listCheckpointFiles(checkpointDir) val components1 = cc.setCheckpointInterval(1).run() + val after1 = listCheckpointFiles(checkpointDir) assertComponents(components1, expected) assert( - components1.rdd.isCheckpointed, + (after1 -- before1).nonEmpty, "The result should depend on checkpoint data if checkpoint interval is 1.") + val before10 = listCheckpointFiles(checkpointDir) val components10 = cc.setCheckpointInterval(10).run() + val after10 = listCheckpointFiles(checkpointDir) assertComponents(components10, expected) assert( - !components10.rdd.isCheckpointed, + (after10 -- before10).isEmpty, "The result shouldn't depend on checkpoint data if converged before first checkpoint.") } From 6bd31f6fae81e72c08305f430b5d20d8c745b4f2 Mon Sep 17 00:00:00 2001 From: ericsun95 Date: Wed, 23 Apr 2025 00:15:52 -0700 Subject: [PATCH 4/7] format the code --- .../scala/org/graphframes/lib/ConnectedComponents.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala index 2b0976be9..b49c85eb1 100644 --- a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala +++ b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala @@ -334,8 +334,11 @@ object ConnectedComponents extends Logging { ee = ee.checkpoint(eager = true) // remove previous checkpoint manually if needed if (iteration > checkpointInterval) { - val oldCheckpointPath = new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}") - oldCheckpointPath.getFileSystem(sc.hadoopConfiguration).delete(oldCheckpointPath, true) + val oldCheckpointPath = new Path( + s"${checkpointDir.get}/${iteration - checkpointInterval}") + oldCheckpointPath + .getFileSystem(sc.hadoopConfiguration) + .delete(oldCheckpointPath, true) } System.gc() // hint Spark to clean shuffle directories } From 47feeb45ee3821ace0bb367a0af800d67666da9e Mon Sep 17 00:00:00 2001 From: ericsun95 Date: Wed, 23 Apr 2025 00:21:06 -0700 Subject: [PATCH 5/7] format the code --- .../scala/org/graphframes/lib/ConnectedComponentsSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala index 66bf7c418..7512da35a 100644 --- a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala +++ b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala @@ -182,7 +182,8 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkCon case status if status.isDirectory => fs.listStatus(status.getPath).map(_.getPath.toString) case status => Seq(status.getPath.toString) - }.toSet + } + .toSet } test("checkpoint interval") { From d1c0b07148367a0b1f46facdb4ea6be1626c7247 Mon Sep 17 00:00:00 2001 From: ericsun95 Date: Wed, 23 Apr 2025 00:33:11 -0700 Subject: [PATCH 6/7] format the code --- .../scala/org/graphframes/lib/ConnectedComponentsSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala index 7512da35a..c1b678733 100644 --- a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala +++ b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala @@ -180,8 +180,9 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkCon fs.listStatus(checkpointPath) .flatMap { case status if status.isDirectory => - fs.listStatus(status.getPath).map(_.getPath.toString) - case status => Seq(status.getPath.toString) + fs.listStatus(status.getPath).toSeq.map(_.getPath.toString) + case status => + Seq(status.getPath.toString) } .toSet } From e3bc8b45f9a116f667a39a701aa89d79eb2ffcc5 Mon Sep 17 00:00:00 2001 From: ericsun95 <31479694+ericsun95@users.noreply.github.com> Date: Wed, 23 Apr 2025 10:25:58 -0700 Subject: [PATCH 7/7] Update log4j.properties --- src/test/resources/log4j.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties index c8639d9fd..eb67747a6 100644 --- a/src/test/resources/log4j.properties +++ b/src/test/resources/log4j.properties @@ -6,7 +6,7 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark=WARN -log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN -log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO +log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO # Hide many "WARN CacheManager: Asked to cache already cached data" warnings. log4j.logger.org.apache.spark.sql.execution.CacheManager=ERROR