Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 11 additions & 29 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
# This file is inspired by:
# https://github.com/actions/starter-workflows/blob/main/pages/jekyll.yml
# MIT License
#
# Copyright (c) 2020 GitHub

name: Deploy Docs

on:
push:
branches:
- master
release:
types: [published]
workflow_dispatch:

permissions:
Expand All @@ -28,22 +21,11 @@ jobs:
- name: Checkout
uses: actions/checkout@v4

- name: Setup Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: '3.1'
bundler-cache: true
cache-version: 0

- name: Setup Java
uses: actions/setup-java@v4
with:
distribution: "zulu"
java-version: "11"

- name: Setup Jekyll
run: |
gem install jekyll jekyll-redirect-from bundler
java-version: "17"

- name: Setup Python
uses: actions/setup-python@v4
Expand Down Expand Up @@ -71,18 +53,18 @@ jobs:
id: pages
uses: actions/configure-pages@v5

- name: Build with Jekyll
working-directory: ./docs
run: jekyll build --baseurl "${{ steps.pages.outputs.base_path }}"
env:
SKIP_SCALADOC: 0
SKIP_PYTHONDOC: 0
PRODUCTION: 1
- name: Run benchmarks
run: |
./build/sbt -Dspark.version=4.0.0 "benchmarks/Jmh/run -rf json"

- name: Build with Laika
run: |
./build/sbt -Dspark.version=4.0.0 -Ddocs.mode=production "docs/laikaHTML"

- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
path: ./docs/_site
path: ./docs/target/docs/site

deploy:
environment:
Expand Down
12 changes: 10 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
build/*.jar

docs/_site
docs/api
docs/src/api/python
docs/src/api/scaladoc

# sbt specific
.cache/
Expand Down Expand Up @@ -64,4 +65,11 @@ graphframes-connect-databricks/*
workspace

# JMH (jmh-result.json, jmh-result.csv, etc.)
jmh-result.*
jmh-result.*

# IntelliJ IDEA project files
connect/project

# Auto-generated docs
/docs/src/02-quick-start/01-installation.md
/docs/src/05-blog/feed.xml
Empty file removed .nojekyll
Empty file.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ repos:

- id: scalafmt
name: scalafmt
entry: build/sbt scalafmtAll
entry: build/sbt scalafmtCheckAll
language: system
types: [scala]
pass_filenames: false

- id: scalafix
name: scalafix
entry: build/sbt scalafixAll
entry: build/sbt "scalafixAll --check"
language: system
types: [scala]
pass_filenames: false
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
package org.graphframes.benchmarks

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel
import org.graphframes.GraphFrame
import org.graphframes.examples.LDBCUtils
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.BenchmarkMode
import org.openjdk.jmh.annotations.Fork
import org.openjdk.jmh.annotations.Measurement
import org.openjdk.jmh.annotations.Mode
import org.openjdk.jmh.annotations.OutputTimeUnit
import org.openjdk.jmh.annotations.Warmup
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole

import java.io.File
Expand All @@ -22,117 +17,132 @@ import java.nio.file.Path
import java.util.Properties
import java.util.concurrent.TimeUnit

@State(Scope.Benchmark)
@Warmup(iterations = 1)
@Measurement(iterations = 3)
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(
value = 1,
jvmArgs = Array(
"-Xmx10g",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
"--add-opens=java.base/java.util=ALL-UNNAMED"))
class LDBCBenchmarkSuite {
/**
 * Returns the Spark session used by the benchmarks.
 *
 * The currently active session is reused when there is one; otherwise a local session is built
 * with one shuffle partition per available processor and its log level is set to ERROR.
 */
private def spark: SparkSession =
  SparkSession.getActiveSession.getOrElse {
    val builder = SparkSession
      .builder()
      .master("local[*]")
      .appName("GraphFramesBenchmarks")
      .config("spark.sql.shuffle.partitions", s"${Runtime.getRuntime.availableProcessors()}")
      .config("spark.driver.memory", "8g")
      .config("spark.executor.memory", "8g")

    val created = builder.getOrCreate()
    // Keep benchmark output readable: only errors are logged.
    created.sparkContext.setLogLevel("ERROR")
    created
  }

