From 870997fe9c322902e420525c7bb21afc82edabf6 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Sat, 20 Sep 2025 07:22:56 +0200 Subject: [PATCH 1/5] =?UTF-8?q?Add=20Rocha=E2=80=93Thatte=20cycle=20detect?= =?UTF-8?q?ion=20algorithm?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scala/org/graphframes/GraphFrame.scala | 19 ++++ .../org/graphframes/lib/DetectingCycles.scala | 89 +++++++++++++++++++ .../lib/DetectingCyclesSuite.scala | 68 ++++++++++++++ docs/src/04-user-guide/05-traversals.md | 71 ++++++++++++--- 4 files changed, 235 insertions(+), 12 deletions(-) create mode 100644 core/src/main/scala/org/graphframes/lib/DetectingCycles.scala create mode 100644 core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala diff --git a/core/src/main/scala/org/graphframes/GraphFrame.scala b/core/src/main/scala/org/graphframes/GraphFrame.scala index f39c9343d..96446839f 100644 --- a/core/src/main/scala/org/graphframes/GraphFrame.scala +++ b/core/src/main/scala/org/graphframes/GraphFrame.scala @@ -550,6 +550,25 @@ class GraphFrame private ( } } + /** + * Find all cycles in the graph. An implementation of the Rocha–Thatte cycle detection + * algorithm. + * + * Rocha, Rodrigo Caetano, and Bhalchandra D. Thatte. "Distributed cycle detection in + * large-scale sparse graphs." Proceedings of Simpósio Brasileiro de Pesquisa Operacional + * (SBPO’15) (2015): 1-11. + * + * Returns a DataFrame with ID and cycles, ID are not unique if there are multiple cycles + * starting from this ID. For the case of cycle 1 -> 2 -> 3 -> 1 all the vertices will have the + * same cycle! E.g.: 1 -> [1, 2, 3, 1] 2 -> [2, 3, 1, 2] 3 -> [3, 1, 2, 3] + * + * Deduplication of cycles should be done by the user! + * + * @return + * an instance of DetectingCycles initialized with the current context + */ + def detectingCycles: DetectingCycles = new DetectingCycles(this) + // ========= Motif finding (private) ========= /** diff --git a/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala b/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala new file mode 100644 index 000000000..2c1e2adfe --- /dev/null +++ b/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala @@ -0,0 +1,89 @@ +package org.graphframes.lib + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.ArrayType +import org.graphframes.GraphFrame +import org.graphframes.Logging +import org.graphframes.WithCheckpointInterval +import org.graphframes.WithIntermediateStorageLevel +import org.graphframes.WithLocalCheckpoints + +class DetectingCycles private[graphframes] (private val graph: GraphFrame) + extends Arguments + with Serializable + with Logging + with WithIntermediateStorageLevel + with WithLocalCheckpoints + with WithCheckpointInterval { + import DetectingCycles._ + def run(): DataFrame = { + val rawRes = DetectingCycles.run(graph, useLocalCheckpoints, checkpointInterval) + val distinctRes = rawRes + .select( + col(GraphFrame.ID), + filter(col(foundSeqCol), x => size(x) > lit(0)).alias(foundSeqCol)) + .filter(size(col(foundSeqCol)) > lit(0)) + .select( + col(GraphFrame.ID), + // from vid -> [[cycle1, cycle2, ...]] + // to vid -> [cycle1], vid -> [cycle2], ... + explode(col(foundSeqCol)).alias(foundSeqCol)) + .distinct() + .persist(intermediateStorageLevel) + distinctRes.count() + resultIsPersistent() + rawRes.unpersist() + distinctRes + } +} + +object DetectingCycles { + private val storedSeqCol: String = "sequences" + val foundSeqCol: String = "found_cycles" + + def run(graph: GraphFrame, useLocalCheckpoints: Boolean, checkpointInterval: Int): DataFrame = { + val preparedGraph = GraphFrame( + graph.vertices.select(GraphFrame.ID), + graph.edges.select(GraphFrame.SRC, GraphFrame.DST)) + + val vertexDT = preparedGraph.vertices.schema(GraphFrame.ID).dataType + + // Each vertex stores sequences from the previous iteration, initial is just Array(Array(ID)) + val initSequences = array(array(col(GraphFrame.ID))) + // Each vertex stores all the found cycles + val foundSequences = array().cast(ArrayType(ArrayType(vertexDT))) + // Message is simply stored sequences + val sentMessages = when(size(Pregel.src(storedSeqCol)) =!= lit(0), Pregel.src(storedSeqCol)) + .otherwise(lit(null).cast(ArrayType(ArrayType(vertexDT)))) + // If the sequence contains the current vertex ID somewhere in the middle, it is + // a previously detected cycle and a sequence should be discarded. + val filterOutSequences = flatten(collect_list(Pregel.msg)) + when(Pregel.msg.isNull, array(array()).cast(ArrayType(ArrayType(vertexDT)))) + .otherwise(filter(Pregel.msg, x => !(array_position(x, col(GraphFrame.ID)) > lit(1)))) + // update found sequences by appending all from messages that start from the current vertex ID + val updateFound = when(Pregel.msg.isNull, col(foundSeqCol)).otherwise( + array_union( + col(foundSeqCol), + transform( + filter(Pregel.msg, x => try_element_at(x, lit(1)) === col(GraphFrame.ID)), + x => array_append(x, col(GraphFrame.ID))))) + // update stored sequences by filtering out already added sequences + val updateSequences = transform( + filter(Pregel.msg, x => !array_contains(x, col(GraphFrame.ID))), + x => array_append(x, col(GraphFrame.ID))) + + preparedGraph.pregel + .setCheckpointInterval(checkpointInterval) + .setUseLocalCheckpoints(useLocalCheckpoints) + .setEarlyStopping(false) + .setSkipMessagesFromNonActiveVertices(true) + .setInitialActiveVertexExpression(lit(true)) + .sendMsgToDst(sentMessages) + .setUpdateActiveVertexExpression(Pregel.msg.isNotNull && (size(updateSequences) > lit(0))) + .withVertexColumn(storedSeqCol, initSequences, updateSequences) + .withVertexColumn(foundSeqCol, foundSequences, updateFound) + .aggMsgs(filterOutSequences) + .run() + } +} diff --git a/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala b/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala new file mode 100644 index 000000000..42faa94d6 --- /dev/null +++ b/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala @@ -0,0 +1,68 @@ +package org.graphframes.lib + +import org.graphframes.GraphFrame +import org.graphframes.GraphFrameTestSparkContext +import org.graphframes.SparkFunSuite + +import scala.collection.mutable + +class DetectingCyclesSuite extends SparkFunSuite with GraphFrameTestSparkContext { + test("test detecting cycles") { + val graph = GraphFrame( + spark + .createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e"))) + .toDF("id", "attr"), + spark + .createDataFrame(Seq((1L, 2L), (2L, 3L), (3L, 1L), (1L, 4L), (2L, 5L))) + .toDF("src", "dst")) + val res = graph.detectingCycles.setUseLocalCheckpoints(true).run() + assert(res.count() == 3) + val collected = + res + .sort(GraphFrame.ID) + .select(DetectingCycles.foundSeqCol) + .collect() + .map(r => r.getAs[mutable.ArraySeq[Long]](0)) + + assert(collected(0) == Seq(1, 2, 3, 1)) + assert(collected(1) == Seq(2, 3, 1, 2)) + assert(collected(2) == Seq(3, 1, 2, 3)) + } + + test("test no cycles") { + val graph = GraphFrame( + spark + .createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e"))) + .toDF("id", "attr"), + spark + .createDataFrame(Seq((1L, 2L), (2L, 3L), (3L, 4L), (4L, 5L))) + .toDF("src", "dst")) + val res = graph.detectingCycles.setUseLocalCheckpoints(true).run() + assert(res.count() == 0) + } + + test("test multiple cycles from one source") { + val graph = GraphFrame( + spark + .createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e"))) + .toDF("id", "attr"), + spark + .createDataFrame(Seq((1L, 2L), (2L, 1L), (1L, 3L), (3L, 1L), (2L, 5L), (5L, 1L))) + .toDF("src", "dst")) + val res = graph.detectingCycles.setUseLocalCheckpoints(true).run() + assert(res.count() == 7) + val collected = + res + .sort(GraphFrame.ID, DetectingCycles.foundSeqCol) + .select(DetectingCycles.foundSeqCol) + .collect() + .map(r => r.getAs[mutable.ArraySeq[Long]](0)) + assert(collected(0) == Seq(1, 2, 1)) + assert(collected(1) == Seq(1, 2, 5, 1)) + assert(collected(2) == Seq(1, 3, 1)) + assert(collected(3) == Seq(2, 1, 2)) + assert(collected(4) == Seq(2, 5, 1, 2)) + assert(collected(5) == Seq(3, 1, 3)) + assert(collected(6) == Seq(5, 1, 2, 5)) + } +} diff --git a/docs/src/04-user-guide/05-traversals.md b/docs/src/04-user-guide/05-traversals.md index 52b6313af..79fd19f50 100644 --- a/docs/src/04-user-guide/05-traversals.md +++ b/docs/src/04-user-guide/05-traversals.md @@ -2,7 +2,8 @@ ## Shortest paths -Computes shortest paths from each vertex to the given set of landmark vertices, where landmarks are specified by the vertex ID. Note that this takes an edge direction into account. +Computes shortest paths from each vertex to the given set of landmark vertices, where landmarks are specified by the +vertex ID. Note that this takes an edge direction into account. See [Wikipedia](https://en.wikipedia.org/wiki/Shortest_path_problem) for a background. @@ -34,7 +35,8 @@ results.select("id", "distances").show() ## Breadth-first search (BFS) -Breadth-first search (BFS) finds the shortest path(s) from one vertex (or a set of vertices) to another vertex (or a set of vertices). The beginning and end vertices are specified as Spark DataFrame expressions. +Breadth-first search (BFS) finds the shortest path(s) from one vertex (or a set of vertices) to another vertex (or a set +of vertices). The beginning and end vertices are specified as Spark DataFrame expressions. See [Wikipedia on BFS](https://en.wikipedia.org/wiki/Breadth-first_search) for more background. @@ -54,8 +56,8 @@ paths.show() # Specify edge filters or max path lengths -g.bfs("name = 'Esther'", "age < 32",\ -edgeFilter="relationship != 'friend'", maxPathLength=3) +g.bfs("name = 'Esther'", "age < 32", + edgeFilter="relationship != 'friend'", maxPathLength=3) ``` ### Scala API @@ -63,7 +65,7 @@ edgeFilter="relationship != 'friend'", maxPathLength=3) For API details, refer to @:scaladoc(org.graphframes.lib.BFS). ```scala -import org.graphframes.{examples,GraphFrame} +import org.graphframes.{examples, GraphFrame} val g: GraphFrame = examples.Graphs.friends // get example graph @@ -72,9 +74,11 @@ val paths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run() paths.show() // Specify edge filters or max path lengths. -val paths = { g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32") -.edgeFilter("relationship != 'friend'") -.maxPathLength(3).run() } +val paths = { + g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32") + .edgeFilter("relationship != 'friend'") + .maxPathLength(3).run() +} paths.show() ``` @@ -84,7 +88,12 @@ Computes the connected component membership of each vertex and returns a graph w See [Wikipedia](https://en.wikipedia.org/wiki/Connected_component_(graph_theory)) for the background. -**NOTE:** With GraphFrames 0.3.0 and later releases, the default Connected Components algorithm requires setting a Spark checkpoint directory. Users can revert to the old algorithm using `connectedComponents.setAlgorithm("graphx")`. Starting from GraphFrames 0.9.3 release, users can also use `localCheckpoints` that does not require setting a Spark checkpoint directory. To use `localCheckpoints` users can set the config `spark.graphframes.useLocalCheckpoints` to `true` or use the API `connectedComponents.setUseLocalCheckpoints(true)`. While `localCheckpoints` provides better performance they are not as reliable as the persistent checkpointing. +**NOTE:** With GraphFrames 0.3.0 and later releases, the default Connected Components algorithm requires setting a Spark +checkpoint directory. Users can revert to the old algorithm using `connectedComponents.setAlgorithm("graphx")`. Starting +from GraphFrames 0.9.3 release, users can also use `localCheckpoints` that does not require setting a Spark checkpoint +directory. To use `localCheckpoints` users can set the config `spark.graphframes.useLocalCheckpoints` to `true` or use +the API `connectedComponents.setUseLocalCheckpoints(true)`. While `localCheckpoints` provides better performance they +are not as reliable as the persistent checkpointing. ### Python API @@ -106,7 +115,7 @@ result.select("id", "component").orderBy("component").show() For API details, refer to the @:scaladoc(org.graphframes.lib.ConnectedComponents). ```scala -import org.graphframes.{examples,GraphFrame} +import org.graphframes.{examples, GraphFrame} val g: GraphFrame = examples.Graphs.friends // get example graph @@ -116,7 +125,8 @@ result.select("id", "component").orderBy("component").show() ### Strongly connected components -Compute the strongly connected component (SCC) of each vertex and return a graph with each vertex assigned to the SCC containing that vertex. At the moment, SCC in GraphFrames is a wrapper around GraphX implementation. +Compute the strongly connected component (SCC) of each vertex and return a graph with each vertex assigned to the SCC +containing that vertex. At the moment, SCC in GraphFrames is a wrapper around GraphX implementation. See [Wikipedia](https://en.wikipedia.org/wiki/Strongly_connected_component) for the background. @@ -140,7 +150,7 @@ result.select("id", "component").orderBy("component").show() For API details, refer to the @:scaladoc(org.graphframes.lib.StronglyConnectedComponents). ```scala -import org.graphframes.{examples,GraphFrame} +import org.graphframes.{examples, GraphFrame} val g: GraphFrame = examples.Graphs.friends // get example graph @@ -177,3 +187,40 @@ val g: GraphFrame = examples.Graphs.friends // get example graph val results = g.triangleCount.run() results.select("id", "count").show() ``` + +## Cycles Detection + +GraphFrames provides an implementation of +the [Rocha–Thatte cycle detection algorithm](https://en.wikipedia.org/wiki/Rocha%E2%80%93Thatte_cycle_detection_algorithm). + +### Scala API + +```scala +import org.graphframes.GraphFrame + +val graph = GraphFrame( + spark + .createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e"))) + .toDF("id", "attr"), + spark + .createDataFrame(Seq((1L, 2L), (2L, 1L), (1L, 3L), (3L, 1L), (2L, 5L), (5L, 1L))) + .toDF("src", "dst")) +val res = graph.detectingCycles.setUseLocalCheckpoints(true).run() +res.show(false) + +// Output: +// +----+--------------+ +// | id | found_cycles | +// +----+--------------+ +// |1 |[1, 3, 1] | +// |1 |[1, 2, 1] | +// |1 |[1, 2, 5, 1] | +// |2 |[2, 1, 2] | +// |2 |[2, 5, 1, 2] | +// |3 |[3, 1, 3] | +// |5 |[5, 1, 2, 5] | +// +----+--------------+ +``` + +**WARNING:** This algorithm returns all the cycles, and users should handle deduplication of [1, 2, 1] and [2, 1, 2] ( +that is the same cycle) \ No newline at end of file From ae227c16d8f635e36874d25df0db6917b013cf17 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Sat, 20 Sep 2025 07:50:57 +0200 Subject: [PATCH 2/5] fix 2.12 failing tests --- build.sbt | 2 ++ .../scala/org/graphframes/lib/DetectingCyclesSuite.scala | 9 +++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index eaba2653c..d76e1616d 100644 --- a/build.sbt +++ b/build.sbt @@ -121,6 +121,8 @@ lazy val commonSetting = Seq( ScalacOptions.warnUnusedPrivates, ScalacOptions.warnNumericWiden, ScalacOptions.privateWarnNumericWiden, + ScalacOptions.warnUnusedNoWarn, + ScalacOptions.privateWarnUnusedNoWarn, )) lazy val graphx = (project in file("graphx")) diff --git a/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala b/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala index 42faa94d6..19bb0071f 100644 --- a/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala +++ b/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala @@ -4,6 +4,7 @@ import org.graphframes.GraphFrame import org.graphframes.GraphFrameTestSparkContext import org.graphframes.SparkFunSuite +import scala.annotation.nowarn import scala.collection.mutable class DetectingCyclesSuite extends SparkFunSuite with GraphFrameTestSparkContext { @@ -17,12 +18,12 @@ class DetectingCyclesSuite extends SparkFunSuite with GraphFrameTestSparkContext .toDF("src", "dst")) val res = graph.detectingCycles.setUseLocalCheckpoints(true).run() assert(res.count() == 3) - val collected = + @nowarn val collected = res .sort(GraphFrame.ID) .select(DetectingCycles.foundSeqCol) .collect() - .map(r => r.getAs[mutable.ArraySeq[Long]](0)) + .map(r => r.getAs[mutable.WrappedArray[Long]](0)) assert(collected(0) == Seq(1, 2, 3, 1)) assert(collected(1) == Seq(2, 3, 1, 2)) @@ -51,12 +52,12 @@ class DetectingCyclesSuite extends SparkFunSuite with GraphFrameTestSparkContext .toDF("src", "dst")) val res = graph.detectingCycles.setUseLocalCheckpoints(true).run() assert(res.count() == 7) - val collected = + @nowarn val collected = res .sort(GraphFrame.ID, DetectingCycles.foundSeqCol) .select(DetectingCycles.foundSeqCol) .collect() - .map(r => r.getAs[mutable.ArraySeq[Long]](0)) + .map(r => r.getAs[mutable.WrappedArray[Long]](0)) assert(collected(0) == Seq(1, 2, 1)) assert(collected(1) == Seq(1, 2, 5, 1)) assert(collected(2) == Seq(1, 3, 1)) From 10fb9ae4285bea8b50c81a4475204e884d09d0bf Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Mon, 22 Sep 2025 08:13:39 +0200 Subject: [PATCH 3/5] merge main + fix docs generation --- .../org/graphframes/lib/AggregateMessages.scala | 4 ++-- .../org/graphframes/lib/DetectingCycles.scala | 14 ++++++++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/graphframes/lib/AggregateMessages.scala b/core/src/main/scala/org/graphframes/lib/AggregateMessages.scala index 358797716..d595a2515 100644 --- a/core/src/main/scala/org/graphframes/lib/AggregateMessages.scala +++ b/core/src/main/scala/org/graphframes/lib/AggregateMessages.scala @@ -47,8 +47,8 @@ import org.graphframes.WithIntermediateStorageLevel * - [[AggregateMessages.msg]]: message sent to vertex (for aggregation function) * * Note: If you use this operation to write an iterative algorithm, you may want to use - * [[AggregateMessages$.getCachedDataFrame getCachedDataFrame()]] as a workaround for caching - * issues. + * `org.apache.spark.sql.Dataset.checkpoint` or `org.apache.spark.sql.Dataset.localCheckpoint`` as + * a workaround for caching issues. * * @example * We can use this function to compute the in-degree of each vertex diff --git a/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala b/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala index 2c1e2adfe..493a59f27 100644 --- a/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala +++ b/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala @@ -3,6 +3,7 @@ package org.graphframes.lib import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.ArrayType +import org.apache.spark.storage.StorageLevel import org.graphframes.GraphFrame import org.graphframes.Logging import org.graphframes.WithCheckpointInterval @@ -18,7 +19,11 @@ class DetectingCycles private[graphframes] (private val graph: GraphFrame) with WithCheckpointInterval { import DetectingCycles._ def run(): DataFrame = { - val rawRes = DetectingCycles.run(graph, useLocalCheckpoints, checkpointInterval) + val rawRes = DetectingCycles.run( + graph, + useLocalCheckpoints, + checkpointInterval, + intermediateStorageLevel) val distinctRes = rawRes .select( col(GraphFrame.ID), @@ -42,7 +47,11 @@ object DetectingCycles { private val storedSeqCol: String = "sequences" val foundSeqCol: String = "found_cycles" - def run(graph: GraphFrame, useLocalCheckpoints: Boolean, checkpointInterval: Int): DataFrame = { + def run( + graph: GraphFrame, + useLocalCheckpoints: Boolean, + checkpointInterval: Int, + intermediateStorageLevel: StorageLevel): DataFrame = { val preparedGraph = GraphFrame( graph.vertices.select(GraphFrame.ID), graph.edges.select(GraphFrame.SRC, GraphFrame.DST)) @@ -76,6 +85,7 @@ object DetectingCycles { preparedGraph.pregel .setCheckpointInterval(checkpointInterval) .setUseLocalCheckpoints(useLocalCheckpoints) + .setIntermediateStorageLevel(intermediateStorageLevel) .setEarlyStopping(false) .setSkipMessagesFromNonActiveVertices(true) .setInitialActiveVertexExpression(lit(true)) From 8cdac0d898eebee95eb5c2f3639ca14cb9a0d668 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Mon, 22 Sep 2025 08:40:56 +0200 Subject: [PATCH 4/5] fix bad merge + remove unnecessary distinct --- .../scala/org/graphframes/GraphFrame.scala | 23 ++++--------------- .../org/graphframes/lib/DetectingCycles.scala | 7 +++--- 2 files changed, 7 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/graphframes/GraphFrame.scala b/core/src/main/scala/org/graphframes/GraphFrame.scala index b98338541..24e35ed79 100644 --- a/core/src/main/scala/org/graphframes/GraphFrame.scala +++ b/core/src/main/scala/org/graphframes/GraphFrame.scala @@ -713,7 +713,6 @@ class GraphFrame private ( val withLongIds = vertices .select(ID) .repartition(col(ID)) - .distinct() .sortWithinPartitions(ID) .withColumn(LONG_ID, monotonically_increasing_id()) .persist(StorageLevel.MEMORY_AND_DISK) @@ -742,25 +741,11 @@ class GraphFrame private ( col(DST).cast("long").as(LONG_DST), col(ATTR)) } else { - val threshold = broadcastThreshold - val hubs: Set[Any] = degrees - .filter(col("degree") >= threshold) - .select(ID) - .collect() - .map(_.get(0)) - .toSet - val indexedSourceEdges = GraphFrame.skewedJoin( - packedEdges, - indexedVertices.select(col(ID).as(SRC), col(LONG_ID).as(LONG_SRC)), - SRC, - hubs, - "GraphFrame.indexedEdges:") - val indexedEdges = GraphFrame.skewedJoin( - indexedSourceEdges, + val indexedSourceEdges = + packedEdges.join(indexedVertices.select(col(ID).as(SRC), col(LONG_ID).as(LONG_SRC)), SRC) + val indexedEdges = indexedSourceEdges.join( indexedVertices.select(col(ID).as(DST), col(LONG_ID).as(LONG_DST)), - DST, - hubs, - "GraphFrame.indexedEdges:") + DST) indexedEdges.select(SRC, LONG_SRC, DST, LONG_DST, ATTR) } } diff --git a/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala b/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala index 493a59f27..6a3c407dc 100644 --- a/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala +++ b/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala @@ -24,7 +24,7 @@ class DetectingCycles private[graphframes] (private val graph: GraphFrame) useLocalCheckpoints, checkpointInterval, intermediateStorageLevel) - val distinctRes = rawRes + val explodedRes = rawRes .select( col(GraphFrame.ID), filter(col(foundSeqCol), x => size(x) > lit(0)).alias(foundSeqCol)) @@ -34,12 +34,11 @@ class DetectingCycles private[graphframes] (private val graph: GraphFrame) // from vid -> [[cycle1, cycle2, ...]] // to vid -> [cycle1], vid -> [cycle2], ... explode(col(foundSeqCol)).alias(foundSeqCol)) - .distinct() .persist(intermediateStorageLevel) - distinctRes.count() + explodedRes.count() resultIsPersistent() rawRes.unpersist() - distinctRes + explodedRes } } From ad3b2aed28d6eaf6f89886b4bab2c28692f115e3 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Mon, 22 Sep 2025 08:45:58 +0200 Subject: [PATCH 5/5] fix bad merge --- .../main/scala/org/graphframes/GraphFrame.scala | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/core/src/main/scala/org/graphframes/GraphFrame.scala b/core/src/main/scala/org/graphframes/GraphFrame.scala index 24e35ed79..6f3983bc1 100644 --- a/core/src/main/scala/org/graphframes/GraphFrame.scala +++ b/core/src/main/scala/org/graphframes/GraphFrame.scala @@ -1212,21 +1212,4 @@ object GraphFrame extends Serializable with Logging { } } } - - /** - * Controls broadcast threshold in skewed joins. Use normal joins for vertices with degrees less - * than the threshold, and broadcast joins otherwise. The default value is 1000000. If we have - * less than 100 billion edges, this would collect at most 2e11 / 1000000 = 200000 hubs, which - * could be handled by the driver. - */ - private[this] var _broadcastThreshold: Int = 1000000 - - private[graphframes] def broadcastThreshold: Int = _broadcastThreshold - - // for unit testing only - private[graphframes] def setBroadcastThreshold(value: Int): this.type = { - require(value >= 0) - _broadcastThreshold = value - this - } }