diff --git a/core/src/main/scala/org/graphframes/lib/KCore.scala b/core/src/main/scala/org/graphframes/lib/KCore.scala index 3db02ab28..f83e5d47a 100644 --- a/core/src/main/scala/org/graphframes/lib/KCore.scala +++ b/core/src/main/scala/org/graphframes/lib/KCore.scala @@ -28,6 +28,11 @@ import org.graphframes.WithLocalCheckpoints * * Mandal, Aritra, and Mohammad Al Hasan. "A distributed k-core decomposition algorithm on spark." * 2017 IEEE International Conference on Big Data (Big Data). IEEE, 2017. + * + * '''Edge representation''': K-core decomposition is defined for undirected graphs. Since + * GraphFrames represents edges as directed, each undirected edge `{u, v}` should be supplied as a + * single directed edge in either direction — the algorithm symmetrizes internally. Supplying both + * `(u, v)` and `(v, u)` will double-count the edge and produce incorrect results. */ class KCore private[graphframes] (private val graph: GraphFrame) extends Serializable @@ -80,8 +85,8 @@ object KCore extends Serializable with Logging { kCoreColumnName, col("degree"), call_function("_kcoreMerge", Pregel.msg, col(kCoreColumnName))) - .sendMsgToSrc(Pregel.src(kCoreColumnName)) - .sendMsgToDst(Pregel.dst(kCoreColumnName)) + .sendMsgToSrc(Pregel.dst(kCoreColumnName)) + .sendMsgToDst(Pregel.src(kCoreColumnName)) .setInitialActiveVertexExpression(lit(true)) .setUpdateActiveVertexExpression( col(kCoreColumnName) =!= call_function("_kcoreMerge", Pregel.msg, col(kCoreColumnName))) diff --git a/core/src/test/scala/org/graphframes/lib/KCoreSuite.scala b/core/src/test/scala/org/graphframes/lib/KCoreSuite.scala index 1a3e27376..aeb09db5d 100644 --- a/core/src/test/scala/org/graphframes/lib/KCoreSuite.scala +++ b/core/src/test/scala/org/graphframes/lib/KCoreSuite.scala @@ -65,31 +65,27 @@ class KCoreSuite extends SparkFunSuite with GraphFrameTestSparkContext { TestUtils.checkColumnType(result.schema, "kcore", DataTypes.IntegerType) assert(result.count() === 4) val rows = result.collect() - // Center vertex should have k-core value of 3, leaf vertices should have k-core value of 1 + // All vertices have k-core = 1: despite the center having degree 3, each leaf has + // only one neighbor so no 2-core can form, pulling everything down to 1. rows.foreach { row => - val id = row.getAs[Long]("id") - val kcore = row.getAs[Int]("kcore") - if (id == 0L) { - assert(kcore === 3) - } else { - assert(kcore === 1) - } + assert(row.getAs[Int]("kcore") === 1) } result.unpersist() } test("chain graph") { + // Open chain: 0 - 1 - 2. + // No 2-core exists: endpoints have degree 1, so the whole graph is only a 1-core. + // All vertices get kcore = 1. val v = spark.createDataFrame(Seq((0L, "a"), (1L, "b"), (2L, "c"))).toDF("id", "name") - val e = spark.createDataFrame(Seq((0L, 1L), (1L, 2L), (2L, 0L))).toDF("src", "dst") + val e = spark.createDataFrame(Seq((0L, 1L), (1L, 2L))).toDF("src", "dst") val g = GraphFrame(v, e) val result = g.kCore.run() TestUtils.checkColumnType(result.schema, "kcore", DataTypes.IntegerType) assert(result.count() === 3) val rows = result.collect() - // All vertices should have k-core value of 2 - // because graph is cosidered as undirected rows.foreach { row => - assert(row.getAs[Int]("kcore") === 2) + assert(row.getAs[Int]("kcore") === 1) } result.unpersist() } @@ -188,14 +184,14 @@ class KCoreSuite extends SparkFunSuite with GraphFrameTestSparkContext { val rows = result.collect() // Check that we have a range of k-core values val kcoreValues = rows.map(_.getAs[Int]("kcore")).distinct.sorted - assert(kcoreValues.length > 3, "Should have more than 3 distinct k-core values") + assert(kcoreValues.length > 2, "Should have at least 3 distinct k-core values") // Verify specific expected patterns val kcoreMap = rows.map(row => row.getAs[Long]("id") -> row.getAs[Int]("kcore")).toMap // Vertices in the highly connected cluster should have higher k-core values - assert(kcoreMap(0L) >= 4, "Central vertex should have high k-core") - assert(kcoreMap(1L) >= 3, "Well-connected vertex should have medium-high k-core") + assert(kcoreMap(0L) >= 3, "Central vertex should have high k-core") + assert(kcoreMap(1L) >= 3, "Well-connected vertex should have high k-core") // Leaf nodes should have lower k-core values assert(kcoreMap(18L) <= 2, "Leaf node should have low k-core") @@ -294,4 +290,36 @@ class KCoreSuite extends SparkFunSuite with GraphFrameTestSparkContext { result.unpersist() } + + test("triangle with tail - exact kcore values") { + // This graph has vertices where degree != kcore, which is important to test correctness: + // it would catch a buggy implementation that converges too early (e.g. after one superstep), which + // would return kcore = degree for all vertices and pass simpler tests. + // + // Undirected graph: + // + // Triangle: 1 - 2 - 3 - 1 (kcore = 2, they form the 2-core) + // Tail: 1 - 4 - 5 (kcore = 1, pendant chain excluded from the 2-core) + // + // Degrees: 1→3, 2→2, 3→2, 4→2, 5→1 (degree != kcore for vertices 1 and 4) + val v = spark + .createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e"))) + .toDF("id", "name") + val e = spark + .createDataFrame(Seq((1L, 2L), (2L, 3L), (3L, 1L), (1L, 4L), (4L, 5L))) + .toDF("src", "dst") + val g = GraphFrame(v, e) + val result = g.kCore.run() + TestUtils.checkColumnType(result.schema, "kcore", DataTypes.IntegerType) + assert(result.count() === 5) + val kcoreMap = result.collect().map(r => r.getAs[Long]("id") -> r.getAs[Int]("kcore")).toMap + // Triangle vertices form the 2-core + assert(kcoreMap(1L) === 2) + assert(kcoreMap(2L) === 2) + assert(kcoreMap(3L) === 2) + // Tail vertices are only in the 1-core + assert(kcoreMap(4L) === 1) + assert(kcoreMap(5L) === 1) + result.unpersist() + } }