// Directory the benchmark datasets are resolved against: "target" under the working
// directory, converted to a Path through its file URI.
private def resourcesPath = {
  val targetDir = new File("target")
  Path.of(targetDir.toURI())
}
// Sentinel distance substituted for missing (null) distances: the largest Long value.
private def unreachableID = Long.MaxValue
// Name of the LDBC dataset that the benchmarks in this suite run against.
val benchmarkGraphName: String = LDBCUtils.WIKI_TALKS
// Graph under test; assigned in setup() from the persisted vertex and edge DataFrames.
var graph: GraphFrame = _
// Dataset properties loaded in setup(); benchmarks read the BFS source vertex from them.
var props: Properties = _

@Benchmark
@Fork(value = 1)
@Warmup(iterations = 1)
@Measurement(iterations = 2)
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.SECONDS)
def benchmarkSP(blackhole: Blackhole): Unit = {
LDBCUtils.downloadLDBCIfNotExists(resourcesPath, LDBCUtils.GRAPH500_22)
val caseRoot = resourcesPath.resolve(LDBCUtils.GRAPH500_22)

val expectedResults = spark.read
.format("csv")
.option("header", "false")
.schema(StructType(Seq(StructField("id", LongType), StructField("distance", LongType))))
.csv(caseRoot.resolve(s"${LDBCUtils.GRAPH500_22}-BFS").toString)
@Setup(Level.Trial)
def setup(): Unit = {
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("GraphFramesBenchmarks")
.set("spark.sql.shuffle.partitions", s"${Runtime.getRuntime.availableProcessors() * 2}")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val props = new Properties()
val stream = Files.newInputStream(caseRoot.resolve(s"${LDBCUtils.GRAPH500_22}.properties"))
props.load(stream)
stream.close()
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val context = spark.sparkContext
context.setLogLevel("ERROR")
context.setCheckpointDir("/tmp/graphframes-checkpoints")

val sourceVertex =
props.getProperty(s"graph.${LDBCUtils.GRAPH500_22}.bfs.source-vertex").toLong
LDBCUtils.downloadLDBCIfNotExists(resourcesPath, benchmarkGraphName)

val edges = spark.read
.format("csv")
.option("header", "false")
.option("delimiter", " ")
.schema(StructType(Seq(StructField("src", LongType), StructField("dst", LongType))))
.load(caseRoot.resolve(s"${LDBCUtils.GRAPH500_22}.e").toString)
.load(caseRoot.resolve(s"${benchmarkGraphName}.e").toString)
.persist(StorageLevel.MEMORY_AND_DISK_SER)
println()
println(s"Read edges: ${edges.count()}")

val vertices = spark.read
.format("csv")
.option("header", "false")
.schema(StructType(Seq(StructField("id", LongType))))
.load(caseRoot.resolve(s"${LDBCUtils.GRAPH500_22}.v").toString)
val graph = GraphFrame(vertices, edges)
.load(caseRoot.resolve(s"${benchmarkGraphName}.v").toString)
.persist(StorageLevel.MEMORY_AND_DISK_SER)
println(s"Read vertices: ${vertices.count()}")

graph = GraphFrame(vertices, edges)
props = new Properties()
val stream = Files.newInputStream(caseRoot.resolve(s"${benchmarkGraphName}.properties"))
props.load(stream)
stream.close()
}

// Directory of the selected benchmark dataset: <resourcesPath>/<benchmarkGraphName>.
private def caseRoot: Path = {
  val datasetsRoot = resourcesPath
  datasetsRoot.resolve(benchmarkGraphName)
}

// Directory the benchmark datasets are resolved against: "target" under the working
// directory, converted to a Path through its file URI.
private def resourcesPath = {
  val targetDir = new File("target")
  Path.of(targetDir.toURI)
}

@Benchmark
def benchmarkSPlocalCheckpoints(blackhole: Blackhole): Unit = {
val sourceVertex =
props.getProperty(s"graph.${benchmarkGraphName}.bfs.source-vertex").toLong

val spResults = graph.shortestPaths
.setUseLocalCheckpoints(true)
.landmarks(Seq(sourceVertex))
.setCheckpointInterval(1)
.setAlgorithm("graphframes")
.run()
.select(
col(GraphFrame.ID),
col("distances").getItem(sourceVertex).cast(LongType).alias("got_distance"))
.na
.fill(Map("got_distance" -> unreachableID))

val cntOfMismatches = spResults
.join(expectedResults, Seq("id"))
.filter(col("got_distance") =!= col("distance"))
.count()
blackhole.consume(assert(cntOfMismatches == 0))

val res: Unit = spResults.write.format("noop").mode("overwrite").save()
blackhole.consume(res)
}

@Benchmark
@Fork(value = 1)
@Warmup(iterations = 1)
@Measurement(iterations = 2)
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.SECONDS)
def benchmarkCC(blackhole: Blackhole): Unit = {
LDBCUtils.downloadLDBCIfNotExists(resourcesPath, LDBCUtils.GRAPH500_22)
val caseRoot = resourcesPath.resolve(LDBCUtils.GRAPH500_22)
def benchmarkSP(blackhole: Blackhole): Unit = {
val sourceVertex =
props.getProperty(s"graph.${benchmarkGraphName}.bfs.source-vertex").toLong

val expectedResults = spark.read
.format("csv")
.option("header", "false")
.schema(StructType(Seq(StructField("id", LongType), StructField("wcomp", LongType))))
.csv(caseRoot.resolve(s"${LDBCUtils.GRAPH500_22}-WCC").toString)
val spResults = graph.shortestPaths
.setAlgorithm("graphframes")
.landmarks(Seq(sourceVertex))
.run()

val edges = spark.read
.format("csv")
.option("header", "false")
.schema(StructType(Seq(StructField("src", LongType), StructField("dst", LongType))))
.load(caseRoot.resolve(s"${LDBCUtils.GRAPH500_22}.e").toString)
val vertices = spark.read
.format("csv")
.option("header", "false")
.schema(StructType(Seq(StructField("id", LongType))))
.load(caseRoot.resolve(s"${LDBCUtils.GRAPH500_22}.v").toString)
val graph = GraphFrame(vertices, edges)
val res: Unit = spResults.write.format("noop").mode("overwrite").save()
blackhole.consume(res)
}

/**
 * Benchmarks shortest paths to a single landmark with the algorithm set to "graphx".
 *
 * The landmark is the BFS source vertex read from the dataset properties. The result is written
 * with the "noop" format (presumably so the full computation runs without persisting output).
 *
 * @param blackhole
 *   JMH sink that receives the (Unit) outcome of the write
 */
@Benchmark
def benchmarkSPGraphX(blackhole: Blackhole): Unit = {
  val landmark =
    props.getProperty(s"graph.${benchmarkGraphName}.bfs.source-vertex").toLong

  val shortestPaths = graph.shortestPaths
    .setAlgorithm("graphx")
    .landmarks(Seq(landmark))
    .run()

  val written: Unit = shortestPaths.write
    .format("noop")
    .mode("overwrite")
    .save()
  blackhole.consume(written)
}

@Benchmark
def benchmarkCC(blackhole: Blackhole): Unit = {
val ccResults =
graph.connectedComponents.setUseLocalCheckpoints(true).setAlgorithm("graphframes").run()
Comment thread
SemyonSinchenko marked this conversation as resolved.
val cntOfMismatches = ccResults
.join(expectedResults, Seq("id"))
.filter(col("wcomp") =!= col("component"))
.count()
blackhole.consume(assert(cntOfMismatches == 0))
val res: Unit = ccResults.write.format("noop").mode("overwrite").save()
blackhole.consume(res)
}

/**
 * Benchmarks connected components with the algorithm set to "graphx".
 *
 * The result is written with the "noop" format (presumably so the full computation runs without
 * persisting output).
 *
 * @param blackhole
 *   JMH sink that receives the (Unit) outcome of the write
 */
@Benchmark
def benchmarkCCGraphX(blackhole: Blackhole): Unit = {
  val components = graph.connectedComponents
    .setAlgorithm("graphx")
    .run()

  val written: Unit = components.write
    .format("noop")
    .mode("overwrite")
    .save()
  blackhole.consume(written)
}

/**
 * Benchmarks label propagation with the algorithm set to "graphframes", 10 iterations and local
 * checkpoints enabled.
 *
 * The result is written with the "noop" format (presumably so the full computation runs without
 * persisting output).
 *
 * @param blackhole
 *   JMH sink that receives the (Unit) outcome of the write
 */
@Benchmark
def benchmarkCDLP(blackhole: Blackhole): Unit = {
  // Builder calls are kept in their original order.
  val configured = graph.labelPropagation
    .setAlgorithm("graphframes")
    .maxIter(10)
    .setUseLocalCheckpoints(true)
  val labels = configured.run()

  val written: Unit = labels.write.format("noop").mode("overwrite").save()
  blackhole.consume(written)
}
}
Loading