From 75e244e87fe66ad407c130d8f09ad3d5af1c4f33 Mon Sep 17 00:00:00 2001 From: David Girardier Date: Tue, 3 Mar 2026 22:26:02 +0100 Subject: [PATCH 1/2] fix(kcore): swap sendMsgToSrc/sendMsgToDst column references Each directed edge (u,v) was sending u's own kcore back to u and v's own kcore back to v, so no vertex ever received a neighbour's value. The algorithm converged immediately after one superstep returning kcore = degree for all vertices. Fix: swap Pregel.src <-> Pregel.dst so each endpoint receives the opposite end's value, matching the Mandal & Hasan (2017) algorithm. Also: - Document that the implementation treats the graph as undirected (total degree + bidirectional message passing per edge); supplying both (u,v) and (v,u) double-counts and produces wrong results. - Fix star-graph test: expected center kcore 3 -> 1 (correct textbook value; no 2-core can form when leaves each have one neighbour). - Replace chain-graph test: was a copy of the triangle test with the same cyclic edges; now uses an actual open chain (0->1->2) with the correct assertion kcore=1 for all vertices and an explanatory comment. - Relax medium-graph range assertions to match values produced by the correct algorithm (distinct count >= 4 -> >= 3, central vertex >= 4 -> >= 3). - Add triangle-with-tail regression test with exact oracle values (kcore={1:2,2:2,3:2,4:1,5:1}) that catches the swapped-message bug. Co-Authored-By: Claude --- .../scala/org/graphframes/lib/KCore.scala | 9 ++- .../org/graphframes/lib/KCoreSuite.scala | 58 ++++++++++++++----- 2 files changed, 50 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/graphframes/lib/KCore.scala b/core/src/main/scala/org/graphframes/lib/KCore.scala index 3db02ab28..dbd54b12f 100644 --- a/core/src/main/scala/org/graphframes/lib/KCore.scala +++ b/core/src/main/scala/org/graphframes/lib/KCore.scala @@ -28,6 +28,11 @@ import org.graphframes.WithLocalCheckpoints * * Mandal, Aritra, and Mohammad Al Hasan. 
"A distributed k-core decomposition algorithm on spark." * 2017 IEEE International Conference on Big Data (Big Data). IEEE, 2017. + * + * '''Edge representation''': K-core decomposition is defined for undirected graphs. Since + * GraphFrames represents edges as directed, each undirected edge `{u, v}` should be supplied + * as a single directed edge in either direction — the algorithm symmetrizes internally. + * Supplying both `(u, v)` and `(v, u)` will double-count the edge and produce incorrect results. */ class KCore private[graphframes] (private val graph: GraphFrame) extends Serializable @@ -80,8 +85,8 @@ object KCore extends Serializable with Logging { kCoreColumnName, col("degree"), call_function("_kcoreMerge", Pregel.msg, col(kCoreColumnName))) - .sendMsgToSrc(Pregel.src(kCoreColumnName)) - .sendMsgToDst(Pregel.dst(kCoreColumnName)) + .sendMsgToSrc(Pregel.dst(kCoreColumnName)) + .sendMsgToDst(Pregel.src(kCoreColumnName)) .setInitialActiveVertexExpression(lit(true)) .setUpdateActiveVertexExpression( col(kCoreColumnName) =!= call_function("_kcoreMerge", Pregel.msg, col(kCoreColumnName))) diff --git a/core/src/test/scala/org/graphframes/lib/KCoreSuite.scala b/core/src/test/scala/org/graphframes/lib/KCoreSuite.scala index 1a3e27376..aeb09db5d 100644 --- a/core/src/test/scala/org/graphframes/lib/KCoreSuite.scala +++ b/core/src/test/scala/org/graphframes/lib/KCoreSuite.scala @@ -65,31 +65,27 @@ class KCoreSuite extends SparkFunSuite with GraphFrameTestSparkContext { TestUtils.checkColumnType(result.schema, "kcore", DataTypes.IntegerType) assert(result.count() === 4) val rows = result.collect() - // Center vertex should have k-core value of 3, leaf vertices should have k-core value of 1 + // All vertices have k-core = 1: despite the center having degree 3, each leaf has + // only one neighbor so no 2-core can form, pulling everything down to 1. 
rows.foreach { row => - val id = row.getAs[Long]("id") - val kcore = row.getAs[Int]("kcore") - if (id == 0L) { - assert(kcore === 3) - } else { - assert(kcore === 1) - } + assert(row.getAs[Int]("kcore") === 1) } result.unpersist() } test("chain graph") { + // Open chain: 0 - 1 - 2. + // No 2-core exists: endpoints have degree 1, so the whole graph is only a 1-core. + // All vertices get kcore = 1. val v = spark.createDataFrame(Seq((0L, "a"), (1L, "b"), (2L, "c"))).toDF("id", "name") - val e = spark.createDataFrame(Seq((0L, 1L), (1L, 2L), (2L, 0L))).toDF("src", "dst") + val e = spark.createDataFrame(Seq((0L, 1L), (1L, 2L))).toDF("src", "dst") val g = GraphFrame(v, e) val result = g.kCore.run() TestUtils.checkColumnType(result.schema, "kcore", DataTypes.IntegerType) assert(result.count() === 3) val rows = result.collect() - // All vertices should have k-core value of 2 - // because graph is cosidered as undirected rows.foreach { row => - assert(row.getAs[Int]("kcore") === 2) + assert(row.getAs[Int]("kcore") === 1) } result.unpersist() } @@ -188,14 +184,14 @@ class KCoreSuite extends SparkFunSuite with GraphFrameTestSparkContext { val rows = result.collect() // Check that we have a range of k-core values val kcoreValues = rows.map(_.getAs[Int]("kcore")).distinct.sorted - assert(kcoreValues.length > 3, "Should have more than 3 distinct k-core values") + assert(kcoreValues.length > 2, "Should have at least 3 distinct k-core values") // Verify specific expected patterns val kcoreMap = rows.map(row => row.getAs[Long]("id") -> row.getAs[Int]("kcore")).toMap // Vertices in the highly connected cluster should have higher k-core values - assert(kcoreMap(0L) >= 4, "Central vertex should have high k-core") - assert(kcoreMap(1L) >= 3, "Well-connected vertex should have medium-high k-core") + assert(kcoreMap(0L) >= 3, "Central vertex should have high k-core") + assert(kcoreMap(1L) >= 3, "Well-connected vertex should have high k-core") // Leaf nodes should have lower k-core values 
assert(kcoreMap(18L) <= 2, "Leaf node should have low k-core") @@ -294,4 +290,36 @@ class KCoreSuite extends SparkFunSuite with GraphFrameTestSparkContext { result.unpersist() } + + test("triangle with tail - exact kcore values") { + // This graph has vertices where degree != kcore, which is important to test correctness: + // it would catch a buggy implementation that converges too early (e.g. after one superstep), which + // would return kcore = degree for all vertices and pass simpler tests. + // + // Undirected graph: + // + // Triangle: 1 - 2 - 3 - 1 (kcore = 2, they form the 2-core) + // Tail: 1 - 4 - 5 (kcore = 1, pendant chain excluded from the 2-core) + // + // Degrees: 1→3, 2→2, 3→2, 4→2, 5→1 (degree != kcore for vertices 1 and 4) + val v = spark + .createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e"))) + .toDF("id", "name") + val e = spark + .createDataFrame(Seq((1L, 2L), (2L, 3L), (3L, 1L), (1L, 4L), (4L, 5L))) + .toDF("src", "dst") + val g = GraphFrame(v, e) + val result = g.kCore.run() + TestUtils.checkColumnType(result.schema, "kcore", DataTypes.IntegerType) + assert(result.count() === 5) + val kcoreMap = result.collect().map(r => r.getAs[Long]("id") -> r.getAs[Int]("kcore")).toMap + // Triangle vertices form the 2-core + assert(kcoreMap(1L) === 2) + assert(kcoreMap(2L) === 2) + assert(kcoreMap(3L) === 2) + // Tail vertices are only in the 1-core + assert(kcoreMap(4L) === 1) + assert(kcoreMap(5L) === 1) + result.unpersist() + } } From a223cf53935d925a407903f8bfa29cc3e5cdb49a Mon Sep 17 00:00:00 2001 From: David Girardier Date: Wed, 4 Mar 2026 12:57:43 +0100 Subject: [PATCH 2/2] fix(kcore): adjust formatting in KCore documentation --- core/src/main/scala/org/graphframes/lib/KCore.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/graphframes/lib/KCore.scala b/core/src/main/scala/org/graphframes/lib/KCore.scala index dbd54b12f..f83e5d47a 100644 --- 
a/core/src/main/scala/org/graphframes/lib/KCore.scala +++ b/core/src/main/scala/org/graphframes/lib/KCore.scala @@ -30,9 +30,9 @@ import org.graphframes.WithLocalCheckpoints * 2017 IEEE International Conference on Big Data (Big Data). IEEE, 2017. * * '''Edge representation''': K-core decomposition is defined for undirected graphs. Since - * GraphFrames represents edges as directed, each undirected edge `{u, v}` should be supplied - * as a single directed edge in either direction — the algorithm symmetrizes internally. - * Supplying both `(u, v)` and `(v, u)` will double-count the edge and produce incorrect results. + * GraphFrames represents edges as directed, each undirected edge `{u, v}` should be supplied as a + * single directed edge in either direction — the algorithm symmetrizes internally. Supplying both + * `(u, v)` and `(v, u)` will double-count the edge and produce incorrect results. */ class KCore private[graphframes] (private val graph: GraphFrame) extends Serializable