Skip to content

Commit 4503055

Browse files
feat: initial top-level structure of PropertyGraph (#613)
* Initial top-level structure of PropertyGraph * Missing commit * WIP * updates from comments * From comments * Drop outdated docstrings for now * Add a projection method * Add tests * from comments
1 parent 0a6c935 commit 4503055

7 files changed

Lines changed: 805 additions & 0 deletions

File tree

core/src/main/scala/org/graphframes/GraphFrame.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,15 @@ object GraphFrame extends Serializable with Logging {
746746
*/
747747
val EDGE: String = "edge"
748748

749+
/**
 * Default name of the edge column that holds edge weights.
 *
 * NOTE(review): the weight presumably encodes the strength or importance of the connection
 * between two vertices; which algorithms read this column is not visible from here.
 */
756+
val WEIGHT: String = "weight"
757+
749758
// ============================ Constructors and converters =================================
750759

751760
/**

core/src/main/scala/org/graphframes/exceptions.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,14 @@ class InvalidPatternException() extends Exception()
2323
*/
2424
class GraphFramesUnreachableException()
2525
extends Exception("This exception should not be reachable")
26+
27+
/**
 * Exception signalling a problem with a property group.
 *
 * NOTE(review): no throw site is visible in this file; presumably raised when an operation or
 * configuration uses a property group that is unsupported, invalid, or improperly defined.
 *
 * @param message
 *   A detailed error message describing the issue; passed unchanged to `Exception`.
 */
36+
class InvalidPropertyGroupException(message: String) extends Exception(message)
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package org.graphframes.propertygraph
2+
3+
import org.apache.spark.sql.Column
4+
import org.apache.spark.sql.DataFrame
5+
import org.apache.spark.sql.functions.col
6+
import org.apache.spark.sql.functions.lit
7+
import org.graphframes.GraphFrame
8+
import org.graphframes.propertygraph.property.EdgePropertyGroup
9+
import org.graphframes.propertygraph.property.VertexPropertyGroup
10+
11+
/**
 * A logical container of vertex and edge property groups that can be turned into a
 * [[org.graphframes.GraphFrame]].
 *
 * The groups are indexed by name for lookup. If two groups of the same kind share a name, the
 * later one in the sequence wins in the lookup (standard `toMap` semantics). Per-group concerns
 * such as data access (`getData`) and ID masking (`applyMaskOnId`, `internalIdMapping`) are
 * delegated to the property groups themselves.
 *
 * @param vertexPropertyGroups
 *   Sequence of vertex property groups that define the graph's vertices
 * @param edgesPropertyGroups
 *   Sequence of edge property groups that define the graph's edges
 */
case class PropertyGraphFrame(
    vertexPropertyGroups: Seq[VertexPropertyGroup],
    edgesPropertyGroups: Seq[EdgePropertyGroup]) {
  import PropertyGraphFrame._

  // Name -> group lookups, built on first use.
  lazy private val vertexGroups: Map[String, VertexPropertyGroup] =
    vertexPropertyGroups.map(pg => pg.name -> pg).toMap
  lazy private val edgeGroups: Map[String, EdgePropertyGroup] =
    edgesPropertyGroups.map(pg => pg.name -> pg).toMap

  /**
   * Converts the selected property groups into a single GraphFrame.
   *
   * The data of every selected vertex group is obtained through the group's `getData` and all
   * results are unioned into one vertices DataFrame; edges are handled the same way. Any ID
   * conversion or schema unification is performed by the property groups inside `getData`.
   *
   * @param vertexPropertyGroups
   *   Names of the vertex property groups to include; must not be empty
   * @param edgePropertyGroups
   *   Names of the edge property groups to include; must not be empty
   * @param edgeGroupFilters
   *   Map of edge property group names to filter predicates. Groups without an entry are not
   *   filtered.
   * @param vertexGroupFilters
   *   Map of vertex property group names to filter predicates. Groups without an entry are not
   *   filtered.
   * @return
   *   A GraphFrame built from the unioned vertices and edges of the selected groups
   * @throws IllegalArgumentException
   *   if no group is selected or any specified property group name doesn't exist
   */
  def toGraphFrame(
      vertexPropertyGroups: Seq[String],
      edgePropertyGroups: Seq[String],
      edgeGroupFilters: Map[String, Column],
      vertexGroupFilters: Map[String, Column]): GraphFrame = {
    // `reduce` below is undefined for an empty selection; fail with a clear message instead.
    require(vertexPropertyGroups.nonEmpty, "At least one vertex property group must be selected")
    require(edgePropertyGroups.nonEmpty, "At least one edge property group must be selected")
    vertexPropertyGroups.foreach(name =>
      require(vertexGroups.contains(name), s"Vertex property group $name does not exist"))
    edgePropertyGroups.foreach(name =>
      require(edgeGroups.contains(name), s"Edge property group $name does not exist"))

    // A missing filter entry means "keep every row" rather than a failed map lookup.
    val vertices = vertexPropertyGroups
      .map(name => vertexGroups(name).getData(vertexGroupFilters.getOrElse(name, lit(true))))
      .reduce(_ union _)

    val edges = edgePropertyGroups
      .map(name => edgeGroups(name).getData(edgeGroupFilters.getOrElse(name, lit(true))))
      .reduce(_ union _)

    GraphFrame(vertices, edges)
  }

  /**
   * Projects a bipartite graph onto one of its parts, creating edges between vertices that share
   * neighbors in the other part. The vertex group projected through and the original edge group
   * are dropped; a new undirected edge group named `projected_<edgeGroup>` is added.
   *
   * Pairs are emitted once per shared neighbor and only for `src1 < src2`, so self-pairs and
   * mirrored duplicates are excluded.
   *
   * @param leftBiGraphPart
   *   Name of the vertex property group to project onto
   * @param rightBiGraphPart
   *   Name of the vertex property group to project through
   * @param edgeGroup
   *   Name of the edge property group connecting the two parts
   * @param newEdgeWeight
   *   Optional function that takes the weight columns of the two joined edges and returns the
   *   new weight column. If None, a weight of 1.0 is used for all projected edges.
   * @return
   *   A new PropertyGraphFrame containing the projected graph
   * @throws IllegalArgumentException
   *   if the edge group or the left vertex group does not exist, or if the edge group does not
   *   connect `leftBiGraphPart` (source) to `rightBiGraphPart` (destination)
   */
  def projectionBy(
      leftBiGraphPart: String,
      rightBiGraphPart: String,
      edgeGroup: String,
      newEdgeWeight: Option[(Column, Column) => Column] = None): PropertyGraphFrame = {
    require(edgeGroups.contains(edgeGroup), s"Edge property group $edgeGroup does not exist")
    val oldGroup = edgeGroups(edgeGroup)
    require(
      oldGroup.srcPropertyGroup.name == leftBiGraphPart,
      s"Edge Property Group should have $leftBiGraphPart source group but has ${oldGroup.srcPropertyGroup.name}")
    require(
      oldGroup.dstPropertyGroup.name == rightBiGraphPart,
      s"Edge Property Group should have $rightBiGraphPart destination group but has ${oldGroup.dstPropertyGroup.name}")
    require(
      vertexGroups.contains(leftBiGraphPart),
      s"Vertex property group $leftBiGraphPart does not exist")

    val keptVPropertyGroups = vertexPropertyGroups.filterNot(g => g.name == rightBiGraphPart)
    val keptEPropertyGroups = edgesPropertyGroups.filterNot(g => g.name == edgeGroup)
    val oldEdgesData = oldGroup.data

    // The raw edge data uses the column names configured on the group, not necessarily
    // "src"/"dst" (the weight column was already resolved this way).
    val srcCol = oldGroup.srcColumnName
    val dstCol = oldGroup.dstColumnName
    val weightCol = oldGroup.weightColumnName

    val projectedWeight = newEdgeWeight
      .map(weightFn => weightFn(col(s"e1.$weightCol"), col(s"e2.$weightCol")))
      .getOrElse(lit(1.0))

    // Create new edges by joining vertices through their common neighbors
    val projectedEdges = oldEdgesData
      .as("e1")
      .join(oldEdgesData.as("e2"), col(s"e1.$dstCol") === col(s"e2.$dstCol"))
      .where(col(s"e1.$srcCol") < col(s"e2.$srcCol"))
      .select(
        col(s"e1.$srcCol").alias(GraphFrame.SRC),
        col(s"e2.$srcCol").alias(GraphFrame.DST),
        projectedWeight.alias(GraphFrame.WEIGHT))

    val leftGroup = vertexGroups(leftBiGraphPart)
    val newEdgeGroup = EdgePropertyGroup(
      name = s"projected_$edgeGroup",
      data = projectedEdges,
      srcPropertyGroup = leftGroup,
      dstPropertyGroup = leftGroup,
      isDirected = false,
      srcColumnName = GraphFrame.SRC,
      dstColumnName = GraphFrame.DST,
      weightColumnName = GraphFrame.WEIGHT)

    PropertyGraphFrame(keptVPropertyGroups, keptEPropertyGroups :+ newEdgeGroup)
  }

  /**
   * Joins vertex-level results back onto the requested vertex property groups and unions the
   * per-group results into one DataFrame.
   *
   * For every requested group, `verticesData` is restricted to the rows whose
   * `PROPERTY_GROUP_COL_NAME` column equals the group name. If the group masks its IDs
   * (`applyMaskOnId`), the group's `internalIdMapping` is left-joined with those rows on the
   * `id` column and `id` is dropped; otherwise the group's data is left-joined on `id` and `id`
   * is renamed to `EXTERNAL_ID`. The per-group results are combined with `union`, which matches
   * columns by position.
   *
   * @param verticesData
   *   The DataFrame with vertex-level data to join. It must contain the `id` and
   *   `PROPERTY_GROUP_COL_NAME` columns; it is expected to be an output of graph algorithms run
   *   on the GraphFrame produced by `toGraphFrame`.
   * @param vertexGroups
   *   Names of the vertex property groups to join; must not be empty and each name must
   *   correspond to a vertex property group of this PropertyGraphFrame.
   * @return
   *   A DataFrame with the union of the joined per-group results
   * @throws IllegalArgumentException
   *   If `vertexGroups` is empty or any of the specified names does not exist in this
   *   PropertyGraphFrame.
   */
  def joinVertices(verticesData: DataFrame, vertexGroups: Seq[String]): DataFrame = {
    // `reduce` below is undefined for an empty selection; fail with a clear message instead.
    require(vertexGroups.nonEmpty, "At least one vertex property group must be specified")
    val unknownGroups = vertexGroups.filterNot(this.vertexGroups.contains)
    require(
      unknownGroups.isEmpty,
      s"Vertex property groups do not exist: ${unknownGroups.mkString(", ")}")

    vertexGroups
      .map { vg =>
        val associatedGroup = this.vertexGroups(vg)
        val filteredForGroup = verticesData.filter(col(PROPERTY_GROUP_COL_NAME) === lit(vg))
        if (associatedGroup.applyMaskOnId) {
          associatedGroup.internalIdMapping
            .join(filteredForGroup, Seq(GraphFrame.ID), "left")
            .drop(GraphFrame.ID)
        } else {
          // Same Seq-based overload as above; it is available in every Spark version.
          associatedGroup
            .getData()
            .join(filteredForGroup, Seq(GraphFrame.ID), "left")
            .withColumnRenamed(GraphFrame.ID, EXTERNAL_ID)
        }
      }
      .reduce(_ union _)
  }
}
196+
197+
object PropertyGraphFrame {

  /**
   * Name of the column that carries the property group name of a row. `joinVertices` filters
   * its input on this column to select the rows belonging to one vertex property group.
   */
  val PROPERTY_GROUP_COL_NAME: String = "property_group"

  /**
   * Name of the column that carries external identifiers. `joinVertices` renames the `id`
   * column to this name for vertex property groups that do not mask their IDs.
   */
  val EXTERNAL_ID: String = "external_id"
}
}

0 commit comments

Comments
 (0)