Skip to content

Commit f1db6f4

Browse files
feat: pyspark property graphs (#798)
* test: add comprehensive PropertyGraphFrame test suite with vertex/edge operations and projections * refactor: rewrite PropertyGraphFrameTest to pytest with fixtures * feat: add Python API for property graphs with VertexPropertyGroup and EdgePropertyGroup * refactor: rename to_graph_frame to to_graphframe in PropertyGraphFrame API
1 parent c2fe76c commit f1db6f4

7 files changed

Lines changed: 1313 additions & 8 deletions

File tree

docs/src/04-user-guide/11-property-graphs.md

Lines changed: 109 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ GraphFrames represent a property graph as a combination of multiple logical stru
2424

2525
### Vertex Property Group
2626

27-
For API details see @:scaladoc(org.graphframes.propertygraph.property.VertexPropertyGroup). It contains a name of the property group, for example, `movies`, a name of ID column and underlying data in the form of a `DataFrame`.
27+
For API details see @:scaladoc(org.graphframes.propertygraph.property.VertexPropertyGroup) (Scala) or `graphframes.pg.VertexPropertyGroup` (Python). It contains a name of the property group, for example, `movies`, a name of ID column and underlying data in the form of a `DataFrame`.
2828

2929
The simple example below creates two property groups: `people` and `movies`.
3030

31-
```scala
31+
````scala
3232
import org.graphframes.propertygraph.property.VertexPropertyGroup
3333

3434
val peopleData = spark
@@ -43,15 +43,31 @@ val moviesData = spark
4343
.toDF("id", "title")
4444

4545
val moviesGroup = VertexPropertyGroup("movies", moviesData, "id")
46+
````
47+
48+
```python
49+
from graphframes.pg import VertexPropertyGroup
50+
51+
people_data = spark.createDataFrame(
52+
[(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "David"), (5, "Eve")],
53+
["id", "name"]
54+
)
55+
people_group = VertexPropertyGroup("people", people_data, "id")
56+
57+
movies_data = spark.createDataFrame(
58+
[(1, "Matrix"), (2, "Inception"), (3, "Interstellar")],
59+
["id", "title"]
60+
)
61+
movies_group = VertexPropertyGroup("movies", movies_data, "id")
4662
```
4763

4864
### Edge Property Group
4965

50-
For API details see @:scaladoc(org.graphframes.propertygraph.property.EdgePropertyGroup). It contains a name of the property group, links to the source and target vertex property groups, direction of the edges (`directed` or `undirected`), and underlying data in the form of a `DataFrame`. Optionally, it can contain a column with edge weights as well as names of source and target vertex ID columns.
66+
For API details see @:scaladoc(org.graphframes.propertygraph.property.EdgePropertyGroup) (Scala) or `graphframes.pg.EdgePropertyGroup` (Python). It contains a name of the property group, links to the source and target vertex property groups, direction of the edges (`directed` or `undirected`), and underlying data in the form of a `DataFrame`. Optionally, it can contain a column with edge weights as well as names of source and target vertex ID columns.
5167

5268
The simple example below creates an edge property group with the name `likes` and links to the `people` and `movies` vertex property groups as well as `messages` property group that links people to people.
5369

54-
```scala
70+
````scala
5571
import org.graphframes.propertygraph.property.EdgePropertyGroup
5672

5773
val likesData = spark
@@ -82,22 +98,68 @@ val messagesGroup = EdgePropertyGroup(
8298
"src",
8399
"dst",
84100
col("weight"))
101+
````
102+
103+
```python
104+
from pyspark.sql.functions import col, lit
105+
from graphframes.pg import EdgePropertyGroup
106+
107+
likes_data = spark.createDataFrame(
108+
[(1, 1), (1, 2), (2, 1), (3, 2), (4, 3), (5, 2)],
109+
["src", "dst"]
110+
).withColumn("weight", lit(1.0))
111+
112+
likes_group = EdgePropertyGroup(
113+
"likes",
114+
likes_data,
115+
people_group,
116+
movies_group,
117+
is_directed=False,
118+
src_column_name="src",
119+
dst_column_name="dst",
120+
weight_column_name="weight"
121+
)
122+
123+
messages_data = spark.createDataFrame(
124+
[(1, 2, 5.0), (2, 3, 8.0), (3, 4, 3.0), (4, 5, 6.0), (5, 1, 9.0)],
125+
["src", "dst", "weight"]
126+
)
127+
128+
messages_group = EdgePropertyGroup(
129+
"messages",
130+
messages_data,
131+
people_group,
132+
people_group,
133+
is_directed=True,
134+
src_column_name="src",
135+
dst_column_name="dst",
136+
weight_column_name="weight"
137+
)
85138
```
86139

87140
### Property GraphFrame
88141

89142
Having defined the property groups, we can create a `PropertyGraphFrame` by passing the property groups to the constructor.
90143

91-
```scala
144+
````scala
92145
import org.graphframes.propertygraph.PropertyGraphFrame
93146

94147
peopleMoviesGraph =
95148
PropertyGraphFrame(Seq(peopleGroup, moviesGroup), Seq(likesGroup, messagesGroup))
149+
````
150+
151+
```python
152+
from graphframes.pg import PropertyGraphFrame
153+
154+
people_movies_graph = PropertyGraphFrame(
155+
[people_group, movies_group],
156+
[likes_group, messages_group]
157+
)
96158
```
97159

98160
### Conversion to GraphFrames
99161

100-
The `PropertyGraphFrame` can be converted to a `GraphFrame` by calling `toGraphFrame`. Users can select a subset of vertex and edge property groups to be included in the resulting `GraphFrame`. Under the hood, the conversion will take care about handling potential vertex and edge ID collisions by applying hashing to both vertex and edge IDs.
162+
The `PropertyGraphFrame` can be converted to a `GraphFrame` by calling `toGraphFrame` (Scala) or `to_graphframe` (Python). Users can select a subset of vertex and edge property groups to be included in the resulting `GraphFrame`. Under the hood, the conversion will take care about handling potential vertex and edge ID collisions by applying hashing to both vertex and edge IDs.
101163

102164
```scala
103165
val graph = peopleMoviesGraph.toGraphFrame(
@@ -107,16 +169,37 @@ val graph = peopleMoviesGraph.toGraphFrame(
107169
Map("people" -> lit(true)))
108170
```
109171

110-
For more details see @:scaladoc(org.graphframes.propertygraph.PropertyGraphFrame).
172+
```python
173+
from pyspark.sql.functions import lit
174+
175+
graph = people_movies_graph.to_graphframe(
176+
vertex_property_groups=["people"],
177+
edge_property_groups=["messages"],
178+
edge_group_filters={"messages": lit(True)},
179+
vertex_group_filters={"people": lit(True)}
180+
)
181+
```
182+
183+
For more details see @:scaladoc(org.graphframes.propertygraph.PropertyGraphFrame) (Scala) or @:pydoc(graphframes.pg.PropertyGraphFrame) (Python).
111184

112185
This operation is not free, so user can also explicitly specify for each of `VertexGroup` does it need to be hashed or not.
113186

114-
```scala
187+
````scala
115188
val moviesData = spark
116189
.createDataFrame(Seq((1L, "Matrix"), (2L, "Inception"), (3L, "Interstellar")))
117190
.toDF("id", "title")
118191

119192
val moviesGroup = VertexPropertyGroup("movies", moviesData, "id", applyMaskOnId = false)
193+
````
194+
195+
```python
196+
movies_data = spark.createDataFrame(
197+
[(1, "Matrix"), (2, "Inception"), (3, "Interstellar")],
198+
["id", "title"]
199+
)
200+
movies_group = VertexPropertyGroup(
201+
"movies", movies_data, "id", apply_mask_on_id=False
202+
)
120203
```
121204

122205
### Projection
@@ -127,3 +210,21 @@ The `PropertyGraphFrame` support projection of edges groups to a new edge group.
127210
val projectedGraph = peopleMoviesGraph.projectionBy("people", "movies", "likes")
128211
```
129212

213+
```python
214+
projected_graph = people_movies_graph.projection_by("people", "movies", "likes")
215+
```
216+
217+
### Joining Algorithm Results
218+
219+
After running graph algorithms on a `GraphFrame` created from a `PropertyGraphFrame`, you can join the results back to the original vertex data using `join_vertices` (Python) or `joinVertices` (Scala).
220+
221+
```scala
222+
val components = graph.connectedComponents()
223+
val joinedBack = peopleMoviesGraph.joinVertices(components, Seq("people", "movies"))
224+
```
225+
226+
```python
227+
components = graph.connectedComponents()
228+
joined_back = people_movies_graph.join_vertices(components, ["people", "movies"])
229+
```
230+

python/graphframes/graphframe.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ def is_remote() -> bool:
5353
"""Constant for the edge column name."""
5454
EDGE = "edge"
5555

56+
"""Constant for the weight column name."""
57+
WEIGHT = "weight"
58+
5659

5760
class GraphFrame:
5861
"""
@@ -76,6 +79,7 @@ class GraphFrame:
7679
SRC: str = SRC
7780
DST: str = DST
7881
EDGE: str = EDGE
82+
WEIGHT: str = WEIGHT
7983

8084
@staticmethod
8185
def _from_impl(impl: "GraphFrameClassic | GraphFrameConnect") -> "GraphFrame":

python/graphframes/pg/__init__.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from graphframes.pg.property_graphframe import PropertyGraphFrame
19+
from graphframes.pg.property_groups import EdgePropertyGroup, VertexPropertyGroup
20+
21+
__all__ = [
22+
"VertexPropertyGroup",
23+
"EdgePropertyGroup",
24+
"PropertyGraphFrame",
25+
]

0 commit comments

Comments
 (0)