diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 000000000..0af62d0c3 --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,85 @@ +# GraphFrames Benchmarks + +JMH-based benchmarks for GraphFrames algorithms using LDBC Graphalytics datasets. + +## Available Benchmarks + +| Benchmark | Description | +| ------------------------------ | ----------------------------------------- | +| `ShortestPathsBenchmark` | Shortest paths from a source vertex | +| `ConnectedComponentsBenchmark` | Connected components detection | +| `LabelPropagationBenchmark` | Community detection via label propagation | + +## Running Benchmarks + +### Basic Usage + +```bash +sbt "benchmarks/jmh:run -i 3 -wi 1 -f 1 -p graphName=wiki-Talk org.graphframes.benchmarks.ShortestPathsBenchmark" +``` + +### Parameters + +| Parameter | Values | Description | +| --------------------- | ------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------- | +| `graphName` | See Available Graphs | LDBC graph dataset to use | +| `algorithm` | `graphframes`, `graphx` | Algorithm implementation | +| `maxIter` | integer (default: 10) | Max iterations (iterative algorithms only, e.g. `LabelPropagationBenchmark`) | +| `useLocalCheckpoints` | `true`, `false` (default: `true`) | Use local checkpoints instead of regular checkpoints; faster but less reliable (not applicable to `graphx` algorithm) | +| `broadcastThreshold` | integer (default: 1000000, or `-1` for AQE) | Max vertex degree for join-based propagation; above this threshold a broadcast is used (`ConnectedComponentsBenchmark` only) | +| `checkPointInterval` | integer (default: 1) | Number of iterations between checkpoints (`ShortestPathsBenchmark` only, `graphframes` algorithm) | +| `startingVertex` | long integer (default: 1) | Source vertex ID for shortest paths computation (`ShortestPathsBenchmark` only) | + +### Examples + +Run all algorithms on wiki-Talk: + +```bash +sbt "benchmarks/jmh:run -p graphName=wiki-Talk org.graphframes.benchmarks.ShortestPathsBenchmark" +``` + +Run only GraphX implementation: + +```bash +sbt "benchmarks/jmh:run -p algorithm=graphx -p graphName=cit-Patents org.graphframes.benchmarks.ConnectedComponentsBenchmark" +``` + +Run with custom iteration count: + +```bash +sbt "benchmarks/jmh:run -p maxIter=5 -p graphName=wiki-Talk org.graphframes.benchmarks.LabelPropagationBenchmark" +``` + +## Available Graphs + +| Graph | Vertices | Edges | Weighted | +| ------------- | -------- | ------ | -------- | +| `wiki-Talk` | ~2.4M | ~5.0M | No | +| `cit-Patents` | ~3.8M | ~16.5M | No | +| `kgs` | ~800K | ~23.5M | Yes | +| `graph500-22` | ~4.2M | ~67M | No | +| `graph500-23` | ~8.4M | ~134M | No | +| `graph500-24` | ~16.8M | ~268M | No | +| `graph500-25` | ~33.6M | ~537M | No | + +Test graphs (small, for validation): + +- `test-bfs-directed`, `test-bfs-undirected` +- `test-cdlp-directed`, `test-cdlp-undirected` +- `test-pr-directed`, `test-pr-undirected` +- `test-wcc-directed`, `test-wcc-undirected` + +## Data Format + +Benchmarks use LDBC Graphalytics parquet format: + +- Vertices: `{graphName}-v.parquet` with column `id` +- Edges: `{graphName}-e.parquet` with columns `src`, `dst` (and `weight` for weighted graphs) + +Data is downloaded on first use and cached in `target/ldbc-cache/`. + +## Notes + +- Benchmarks require ~10GB heap memory (configured automatically) +- Data files are cached locally to avoid repeated downloads +- The old `LDBCBenchmarkSuite` is deprecated; use the new split benchmarks diff --git a/benchmarks/src/main/scala/org/graphframes/benchmarks/ConnectedComponentsBenchmark.scala b/benchmarks/src/main/scala/org/graphframes/benchmarks/ConnectedComponentsBenchmark.scala new file mode 100644 index 000000000..9d1946b9b --- /dev/null +++ b/benchmarks/src/main/scala/org/graphframes/benchmarks/ConnectedComponentsBenchmark.scala @@ -0,0 +1,47 @@ +package org.graphframes.benchmarks + +import org.openjdk.jmh.annotations.* +import org.openjdk.jmh.infra.Blackhole + +import java.util.concurrent.TimeUnit + +@State(Scope.Benchmark) +@Warmup(iterations = 1) +@Measurement(iterations = 3) +@BenchmarkMode(Array(Mode.AverageTime)) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork( + value = 1, + jvmArgs = Array( + "-Xmx10g", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED")) +class ConnectedComponentsBenchmark extends LDBCBenchmarkBase { + @Param(Array("graphframes", "graphx")) + var algorithm: String = _ + + @Param(Array("1000000", "-1")) + var broadcastThreshold: String = _ + + @Benchmark + def benchmarkConnectedComponents(blackhole: Blackhole): Unit = { + val ccResults = if (algorithm == "graphx") { + graph.connectedComponents + .setAlgorithm("graphx") + .run() + } else { + graph.connectedComponents + .setUseLocalCheckpoints(true) + .setAlgorithm("graphframes") + .setBroadcastThreshold(broadcastThreshold.toInt) + .setUseLocalCheckpoints(useLocalCheckpoints.toBoolean) + .run() + } + + val res: Unit = ccResults.write.format("noop").mode("overwrite").save() + blackhole.consume(res) + } +} diff --git a/benchmarks/src/main/scala/org/graphframes/benchmarks/LDBCBenchmarkBase.scala b/benchmarks/src/main/scala/org/graphframes/benchmarks/LDBCBenchmarkBase.scala new file mode 100644 index 000000000..ad8cc5fcb --- /dev/null +++ b/benchmarks/src/main/scala/org/graphframes/benchmarks/LDBCBenchmarkBase.scala @@ -0,0 +1,76 @@ +package org.graphframes.benchmarks + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.storage.StorageLevel +import org.graphframes.GraphFrame +import org.openjdk.jmh.annotations.* + +import java.io.File +import java.nio.file.Path + +trait LDBCBenchmarkBase { + @Param(Array("wiki-Talk")) + var graphName: String = _ + + @Param(Array("true")) + var useLocalCheckpoints: String = _ + + var spark: SparkSession = _ + var graph: GraphFrame = _ + + protected def cacheDir: Path = Path.of(new File("target").toURI).resolve("ldbc-cache") + + @Setup(Level.Trial) + def setup(): Unit = { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .setAppName("GraphFramesBenchmarks") + .set("spark.sql.shuffle.partitions", s"${Runtime.getRuntime.availableProcessors() * 2}") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + + spark = SparkSession.builder().config(sparkConf).getOrCreate() + spark.sparkContext.setLogLevel("ERROR") + spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints") + + loadGraph(graphName) + } + + @TearDown(Level.Trial) + def tearDown(): Unit = { + if (spark != null) { + spark.stop() + } + } + + def loadGraph(name: String): Unit = { + val loader = new ParquetDataLoader(cacheDir) + val graphDir = cacheDir.resolve(name) + + loader.downloadParquetIfNotExists(name) + + val isWeighted = ParquetDataLoader.weightedGraphs.contains(name) + + val edges = if (isWeighted) { + spark.read + .parquet(graphDir.resolve(s"${name}-e.parquet").toString) + .withColumnRenamed("source", "src") + .withColumnRenamed("target", "dst") + .persist(StorageLevel.MEMORY_AND_DISK_SER) + } else { + spark.read + .parquet(graphDir.resolve(s"${name}-e.parquet").toString) + .withColumnRenamed("source", "src") + .withColumnRenamed("target", "dst") + .persist(StorageLevel.MEMORY_AND_DISK_SER) + } + println(s"Read edges: ${edges.count()}") + + val vertices = spark.read + .parquet(graphDir.resolve(s"${name}-v.parquet").toString) + .persist(StorageLevel.MEMORY_AND_DISK_SER) + println(s"Read vertices: ${vertices.count()}") + + graph = GraphFrame(vertices, edges) + } +} diff --git a/benchmarks/src/main/scala/org/graphframes/benchmarks/LDBCBenchmarkSuite.scala b/benchmarks/src/main/scala/org/graphframes/benchmarks/LDBCBenchmarkSuite.scala index f35848d17..d06ca84e9 100644 --- a/benchmarks/src/main/scala/org/graphframes/benchmarks/LDBCBenchmarkSuite.scala +++ b/benchmarks/src/main/scala/org/graphframes/benchmarks/LDBCBenchmarkSuite.scala @@ -17,6 +17,13 @@ import java.nio.file.Path import java.util.Properties import java.util.concurrent.TimeUnit +/** + * @deprecated + * Use ShortestPathsBenchmark, ConnectedComponentsBenchmark, or LabelPropagationBenchmark + * instead. These new benchmarks support CLI parameters for algorithm selection and graph + * choice. See benchmarks/README.md for usage instructions. + */ +@deprecated("use the new benchmarks set", since = "0.11") @State(Scope.Benchmark) @Warmup(iterations = 1) @Measurement(iterations = 3) diff --git a/benchmarks/src/main/scala/org/graphframes/benchmarks/LabelPropagationBenchmark.scala b/benchmarks/src/main/scala/org/graphframes/benchmarks/LabelPropagationBenchmark.scala new file mode 100644 index 000000000..3e5b6acb2 --- /dev/null +++ b/benchmarks/src/main/scala/org/graphframes/benchmarks/LabelPropagationBenchmark.scala @@ -0,0 +1,47 @@ +package org.graphframes.benchmarks + +import org.openjdk.jmh.annotations.* +import org.openjdk.jmh.infra.Blackhole + +import java.util.concurrent.TimeUnit + +@State(Scope.Benchmark) +@Warmup(iterations = 1) +@Measurement(iterations = 3) +@BenchmarkMode(Array(Mode.AverageTime)) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork( + value = 1, + jvmArgs = Array( + "-Xmx10g", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED")) +class LabelPropagationBenchmark extends LDBCBenchmarkBase { + @Param(Array("graphframes", "graphx")) + var algorithm: String = _ + + @Param(Array("10")) + var maxIter: String = _ + + @Benchmark + def benchmarkLabelPropagation(blackhole: Blackhole): Unit = { + val cdlpResults = if (algorithm == "graphx") { + graph.labelPropagation + .setAlgorithm("graphx") + .maxIter(maxIter.toInt) + .run() + } else { + graph.labelPropagation + .setAlgorithm("graphframes") + .maxIter(maxIter.toInt) + .setUseLocalCheckpoints(useLocalCheckpoints.toBoolean) + .run() + } + + val res: Unit = cdlpResults.write.format("noop").mode("overwrite").save() + blackhole.consume(res) + } +} diff --git a/benchmarks/src/main/scala/org/graphframes/benchmarks/ParquetDataLoader.scala b/benchmarks/src/main/scala/org/graphframes/benchmarks/ParquetDataLoader.scala new file mode 100644 index 000000000..04d4f0a99 --- /dev/null +++ b/benchmarks/src/main/scala/org/graphframes/benchmarks/ParquetDataLoader.scala @@ -0,0 +1,80 @@ +package org.graphframes.benchmarks + +import java.net.URL +import java.nio.file.Files +import java.nio.file.Path + +class ParquetDataLoader(cacheDir: Path) { + private val LDBC_PARQUET_URL_PREFIX = "https://datasets.ldbcouncil.org/graphalytics-parquet/" + private val bufferSize = 8192 + + def downloadParquetIfNotExists(graphName: String): Unit = { + val graphDir = cacheDir.resolve(graphName) + + if (Files.notExists(graphDir)) { + Files.createDirectories(graphDir) + } + + val vertexFile = graphDir.resolve(s"${graphName}-v.parquet") + val edgeFile = graphDir.resolve(s"${graphName}-e.parquet") + + if (Files.notExists(vertexFile)) { + println(s"Downloading vertex file for $graphName...") + downloadFile(s"${LDBC_PARQUET_URL_PREFIX}${graphName}-v.parquet", vertexFile) + } else { + println(s"Vertex file for $graphName already exists, skipping download") + } + + if (Files.notExists(edgeFile)) { + println(s"Downloading edge file for $graphName...") + downloadFile(s"${LDBC_PARQUET_URL_PREFIX}${graphName}-e.parquet", edgeFile) + } else { + println(s"Edge file for $graphName already exists, skipping download") + } + } + + private def downloadFile(url: String, dest: Path): Unit = { + val connection = new URL(url).openConnection() + connection.setConnectTimeout(30000) + connection.setReadTimeout(30000) + val inputStream = connection.getInputStream + val outputStream = Files.newOutputStream(dest) + val buffer = new Array[Byte](bufferSize) + var bytesRead = 0 + try { + while ({ bytesRead = inputStream.read(buffer); bytesRead } != -1) { + outputStream.write(buffer, 0, bytesRead) + } + } finally { + inputStream.close() + outputStream.close() + } + println(s"Downloaded $url to $dest") + } +} + +object ParquetDataLoader { + val weightedGraphs: Set[String] = Set("kgs") + + val availableGraphs: Set[String] = Set( + "test-bfs-directed", + "test-bfs-undirected", + "test-cdlp-directed", + "test-cdlp-undirected", + "test-pr-directed", + "test-pr-undirected", + "test-wcc-directed", + "test-wcc-undirected", + "kgs", + "graph500-22", + "graph500-23", + "graph500-24", + "graph500-25", + "graph500-26", + "graph500-27", + "graph500-28", + "graph500-29", + "graph500-30", + "cit-Patents", + "wiki-Talk") +} diff --git a/benchmarks/src/main/scala/org/graphframes/benchmarks/ShortestPathsBenchmark.scala b/benchmarks/src/main/scala/org/graphframes/benchmarks/ShortestPathsBenchmark.scala new file mode 100644 index 000000000..b6ab1c572 --- /dev/null +++ b/benchmarks/src/main/scala/org/graphframes/benchmarks/ShortestPathsBenchmark.scala @@ -0,0 +1,53 @@ +package org.graphframes.benchmarks + +import org.openjdk.jmh.annotations.* +import org.openjdk.jmh.infra.Blackhole + +import java.util.concurrent.TimeUnit + +@State(Scope.Benchmark) +@Warmup(iterations = 1) +@Measurement(iterations = 3) +@BenchmarkMode(Array(Mode.AverageTime)) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork( + value = 1, + jvmArgs = Array( + "-Xmx10g", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED")) +class ShortestPathsBenchmark extends LDBCBenchmarkBase { + @Param(Array("graphframes", "graphx")) + var algorithm: String = _ + + @Param(Array("1")) + var checkPointInterval: String = _ + + @Param(Array("1")) + var startingVertex: String = _ + + @Benchmark + def benchmarkShortestPaths(blackhole: Blackhole): Unit = { + val sourceVertex = startingVertex.toLong + + val spResults = if (algorithm == "graphx") { + graph.shortestPaths + .setAlgorithm("graphx") + .landmarks(Seq(sourceVertex)) + .run() + } else { + graph.shortestPaths + .setAlgorithm("graphframes") + .landmarks(Seq(sourceVertex)) + .setCheckpointInterval(checkPointInterval.toInt) + .setUseLocalCheckpoints(useLocalCheckpoints.toBoolean) + .run() + } + + val res: Unit = spResults.write.format("noop").mode("overwrite").save() + blackhole.consume(res) + } +}