Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
uses: actions/setup-java@v4
with:
distribution: "zulu"
java-version: "11"
java-version: "17"

- name: Setup Jekyll
run: |
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ jobs:
fail-fast: false
matrix:
include:
- spark-version: 3.5.4
scala-version: 2.12.18
- spark-version: 4.0.0
scala-version: 2.13.16
python-version: 3.10.6
runs-on: ubuntu-22.04
runs-on: ubuntu-latest
env:
# define Java options for both official sbt and sbt-extras
JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
Expand All @@ -21,7 +21,7 @@ jobs:
- uses: actions/setup-java@v4
with:
distribution: "zulu"
java-version: "11"
java-version: "17"
- uses: actions/cache@v4
with:
path: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/python-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
if: startsWith(github.ref, 'refs/tags/')
uses: actions/setup-java@v3
with:
java-version: '11'
java-version: '17'
distribution: 'zulu'

- name: Set up Python
Expand Down
15 changes: 3 additions & 12 deletions .github/workflows/scala-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,10 @@ jobs:
fail-fast: false
matrix:
include:
- spark-version: 3.5.4
scala-version: 2.13.12
java-version: 11
- spark-version: 3.5.4
scala-version: 2.13.12
- spark-version: 4.0.0
scala-version: 2.13.16
java-version: 17
- spark-version: 3.5.4
scala-version: 2.12.18
java-version: 11
- spark-version: 3.5.4
scala-version: 2.12.18
java-version: 17
runs-on: ubuntu-22.04
runs-on: ubuntu-latest
env:
# fixing this error after tests success: sbt.ForkMain failed with exit code 134
# https://stackoverflow.com/questions/33287424/strange-exception-in-sbt-test
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ tmp/*

# db-connect targets
graphframes-connect-databricks/*

# JDTLS
workspace
6 changes: 3 additions & 3 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ managed:

plugins:
# Python API
- remote: buf.build/grpc/python:v1.64.2
- remote: buf.build/grpc/python:v1.73.0
out: python/graphframes/connect/proto
- remote: buf.build/protocolbuffers/python:v27.1
- remote: buf.build/protocolbuffers/python:v29.3
out: python/graphframes/connect/proto
- remote: buf.build/protocolbuffers/pyi
out: python/graphframes/connect/proto
out: python/graphframes/connect/proto
15 changes: 6 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import xerial.sbt.Sonatype.sonatypeCentralHost

lazy val sparkVer = sys.props.getOrElse("spark.version", "3.5.5")
lazy val sparkVer = sys.props.getOrElse("spark.version", "4.0.0")
lazy val sparkBranch = sparkVer.substring(0, 3)
lazy val defaultScalaVer = sparkBranch match {
case "3.5" => "2.12.18"
case "3.4" => "2.12.17"
case "4.0" => "2.13.16"
case _ => throw new IllegalArgumentException(s"Unsupported Spark version: $sparkVer.")
}
lazy val scalaVer = sys.props.getOrElse("scala.version", defaultScalaVer)
lazy val defaultScalaTestVer = scalaVer match {
case s if s.startsWith("2.12") || s.startsWith("2.13") => "3.0.8"
}
lazy val defaultScalaTestVer = "3.0.8"

// Some vendors are using an own shading rule for protobuf
lazy val protobufShadingPattern = sys.props.getOrElse("vendor.name", "oss") match {
case "oss" => "org.sparkproject.connect.protobuf.@1"
Expand Down Expand Up @@ -41,11 +39,10 @@ ThisBuild / developers := List(
ThisBuild / sonatypeCredentialHost := "s01.oss.sonatype.org"
ThisBuild / sonatypeRepository := "https://s01.oss.sonatype.org/service/local"
ThisBuild / sonatypeProfileName := "io.graphframes"
ThisBuild / crossScalaVersions := Seq("2.12.18", "2.13.12")

// Scalafix configuration
ThisBuild / semanticdbEnabled := true
ThisBuild / semanticdbVersion := "4.8.10" // The maximal version that supports both 2.13.8 and 2.12.18
ThisBuild / semanticdbVersion := "4.13.6"

lazy val commonSetting = Seq(
libraryDependencies ++= Seq(
Expand Down Expand Up @@ -127,7 +124,7 @@ lazy val connect = (project in file("graphframes-connect"))
moduleName := s"${name.value}-spark${sparkBranch}",
Compile / PB.targets := Seq(PB.gens.java -> (Compile / sourceManaged).value),
Compile / PB.includePaths ++= Seq(file("src/main/protobuf")),
PB.protocVersion := "3.23.4", // Spark 3.5 branch
PB.protocVersion := "4.29.3", // Spark 4.0 branch
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-connect" % sparkVer % "provided" cross CrossVersion.for3Use2_13),

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
package org.apache.spark.sql.graphframes

import com.google.protobuf
import com.google.protobuf.Any
import com.google.protobuf.InvalidProtocolBufferException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connect.plugin.RelationPlugin
import org.graphframes.connect.proto.GraphFramesAPI
import java.util.Optional
import org.apache.spark.sql.classic.{DataFrame => ClassicDataFrame}

class GraphFramesConnect extends RelationPlugin {
override def transform(
relation: protobuf.Any,
planner: SparkConnectPlanner): Option[LogicalPlan] = {
if (relation.is(classOf[GraphFramesAPI])) {
val protoCall = relation.unpack(classOf[GraphFramesAPI])
// Because the plugins API is changed in spark 4.0 it makes sense to separate plugin impl from the parsing logic
val result = GraphFramesConnectUtils.parseAPICall(protoCall, planner)
Some(result.logicalPlan)
} else {
None
relation: Array[Byte],
planner: SparkConnectPlanner): Optional[LogicalPlan] = {
try {
val relationProto = Any.parseFrom(relation)
if (relationProto.is(classOf[GraphFramesAPI])) {
val protoCall = relationProto.unpack(classOf[GraphFramesAPI])
val result = GraphFramesConnectUtils.parseAPICall(protoCall, planner)
// We know exactly that it is classic one!
Optional.of(result.asInstanceOf[ClassicDataFrame].logicalPlan)
} else {
Optional.empty()
}
} catch {
case e: InvalidProtocolBufferException => throw new RuntimeException(e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ package org.apache.spark.sql.graphframes

import com.google.protobuf.ByteString
import org.apache.spark.sql.Column
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.classic.Dataset
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.functions.lit
Expand Down Expand Up @@ -190,7 +191,7 @@ object GraphFramesConnectUtils {
case MethodCase.SHORTEST_PATHS => {
graphFrame.shortestPaths
.landmarks(
apiMessage.getShortestPaths.getLandmarksList.asScala.map(parseLongOrStringID))
apiMessage.getShortestPaths.getLandmarksList.asScala.map(parseLongOrStringID).toSeq)
.run()
}
case MethodCase.STRONGLY_CONNECTED_COMPONENTS => {
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ThisBuild / libraryDependencySchemes ++= Seq(
"org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always
)

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.10")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.3.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.1")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.4")

Expand Down
6 changes: 3 additions & 3 deletions python/dev/build_jar.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
from pathlib import Path


def build(spark_versions: Sequence[str] = ["3.5.5"]):
def build(spark_versions: Sequence[str] = ["4.0.0"]):
for spark_version in spark_versions:
print("Building GraphFrames JAR...")
print(f"SPARK_VERSION: {spark_version[:3]}")
assert spark_version[:3] in {"3.5",}, "Unsopported spark version!"
assert spark_version[:3] in {"4.0",}, "Unsupported spark version!"
project_root = Path(__file__).parent.parent.parent
sbt_executable = project_root.joinpath("build").joinpath("sbt").absolute().__str__()
sbt_build_command = [
Expand Down Expand Up @@ -40,7 +40,7 @@ def build(spark_versions: Sequence[str] = ["3.5.5"]):
python_resources = (
project_root.joinpath("python").joinpath("graphframes").joinpath("resources")
)
target_dir = project_root.joinpath("target").joinpath("scala-2.12")
target_dir = project_root.joinpath("target").joinpath("scala-2.13")
gf_jar = None

for pp in target_dir.glob("*.jar"):
Expand Down
4 changes: 2 additions & 2 deletions python/dev/run_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import pyspark

SBT_BUILD_COMMAND = ["./build/sbt", "connect/assembly"]
SPARK_VERSION = "3.5.5"
SCALA_VERSION = "2.12"
SPARK_VERSION = "4.0.0"
SCALA_VERSION = "2.13"


if __name__ == "__main__":
Expand Down
42 changes: 17 additions & 25 deletions python/graphframes/connect/graphframe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
if self._agg_msg is None:
raise ValueError("AggMsg is not initialized!")

return DataFrame.withPlan(
return DataFrame(
Pregel(
max_iter=self._max_iter,
checkpoint_interval=self._checkpoint_interval,
Expand Down Expand Up @@ -269,7 +269,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

return DataFrame.withPlan(Triplets(self._vertices, self._edges), self._spark)
return DataFrame(Triplets(self._vertices, self._edges), self._spark)

@property
def pregel(self):
Expand All @@ -292,7 +292,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

return DataFrame.withPlan(Find(self._vertices, self._edges, pattern), self._spark)
return DataFrame(Find(self._vertices, self._edges, pattern), self._spark)

def filterVertices(self, condition: str | Column) -> "GraphFrameConnect":
class FilterVertices(LogicalPlan):
Expand All @@ -314,7 +314,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

new_vertices = DataFrame.withPlan(
new_vertices = DataFrame(
FilterVertices(self._vertices, self._edges, condition), self._spark
)
# Exactly like in the scala-core
Expand Down Expand Up @@ -347,9 +347,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

new_edges = DataFrame.withPlan(
FilterEdges(self._vertices, self._edges, condition), self._spark
)
new_edges = DataFrame(FilterEdges(self._vertices, self._edges, condition), self._spark)
return GraphFrameConnect(self._vertices, new_edges)

def dropIsolatedVertices(self) -> "GraphFrameConnect":
Expand All @@ -368,9 +366,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

new_vertices = DataFrame.withPlan(
DropIsolatedVertices(self._vertices, self._edges), self._spark
)
new_vertices = DataFrame(DropIsolatedVertices(self._vertices, self._edges), self._spark)
return GraphFrameConnect(new_vertices, self._edges)

def bfs(
Expand Down Expand Up @@ -417,7 +413,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
if edgeFilter is None:
edgeFilter = F.lit(True)

return DataFrame.withPlan(
return DataFrame(
BFS(
v=self._vertices,
e=self._edges,
Expand Down Expand Up @@ -477,7 +473,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
if sendToSrc is None and sendToDst is None:
raise ValueError("Either `sendToSrc`, `sendToDst`, or both have to be provided")

return DataFrame.withPlan(
return DataFrame(
AggregateMessages(self._vertices, self._edges, aggCol, sendToSrc, sendToDst),
self._spark,
)
Expand Down Expand Up @@ -519,7 +515,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

return DataFrame.withPlan(
return DataFrame(
ConnectedComponents(
self._vertices,
self._edges,
Expand Down Expand Up @@ -549,9 +545,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

return DataFrame.withPlan(
LabelPropagation(self._vertices, self._edges, maxIter), self._spark
)
return DataFrame(LabelPropagation(self._vertices, self._edges, maxIter), self._spark)

def _update_page_rank_edge_weights(self, new_vertices: DataFrame) -> "GraphFrameConnect":
cols2select = self.edges.columns + ["weight"]
Expand Down Expand Up @@ -619,7 +613,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
# at the same time I think it should be an exception.
raise ValueError("Exactly one of maxIter or tol should be set.")

new_vertices = DataFrame.withPlan(
new_vertices = DataFrame(
PageRank(
self._vertices,
self._edges,
Expand Down Expand Up @@ -675,7 +669,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
), "Source vertices Ids sourceIds must be provided"
assert maxIter is not None, "Max number of iterations maxIter must be provided"

new_vertices = DataFrame.withPlan(
new_vertices = DataFrame(
ParallelPersonalizedPageRank(
self._vertices,
self._edges,
Expand Down Expand Up @@ -721,7 +715,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

return DataFrame.withPlan(
return DataFrame(
PowerIterationClustering(self._vertices, self._edges, k, maxIter, weightCol),
self._spark,
)
Expand All @@ -747,9 +741,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

return DataFrame.withPlan(
ShortestPaths(self._vertices, self._edges, landmarks), self._spark
)
return DataFrame(ShortestPaths(self._vertices, self._edges, landmarks), self._spark)

def stronglyConnectedComponents(self, maxIter: int) -> DataFrame:
class StronglyConnectedComponents(LogicalPlan):
Expand All @@ -770,7 +762,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

return DataFrame.withPlan(
return DataFrame(
StronglyConnectedComponents(self._vertices, self._edges, maxIter),
self._spark,
)
Expand Down Expand Up @@ -833,7 +825,7 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

output = DataFrame.withPlan(
output = DataFrame(
SVDPlusPlus(
self._vertices,
self._edges,
Expand Down Expand Up @@ -871,4 +863,4 @@ def plan(self, session: SparkConnectClient) -> proto.Relation:
plan.extension.Pack(graphframes_api_call)
return plan

return DataFrame.withPlan(TriangleCount(self._vertices, self._edges), self._spark)
return DataFrame(TriangleCount(self._vertices, self._edges), self._spark)
Loading