diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index f1709a440..dacdeb2ef 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -55,11 +55,11 @@ jobs: - name: Run benchmarks run: | - python ./dev/run_doc_benchmarks.py --spark-version 4.0.0 + python ./dev/run_doc_benchmarks.py --spark-version 4.1.2 - name: Build with Laika run: | - ./build/sbt -Dspark.version=4.0.0 -Ddocs.mode=production "docs/laikaHTML" + ./build/sbt -Dspark.version=4.1.2 -Ddocs.mode=production "docs/laikaHTML" - name: Upload artifact uses: actions/upload-pages-artifact@v4 diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index 1ebcfc467..e84e3dc1f 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -8,15 +8,15 @@ jobs: fail-fast: false matrix: include: - - spark-version: 3.5.7 + - spark-version: 3.5.8 scala-version: 2.12 python-version: "3.10" java-version: 11 - - spark-version: 4.0.1 + - spark-version: 4.0.2 scala-version: 2.13 python-version: "3.12" java-version: 17 - - spark-version: 4.1.0 + - spark-version: 4.1.2 scala-version: 2.13 python-version: "3.13" java-version: 17 diff --git a/.github/workflows/scala-ci.yml b/.github/workflows/scala-ci.yml index a5c0a00fe..fdf54c6f7 100644 --- a/.github/workflows/scala-ci.yml +++ b/.github/workflows/scala-ci.yml @@ -8,15 +8,15 @@ jobs: fail-fast: false matrix: include: - - spark-version: 3.5.7 + - spark-version: 3.5.8 java-version: 8 - - spark-version: 3.5.7 + - spark-version: 3.5.8 java-version: 11 - - spark-version: 3.5.7 + - spark-version: 3.5.8 java-version: 17 - - spark-version: 4.0.1 + - spark-version: 4.0.2 java-version: 17 - - spark-version: 4.1.0 + - spark-version: 4.1.2 java-version: 17 runs-on: ubuntu-latest diff --git a/.github/workflows/scala-publish.yml b/.github/workflows/scala-publish.yml index dbd04ab26..ee48cc0cc 100644 --- a/.github/workflows/scala-publish.yml +++ b/.github/workflows/scala-publish.yml @@ -25,7 +25,7 @@ jobs: include: - spark-version: 3.5.8 java-version: 8 - - spark-version: 4.1.0 + - spark-version: 4.1.2 java-version: 17 runs-on: ubuntu-latest steps: diff --git a/build.sbt b/build.sbt index c788f2c6c..89b8ce90c 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,7 @@ import laika.config.SyntaxHighlighting import laika.format.Markdown.GitHubFlavor import org.typelevel.scalacoptions.ScalacOptions -lazy val sparkVer = sys.props.getOrElse("spark.version", "3.5.7") +lazy val sparkVer = sys.props.getOrElse("spark.version", "3.5.8") lazy val sparkMajorVer = sparkVer.substring(0, 1) lazy val sparkBranch = sparkVer.substring(0, 3) lazy val scalaVersions = sparkMajorVer match { diff --git a/core/src/main/scala/org/graphframes/GraphFrame.scala b/core/src/main/scala/org/graphframes/GraphFrame.scala index aa8eed9ef..029ec0948 100644 --- a/core/src/main/scala/org/graphframes/GraphFrame.scala +++ b/core/src/main/scala/org/graphframes/GraphFrame.scala @@ -964,6 +964,15 @@ class GraphFrame private ( */ def maximalIndependentSet: MaximalIndependentSet = new MaximalIndependentSet(this) + /** + * Run an approximate neighbor function backed by the HLL-sketches. + * + * See [[org.graphframes.lib.HyperANF]] for more details. + * + * @group stdlib + */ + def hyperANF: HyperANF = new HyperANF(this) + // ========= Graph Machine Learning ========== /** diff --git a/core/src/main/scala/org/graphframes/lib/HyperANF.scala b/core/src/main/scala/org/graphframes/lib/HyperANF.scala new file mode 100644 index 000000000..8efaf483f --- /dev/null +++ b/core/src/main/scala/org/graphframes/lib/HyperANF.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.graphframes.lib + +import org.apache.spark.sql.Column +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.functions.hll_sketch_agg +import org.apache.spark.sql.functions.hll_union_agg +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.functions.udf +import org.graphframes.GraphFrame +import org.graphframes.Logging +import org.graphframes.WithIntermediateStorageLevel +import org.graphframes.WithLgNomEntries + +/** + * HyperANF-style approximation of the neighbourhood function on top of GraphFrames. + * + * This implementation is inspired by + * [[https://arxiv.org/pdf/1011.5599 Vigna, Paolo; Boldi, Marco; Rosa, Sebastiano. "HyperANF: Approximating the Neighbourhood Function of Very Large Graphs on a Budget." arXiv preprint arXiv:1011.5599 (2010)]]. + * + * The input graph is treated as directed: for each vertex, reachability is computed by following + * outgoing edges from `src` to `dst`. + * + * Compared with the cumulative neighbourhood-function presentation in the paper, this + * implementation returns one column per hop, `hop_0`, `hop_1`, `hop_2`, ..., `hop_N`. The `hop_0` + * column contains a HyperLogLog sketch of the source vertex itself, and each `hop_k` column for + * `k >= 1` contains a HyperLogLog sketch of the set of vertices reachable in exactly `k` hops. To + * derive the cumulative approximate neighbourhood function for distances up to some hop `k`, a + * user can combine `hop_0` through `hop_k` with `hll_union` and then apply `hll_sketch_estimate` + * to the merged sketch. + * + * The computation can also be restricted to a subgraph by supplying an edge filter expression via + * [[setEdgesFilterExpression]]. A common use case is to filter on `src`, for example + * `src IN (...)`, to obtain sketches only for a selected set of starting vertices. + * + * @param graph + * input graph whose directed edges are used for reachability expansion + */ +class HyperANF private[graphframes] (graph: GraphFrame) + extends Serializable + with Logging + with WithIntermediateStorageLevel + with WithLgNomEntries { + private var nHops: Int = 3 + private var edgesFilterExpression: Column = lit(true) + + /** + * Sets the edge filter expression used before running the computation. + * + * Only edges satisfying this predicate participate in the directed reachability expansion. This + * effectively runs the algorithm on the subgraph induced by the filtered edge set. + * + * A common use case is filtering on `src`, for example `src IN (...)`, to limit the result to a + * chosen set of starting vertices. + * + * @param value + * filter expression applied to `graph.edges` + * @return + * this HyperANF instance + */ + def setEdgesFilterExpression(value: Column): this.type = { + edgesFilterExpression = value + this + } + + /** + * Sets the maximum hop distance to compute. + * + * The result will contain `hop_0`, `hop_1`, `hop_2`, ..., `hop_N`, where `N` is the configured + * number of hops. + * + * @param value + * positive number of hops to compute + * @return + * this HyperANF instance + */ + def setNHops(value: Int): this.type = { + require(value > 0, "n-hops cannot be nagative or zero") + nHops = value + this + } + + /** + * Runs the HyperANF-style computation. + * + * The returned `DataFrame` has one row per source vertex present in the filtered edge set. It + * contains the vertex id column `id` and one sketch column per hop: `hop_0`, `hop_1`, `hop_2`, + * ..., `hop_N`. The `hop_0` column stores a HyperLogLog sketch containing `id` itself. Each + * `hop_k` column for `k >= 1` stores a HyperLogLog sketch for the set of vertices reachable + * from `id` in exactly `k` directed hops. + * + * To obtain an approximate cumulative neighbourhood size up to hop `k`, union `hop_0` through + * `hop_k` with `hll_union` and then apply `hll_sketch_estimate`. + * + * @return + * a `DataFrame` with exact-hop HyperLogLog sketches per source vertex + */ + def run(): DataFrame = { + val edges = + graph.edges + .filter(edgesFilterExpression) + .select(GraphFrame.SRC, GraphFrame.DST) + .persist(intermediateStorageLevel) + var hop = 1 + + val hop0func = udf(HyperANF.hll(lgNomEntries)) + var state = edges + .groupBy(col(GraphFrame.SRC).alias(GraphFrame.ID)) + .agg(hll_sketch_agg(GraphFrame.DST, lgNomEntries).alias("hop_1")) + .select(col(GraphFrame.ID), hop0func(col(GraphFrame.ID)).alias("hop_0"), col("hop_1")) + + while (hop < nHops) { + hop += 1 + + val n_state = edges + .join( + state.select(GraphFrame.ID, s"hop_${hop - 1}"), + col(GraphFrame.DST) === col(GraphFrame.ID), + "left") + .groupBy(col(GraphFrame.SRC).alias(GraphFrame.ID)) + .agg(hll_union_agg(s"hop_${hop - 1}").alias(s"hop_${hop}")) + + state = state.join(n_state, GraphFrame.ID) + } + + val result = state.persist(intermediateStorageLevel) + // materialize + val _ = result.count() + resultIsPersistent() + + edges.unpersist() + + result + } +} + +private object HyperANF extends Serializable { + def hll(lgNomEntries: Int): Any => Array[Byte] = (id) => { + val sketch = new org.apache.datasketches.hll.HllSketch(lgNomEntries) + sketch.update(id.toString()) + sketch.toCompactByteArray() + } +} diff --git a/core/src/main/scala/org/graphframes/lib/TriangleCount.scala b/core/src/main/scala/org/graphframes/lib/TriangleCount.scala index 766361886..7957454d3 100644 --- a/core/src/main/scala/org/graphframes/lib/TriangleCount.scala +++ b/core/src/main/scala/org/graphframes/lib/TriangleCount.scala @@ -24,6 +24,7 @@ import org.graphframes.GraphFrame import org.graphframes.GraphFramesSparkVersionException import org.graphframes.Logging import org.graphframes.WithIntermediateStorageLevel +import org.graphframes.WithLgNomEntries /** * Triangle count implementation. @@ -41,24 +42,14 @@ import org.graphframes.WithIntermediateStorageLevel class TriangleCount private[graphframes] (private val graph: GraphFrame) extends Arguments with Serializable - with WithIntermediateStorageLevel { + with WithIntermediateStorageLevel + with WithLgNomEntries { private var algorithm: String = "exact" private val supportedAlgorithms: Set[String] = Set("exact", "approx") - private var lgNomEntries: Int = 12 private def supportedAlgorithmsRepr: String = supportedAlgorithms.mkString(", ") - /** - * Sets the log2 of the nominal entries for the Theta sketch (only for "approx" algorithm). - * Default is 12 (4096 entries). - */ - def setLgNomEntries(value: Int): this.type = { - require((value >= 4) && (value <= 24), "lg_nom must be between 4 and 26, defaults to 12") - lgNomEntries = value - this - } - /** * Sets the triangle counting algorithm. Options are "exact" (default) or "approx". */ diff --git a/core/src/main/scala/org/graphframes/mixins.scala b/core/src/main/scala/org/graphframes/mixins.scala index c059855e8..811c3b9e8 100644 --- a/core/src/main/scala/org/graphframes/mixins.scala +++ b/core/src/main/scala/org/graphframes/mixins.scala @@ -211,3 +211,20 @@ private[graphframes] trait WithDirection { */ def getIsDirected: Boolean = isDirected } + +/** + * Helper for the sketch-based algorithms. + */ +private[graphframes] trait WithLgNomEntries { + protected var lgNomEntries: Int = 12 + + /** + * Sets the log2 of the nominal entries for the data sketch. Default is 12 (4096 entries). + */ + def setLgNomEntries(value: Int): this.type = { + require((value >= 4) && (value <= 24), "lg_nom must be between 4 and 24, defaults to 12") + lgNomEntries = value + this + } + +} diff --git a/core/src/test/scala/org/graphframes/lib/HyperANFSuite.scala b/core/src/test/scala/org/graphframes/lib/HyperANFSuite.scala new file mode 100644 index 000000000..553e632de --- /dev/null +++ b/core/src/test/scala/org/graphframes/lib/HyperANFSuite.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.graphframes.lib + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.functions.expr +import org.apache.spark.sql.functions.hll_sketch_estimate +import org.apache.spark.sql.types.DataTypes +import org.graphframes.GraphFrame +import org.graphframes.GraphFrameTestSparkContext +import org.graphframes.SparkFunSuite +import org.graphframes.TestUtils + +class HyperANFSuite extends SparkFunSuite with GraphFrameTestSparkContext { + + private def diamondCycleGraph: GraphFrame = { + val vertices = + spark.createDataFrame((1L to 5L).map(id => (id, s"v$id"))).toDF("id", "name") + val edges = spark + .createDataFrame(Seq((1L, 2L), (1L, 3L), (2L, 4L), (3L, 4L), (4L, 5L), (5L, 1L))) + .toDF("src", "dst") + GraphFrame(vertices, edges) + } + + private def estimateHopCounts(result: DataFrame, nHops: Int): Map[Long, Seq[Long]] = { + val estimateColumns = (0 to nHops).map { hop => + hll_sketch_estimate(col(s"hop_$hop")).alias(s"hop_${hop}_estimate") + } + + result + .select((Seq(col("id")) ++ estimateColumns): _*) + .collect() + .map { row => + row.getAs[Long]("id") -> (0 to nHops).map { hop => + row.getAs[Long](s"hop_${hop}_estimate") + } + } + .toMap + } + + test("HyperANF returns exact 0-hop through 2-hop reachable cardinalities") { + val graph = diamondCycleGraph + val result = new HyperANF(graph) + .setNHops(2) + .setLgNomEntries(12) + .run() + + TestUtils.checkColumnType(result.schema, "hop_0", DataTypes.BinaryType) + TestUtils.checkColumnType(result.schema, "hop_1", DataTypes.BinaryType) + TestUtils.checkColumnType(result.schema, "hop_2", DataTypes.BinaryType) + + val expected = Map( + 1L -> Seq(1L, 2L, 1L), + 2L -> Seq(1L, 1L, 1L), + 3L -> Seq(1L, 1L, 1L), + 4L -> Seq(1L, 1L, 1L), + 5L -> Seq(1L, 1L, 2L)) + + assert(estimateHopCounts(result, 2) === expected) + result.unpersist() + } + + test("HyperANF returns exact 0-hop through 3-hop reachable cardinalities") { + val graph = diamondCycleGraph + val result = new HyperANF(graph) + .setNHops(3) + .setLgNomEntries(12) + .run() + + TestUtils.checkColumnType(result.schema, "hop_0", DataTypes.BinaryType) + TestUtils.checkColumnType(result.schema, "hop_1", DataTypes.BinaryType) + TestUtils.checkColumnType(result.schema, "hop_2", DataTypes.BinaryType) + TestUtils.checkColumnType(result.schema, "hop_3", DataTypes.BinaryType) + + val expected = Map( + 1L -> Seq(1L, 2L, 1L, 1L), + 2L -> Seq(1L, 1L, 1L, 1L), + 3L -> Seq(1L, 1L, 1L, 1L), + 4L -> Seq(1L, 1L, 1L, 2L), + 5L -> Seq(1L, 1L, 2L, 1L)) + + assert(estimateHopCounts(result, 3) === expected) + result.unpersist() + } + + test( + "HyperANF starting vertices expression limits output to selected vertices with outgoing edges") { + val graph = diamondCycleGraph + val result = new HyperANF(graph) + .setEdgesFilterExpression(expr("src IN (1, 3, 42)")) + .setNHops(2) + .setLgNomEntries(12) + .run() + + val ids = result.select("id").collect().map(_.getAs[Long]("id")).toSet + + assert(ids === Set(1L, 3L)) + result.unpersist() + } +}