From 20ec0b32bd176b587c5b81591384350e6c0e765d Mon Sep 17 00:00:00 2001 From: Russell Jurney Date: Mon, 11 Nov 2024 14:55:42 -0800 Subject: [PATCH 1/3] Java opt MaxPermSize --> MaxMetaspaceSize, includes for Java 17+ sun.nio.ch, java.lang ALL-UNNAMED --- build.sbt | 8 +++++++- build/sbt-launch-lib.bash | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 522c3d34c..afce538d2 100644 --- a/build.sbt +++ b/build.sbt @@ -68,7 +68,13 @@ scalacOptions in (Test, doc) ++= Seq("-groups", "-implicits") fork in Test := true // This and the next line fix a problem with forked run: https://github.com/scalatest/scalatest/issues/770 -javaOptions in Test ++= Seq("-Xmx2048m", "-XX:ReservedCodeCacheSize=384m", "-XX:MaxPermSize=384m") +javaOptions in Test ++= Seq( + "-Xmx2048m", + "-XX:ReservedCodeCacheSize=384m", + "-XX:MaxMetaspaceSize=384m", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/java.lang=ALL-UNNAMED" +) concurrentRestrictions in Global := Seq( Tags.limitAll(1)) diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash index 9b3f0e934..6ab8d7313 100755 --- a/build/sbt-launch-lib.bash +++ b/build/sbt-launch-lib.bash @@ -117,7 +117,7 @@ get_mem_opts () { (( $perm < 4096 )) || perm=4096 local codecache=$(( $perm / 2 )) - echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m" + echo "-Xms${mem}m -Xmx${mem}m -XX:MaxMetaspaceSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m" } require_arg () { From 0442215056fe93b45873cf458e9e896673fd1d56 Mon Sep 17 00:00:00 2001 From: Russell Jurney Date: Mon, 11 Nov 2024 15:58:40 -0800 Subject: [PATCH 2/3] Default spark.version 3.5.0 --> 3.5.3 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index c2c230768..ec2b5c223 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,7 @@ import ReleaseTransformations._ -val sparkVer = sys.props.getOrElse("spark.version", "3.5.0") 
+val sparkVer = sys.props.getOrElse("spark.version", "3.5.3") val sparkBranch = sparkVer.substring(0, 3) val defaultScalaVer = sparkBranch match { case "3.5" => "2.12.18" From fe9cc34ef838c84360d7b59658b5765acd8f4532 Mon Sep 17 00:00:00 2001 From: Russell Jurney Date: Tue, 12 Nov 2024 08:47:54 -0800 Subject: [PATCH 3/3] Docs update PR - GraphFrames version, future plans re: Apache Spark inclusion (#467) --- VERSION | 0 docs/_config.yml | 2 +- docs/index.md | 21 ++----- docs/user-guide.md | 139 +++++++++++++++++++++++++++++---------------- 4 files changed, 96 insertions(+), 66 deletions(-) create mode 100644 VERSION diff --git a/VERSION b/VERSION new file mode 100644 index 000000000..e69de29bb diff --git a/docs/_config.yml b/docs/_config.yml index 55afbc5f2..4c1ab075c 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -13,7 +13,7 @@ include: # These allow the documentation to be updated with newer releases # of Spark, Scala, and Mesos. -GRAPHFRAMES_VERSION: 0.8.0 +GRAPHFRAMES_VERSION: 0.8.4 #SCALA_BINARY_VERSION: "2.10" #SCALA_VERSION: "2.10.4" #MESOS_VERSION: 0.21.0 diff --git a/docs/index.md b/docs/index.md index bc8da4cb8..9bd2ccb82 100644 --- a/docs/index.md +++ b/docs/index.md @@ -27,23 +27,12 @@ Refer to the [User Guide](user-guide.html) for a full list of queries and algori __Will GraphFrames be part of Apache Spark?__ -The GraphX component of Apache Spark has no DataFrames- or Dataset-based equivalent, so it is -natural to ask this question. The current plan is to keep GraphFrames separate from core Apache -Spark for the time being: +The GraphX component of Apache Spark has no DataFrames- or Dataset-based equivalent, so it is +natural to ask this question. -* we are still considering making small adjustments to the API. The GraphFrames project will be -considered for inclusion into Spark once we are confident that the current API addresses current -and future needs. - -* some important features present in GraphX such as partitioning are missing. 
We would like to -offer some equivalent operations before considering merging with the Spark project. - -* GraphFrames is used as a testbed for advanced, graph-specific optimizations into Spark’s -Catalyst engine. Having them in a separate project accelerates the development cycle. - -That being said, GraphFrames follows the same code quality standards as Spark, and it is -cross-compiled and published for a large number of Spark versions. It is -easy for users to depend on it. +**The new plan is to merge GraphFrames with Apache Spark to make it a component of core Spark. +[Spark committer Holden Karau](https://spark.apache.org/committers.html#:~:text=Holden%20Karau,Netflix) +is willing to support this effort by reviewing the relevant pull requests.** # Downloading diff --git a/docs/user-guide.md b/docs/user-guide.md index bfe07751f..7e66f74ff 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -6,7 +6,7 @@ description: GraphFrames GRAPHFRAMES_VERSION user guide --- This page gives examples of how to use GraphFrames for basic queries, motif finding, and -general graph algorithms. This includes code examples in Scala and Python. +general graph algorithms. This includes code examples in Scala and Python. * Table of contents (This text will be scraped.) {:toc} @@ -33,7 +33,7 @@ The vertices will be inferred from the sources and destinations of the edges. The following example demonstrates how to create a GraphFrame from vertex and edge DataFrames. -
+
{% highlight scala %} import org.graphframes.GraphFrame // Vertex DataFrame @@ -64,11 +64,12 @@ val g = GraphFrame(v, e) The GraphFrame constructed above is available in the GraphFrames package: {% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends {% endhighlight %}
-
+
{% highlight python %} # Vertex DataFrame v = spark.createDataFrame([ @@ -98,7 +99,8 @@ g = GraphFrame(v, e) The GraphFrame constructed above is available in the GraphFrames package: {% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() + +g = Graphs(spark).friends() # Get example graph {% endhighlight %}
@@ -114,9 +116,10 @@ as `vertices` and `edges` fields in the GraphFrame.
-
+
{% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph // Display the vertex and edge DataFrames @@ -152,6 +155,7 @@ import org.apache.spark.sql.DataFrame // Get a DataFrame with columns "id" and "inDeg" (in-degree) val vertexInDegrees: DataFrame = g.inDegrees +vertexInDegrees.show() // Find the youngest user's age in the graph. // This queries the vertex DataFrame. @@ -163,10 +167,11 @@ val numFollows = g.edges.filter("relationship = 'follow'").count() {% endhighlight %}
-
+
{% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph # Display the vertex and edge DataFrames g.vertices.show() @@ -271,16 +276,18 @@ matching edge, even if those edges share the same vertex `u`.
-
+
For API details, refer to the [API docs](api/scala/index.html#org.graphframes.GraphFrame). {% highlight scala %} +import org.apache.spark.sql.DataFrame import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph // Search for pairs of vertices with edges in both directions between them. -val motifs: GraphFrame = g.find("(a)-[e]->(b); (b)-[e2]->(a)") +val motifs: DataFrame = g.find("(a)-[e]->(b); (b)-[e2]->(a)") motifs.show() // More complex queries can be expressed by applying filters. @@ -288,13 +295,14 @@ motifs.filter("b.age > 30").show() {% endhighlight %}
-
+
For API details, refer to the [API docs](api/python/graphframes.html#graphframes.GraphFrame.find). {% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph # Search for pairs of vertices with edges in both directions between them. motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)") @@ -329,16 +337,17 @@ In this example, the state is the current count of "friend" edges; in general, i
-
+
{% highlight scala %} -import org.apache.spark.sql.Column +import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.functions.{col, when} import org.graphframes.{examples,GraphFrame} val g: GraphFrame = examples.Graphs.friends // get example graph // Find chains of 4 vertices. -val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)") +val chain4: DataFrame = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)") +chain4.show() // Query on sequence, with state (cnt) // (a) Define method for updating state given the next element of the motif. @@ -360,7 +369,8 @@ chainWith2Friends2.show() from pyspark.sql.functions import col, lit, when from pyspark.sql.types import IntegerType from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)") @@ -399,9 +409,10 @@ The following example shows how to select a subgraph based upon vertex and edge
-
+
{% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // Select subgraph of users older than 30, and relationships of type "friend". @@ -411,10 +422,11 @@ val g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'").dro {% endhighlight %}
-
+
{% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph # Select subgraph of users older than 30, and relationships of type "friend". # Drop isolated vertices (users) which are not contained in any edges (relationships). @@ -435,6 +447,7 @@ triplets by using more complex motifs.
{% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph // Select subgraph based on edges "e" of type "follow" @@ -443,19 +456,18 @@ val paths = { g.find("(a)-[e]->(b)") .filter("e.relationship = 'follow'") .filter("a.age < b.age") } // "paths" contains vertex info. Extract the edges. -val e2 = paths.select("e.src", "e.dst", "e.relationship") -// In Spark 1.5+, the user may simplify this call: -// val e2 = paths.select("e.*") +val e2 = paths.select("e.*") // Construct the subgraph val g2 = GraphFrame(g.vertices, e2) {% endhighlight %}
-
+
{% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph # Select subgraph based on edges "e" of type "follow" # pointing from a younger user "a" to an older user "b". @@ -503,6 +515,7 @@ For API details, refer to the [API docs](api/scala/index.html#org.graphframes.li {% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph // Search from "Esther" for users of age < 32. @@ -510,19 +523,21 @@ val paths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run() paths.show() // Specify edge filters or max path lengths. -{ g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32") +val filteredPaths = { g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32") .edgeFilter("relationship != 'friend'") .maxPathLength(3).run() } +filteredPaths.show() {% endhighlight %}
-
+
For API details, refer to the [API docs](api/python/graphframes.html#graphframes.GraphFrame.bfs). {% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph # Search from "Esther" for users of age < 32. paths = g.bfs("name = 'Esther'", "age < 32") @@ -544,7 +559,7 @@ assigned a component ID. See [Wikipedia](https://en.wikipedia.org/wiki/Connected_component_(graph_theory)) for background. NOTE: With GraphFrames 0.3.0 and later releases, the default Connected Components algorithm -requires setting a Spark checkpoint directory. Users can revert to the old algorithm using +requires setting a Spark checkpoint directory. Users can revert to the old algorithm using `connectedComponents.setAlgorithm("graphx")`.
@@ -555,6 +570,9 @@ For API details, refer to the [API docs](api/scala/index.html#org.graphframes.li {% highlight scala %} import org.graphframes.{examples,GraphFrame} + +sc.setCheckpointDir("/tmp/spark-checkpoints") + val g: GraphFrame = examples.Graphs.friends // get example graph val result = g.connectedComponents.run() @@ -562,13 +580,16 @@ result.select("id", "component").orderBy("component").show() {% endhighlight %}
-
+
For API details, refer to the [API docs](api/python/graphframes.html#graphframes.GraphFrame.connectedComponents). {% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +sc.setCheckpointDir("/tmp/spark-checkpoints") + +g = Graphs(spark).friends() # Get example graph result = g.connectedComponents() result.select("id", "component").orderBy("component").show() @@ -586,12 +607,13 @@ See [Wikipedia](https://en.wikipedia.org/wiki/Strongly_connected_component) for
-
+
For API details, refer to the [API docs](api/scala/index.html#org.graphframes.lib.StronglyConnectedComponents). {% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph val result = g.stronglyConnectedComponents.maxIter(10).run() @@ -599,13 +621,16 @@ result.select("id", "component").orderBy("component").show() {% endhighlight %}
-
+
For API details, refer to the [API docs](api/python/graphframes.html#graphframes.GraphFrame.stronglyConnectedComponents). {% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +sc.setCheckpointDir("/tmp/spark-checkpoints") + +g = Graphs(spark).friends() # Get example graph result = g.stronglyConnectedComponents(maxIter=10) result.select("id", "component").orderBy("component").show() @@ -630,12 +655,13 @@ See [Wikipedia](https://en.wikipedia.org/wiki/Label_Propagation_Algorithm) for b
-
+
For API details, refer to the [API docs](api/scala/index.html#org.graphframes.lib.LabelPropagation). {% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph val result = g.labelPropagation.maxIter(5).run() @@ -643,13 +669,14 @@ result.select("id", "label").show() {% endhighlight %}
-
+
For API details, refer to the [API docs](api/python/graphframes.html#graphframes.GraphFrame.labelPropagation). {% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph result = g.labelPropagation(maxIter=5) result.select("id", "label").show() @@ -674,16 +701,18 @@ See [Wikipedia](https://en.wikipedia.org/wiki/PageRank) for background.
-
+
For API details, refer to the [API docs](api/scala/index.html#org.graphframes.lib.PageRank). {% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph // Run PageRank until convergence to tolerance "tol". -val results = g.pageRank.resetProbability(0.15).tol(0.01).run() +val results: GraphFrame = g.pageRank.resetProbability(0.15).tol(0.01).run() + // Display resulting pageranks and final edge weights // Note that the displayed pagerank may be truncated, e.g., missing the E notation. // In Spark 1.5+, you can use show(truncate=false) to avoid truncation. @@ -697,17 +726,20 @@ val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run() val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run() // Run PageRank personalized for vertex ["a", "b", "c", "d"] in parallel -val results3 = g.parallelPersonalizedPageRank.resetProbability(0.15).maxIter(10).sourceIds(Array("a", "b", "c", "d")).run() +val results4 = g.parallelPersonalizedPageRank.resetProbability(0.15).maxIter(10).sourceIds(Array("a", "b", "c", "d")).run() +results4.vertices.show() +results4.edges.show() {% endhighlight %}
-
+
For API details, refer to the [API docs](api/python/graphframes.html#graphframes.GraphFrame.pageRank). {% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph # Run PageRank until convergence to tolerance "tol". results = g.pageRank(resetProbability=0.15, tol=0.01) @@ -740,12 +772,13 @@ See [Wikipedia](https://en.wikipedia.org/wiki/Shortest_path_problem) for backgro
-
+
For API details, refer to the [API docs](api/scala/index.html#org.graphframes.lib.ShortestPaths). {% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph val results = g.shortestPaths.landmarks(Seq("a", "d")).run() @@ -753,13 +786,14 @@ results.select("id", "distances").show() {% endhighlight %}
-
+
For API details, refer to the [API docs](api/python/graphframes.html#graphframes.GraphFrame.shortestPaths). {% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph results = g.shortestPaths(landmarks=["a", "d"]) results.select("id", "distances").show() @@ -780,6 +814,7 @@ For API details, refer to the [API docs](api/scala/index.html#org.graphframes.li {% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph val results = g.triangleCount.run() @@ -793,7 +828,8 @@ For API details, refer to the [API docs](api/python/graphframes.html#graphframes {% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph results = g.triangleCount() results.select("id", "count").show() @@ -816,6 +852,7 @@ The below example shows how to save and then load a graph.
{% highlight scala %} import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph // Save vertices and edges as Parquet to some location. @@ -834,7 +871,8 @@ val sameG = GraphFrame(sameV, sameE)
{% highlight python %} from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph + +g = Graphs(spark).friends() # Get example graph # Save vertices and edges as Parquet to some location. g.vertices.write.parquet("hdfs://myLocation/vertices") @@ -867,13 +905,14 @@ of adjacent users.
-
+
For API details, refer to the [API docs](api/scala/index.html#org.graphframes.lib.AggregateMessages). {% highlight scala %} import org.graphframes.{examples,GraphFrame} import org.graphframes.lib.AggregateMessages + val g: GraphFrame = examples.Graphs.friends // get example graph // We will use AggregateMessages utilities later, so name it "AM" for short. @@ -894,16 +933,17 @@ For a more complex example, look at the code used to implement the
-
+
For API details, refer to the [API docs](api/python/graphframes.html#graphframes.GraphFrame.aggregateMessages). {% highlight python %} -from pyspark.sql.functions import sum as sqlsum from graphframes.lib import AggregateMessages as AM from graphframes.examples import Graphs -g = Graphs.friends() # Get example graph +from pyspark.sql.functions import sum as sqlsum + +g = Graphs(spark).friends() # Get example graph # For each user, sum the ages of the adjacent users. msgToSrc = AM.dst["age"] @@ -984,6 +1024,7 @@ For API details, refer to the API docs for: import org.apache.spark.graphx.Graph import org.apache.spark.sql.Row import org.graphframes.{examples,GraphFrame} + val g: GraphFrame = examples.Graphs.friends // get example graph // Convert to GraphX