We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 71c6e43, commit e3ee263 (Copy full SHA for e3ee263)
1 file changed
src/main/scala/org/graphframes/lib/ConnectedComponents.scala
@@ -340,18 +340,7 @@ object ConnectedComponents extends Logging {
340
341
// checkpointing
342
if (shouldCheckpoint && (iteration % checkpointInterval == 0)) {
343
- // TODO: remove this after DataFrame.checkpoint is implemented
344
- val out = s"${checkpointDir.get}/$iteration"
345
- ee.write.parquet(out)
346
- // may hit S3 eventually consistent issue
347
- ee = sqlContext.read.parquet(out)
348
-
349
- // remove previous checkpoint
350
- if (iteration > checkpointInterval) {
351
- val path = new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}")
352
- path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
353
- }
354
+ ee = ee.checkpoint()
355
System.gc() // hint Spark to clean shuffle directories
356
}
357
0 commit comments