Skip to content

Commit e3ee263

Browse files
committed
Replace edge checkpointing with Spark's Dataset.checkpoint
1 parent 71c6e43 commit e3ee263

1 file changed

Lines changed: 1 addition & 12 deletions

File tree

src/main/scala/org/graphframes/lib/ConnectedComponents.scala

Lines changed: 1 addition & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -340,18 +340,7 @@ object ConnectedComponents extends Logging {
340 340

341 341
// checkpointing
342 342
if (shouldCheckpoint && (iteration % checkpointInterval == 0)) {
343-
// TODO: remove this after DataFrame.checkpoint is implemented
344-
val out = s"${checkpointDir.get}/$iteration"
345-
ee.write.parquet(out)
346-
// may hit S3 eventually consistent issue
347-
ee = sqlContext.read.parquet(out)
348-
349-
// remove previous checkpoint
350-
if (iteration > checkpointInterval) {
351-
val path = new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}")
352-
path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
353-
}
354-
343+
ee = ee.checkpoint()
355 344
System.gc() // hint Spark to clean shuffle directories
356 345
}
357 346

0 commit comments

Comments
 (0)