From 3dac6e71ced7950c5edef614d892bfd29ea99b28 Mon Sep 17 00:00:00 2001 From: architch Date: Sat, 9 Feb 2019 21:48:38 +0530 Subject: [PATCH 1/2] Changes the output label of CC to min of original ID --- .../org/graphframes/lib/ConnectedComponents.scala | 13 ++++++++++--- .../graphframes/lib/ConnectedComponentsSuite.scala | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala index 1d1b96d54..fa6da5bde 100644 --- a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala +++ b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala @@ -388,8 +388,15 @@ object ConnectedComponents extends Logging { logInfo(s"$logPrefix Connected components converged in ${iteration - 1} iterations.") logInfo(s"$logPrefix Join and return component assignments with original vertex IDs.") - vv.join(ee, vv(ID) === ee(DST), "left_outer") - .select(vv(ATTR), when(ee(SRC).isNull, vv(ID)).otherwise(ee(SRC)).as(COMPONENT)) - .select(col(s"$ATTR.*"), col(COMPONENT)) + val indexedLabel=vv.join(ee, vv(ID) === ee(DST), "left_outer") + .select(vv(ATTR), when(ee(SRC).isNull, vv(ID)).otherwise(ee(SRC)).as(COMPONENT) + ,col(ATTR+"."+ID).as(ID)) + if(graph.hasIntegralIdType) + indexedLabel.select(col(s"$ATTR.*"), col(COMPONENT)) + else + indexedLabel.join(indexedLabel.groupBy(col(COMPONENT)).agg(min(col(ID)).as(ORIG_ID)) + .select(col(COMPONENT),col(ORIG_ID)) + ,COMPONENT) + .select(col(s"$ATTR.*"),col(ORIG_ID).as(COMPONENT)) } } diff --git a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala index 65534a463..c602a38fb 100644 --- a/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala +++ b/src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala @@ -235,7 +235,7 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkCon expected: Set[Set[T]]): Unit = { import actual.sqlContext.implicits._ // note: not using agg + collect_list because collect_list is not available in 1.6.2 w/o hive - val actualComponents = actual.select("component", "id").as[(Long, T)].rdd + val actualComponents = actual.select("component", "id").as[(T, T)].rdd .groupByKey() .values .map(_.toSeq) From 23c771c9c8b0f7fffefa2d0ac32dbedaf2dc0f93 Mon Sep 17 00:00:00 2001 From: architch Date: Tue, 1 Apr 2025 23:03:10 +0530 Subject: [PATCH 2/2] fix formatting and add ORIG_ID --- .../graphframes/lib/ConnectedComponents.scala | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala index ce709a70f..34daf0717 100644 --- a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala +++ b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala @@ -171,6 +171,7 @@ object ConnectedComponents extends Logging { import org.graphframes.GraphFrame._ private val COMPONENT = "component" + private val ORIG_ID = "orig_id" private val MIN_NBR = "min_nbr" private val CNT = "cnt" private val CHECKPOINT_NAME_PREFIX = "connected-components" @@ -435,16 +436,24 @@ object ConnectedComponents extends Logging { logInfo(s"$logPrefix Connected components converged in ${iteration - 1} iterations.") logInfo(s"$logPrefix Join and return component assignments with original vertex IDs.") - val indexedLabel = vv.join(ee, vv(ID) === ee(DST), "left_outer") - .select(vv(ATTR), when(ee(SRC).isNull, vv(ID)).otherwise(ee(SRC)).as(COMPONENT), + val indexedLabel = vv + .join(ee, vv(ID) === ee(DST), "left_outer") + .select( + vv(ATTR), + when(ee(SRC).isNull, vv(ID)).otherwise(ee(SRC)).as(COMPONENT), col(ATTR + "." + ID).as(ID)) val output = if (graph.hasIntegralIdType) { - indexedLabel.select(col(s"$ATTR.*"), col(COMPONENT)) + indexedLabel + .select(col(s"$ATTR.*"), col(COMPONENT)) .persist(intermediateStorageLevel) } else { - indexedLabel.join(indexedLabel.groupBy(col(COMPONENT)) - .agg(min(col(ID)).as(ORIG_ID)) - .select(col(COMPONENT), col(ORIG_ID)), COMPONENT) + indexedLabel + .join( + indexedLabel + .groupBy(col(COMPONENT)) + .agg(min(col(ID)).as(ORIG_ID)) + .select(col(COMPONENT), col(ORIG_ID)), + COMPONENT) .select(col(s"$ATTR.*"), col(ORIG_ID).as(COMPONENT)) .persist(intermediateStorageLevel) }