|
| 1 | +package org.graphframes.propertygraph |
| 2 | + |
| 3 | +import org.apache.spark.sql.Column |
| 4 | +import org.apache.spark.sql.DataFrame |
| 5 | +import org.apache.spark.sql.functions.col |
| 6 | +import org.apache.spark.sql.functions.lit |
| 7 | +import org.graphframes.GraphFrame |
| 8 | +import org.graphframes.propertygraph.property.EdgePropertyGroup |
| 9 | +import org.graphframes.propertygraph.property.VertexPropertyGroup |
| 10 | + |
/**
 * A high-level abstraction for working with property graphs on top of the GraphFrames library.
 *
 * A PropertyGraphFrame is a logical container for a collection of vertex property groups and a
 * collection of edge property groups. Every property group owns and prepares its own data; this
 * class only selects groups by name, applies optional filters, and combines the per-group
 * DataFrames into the structures expected by [[org.graphframes.GraphFrame]].
 *
 * Concerns such as internal ID conversion, directed/undirected representation and weight
 * handling are delegated to the property groups themselves (their `getData` methods) and are not
 * implemented in this class.
 *
 * @param vertexPropertyGroups
 *   Sequence of vertex property groups that define the graph's vertices
 * @param edgesPropertyGroups
 *   Sequence of edge property groups that define the graph's edges
 */
case class PropertyGraphFrame(
    vertexPropertyGroups: Seq[VertexPropertyGroup],
    edgesPropertyGroups: Seq[EdgePropertyGroup]) {
  import PropertyGraphFrame._

  // Name-indexed views of the property groups. If two groups share a name, `toMap` keeps the
  // last one.
  private lazy val vertexGroups: Map[String, VertexPropertyGroup] =
    vertexPropertyGroups.map(pg => pg.name -> pg).toMap
  private lazy val edgeGroups: Map[String, EdgePropertyGroup] =
    edgesPropertyGroups.map(pg => pg.name -> pg).toMap

  /**
   * Converts the selected property groups into a single GraphFrame.
   *
   * For every selected group the group's `getData` is called with the filter registered for that
   * group; the resulting DataFrames are combined with `union` (vertices and edges separately)
   * and passed to `GraphFrame(vertices, edges)`. A group without an entry in the corresponding
   * filter map is included unfiltered (the predicate `lit(true)` is used).
   *
   * @param vertexPropertyGroups
   *   Names of the vertex property groups to include; must not be empty
   * @param edgePropertyGroups
   *   Names of the edge property groups to include; must not be empty
   * @param edgeGroupFilters
   *   Map of edge property group names to filter predicates (Column expressions). Groups that are
   *   missing from the map are not filtered.
   * @param vertexGroupFilters
   *   Map of vertex property group names to filter predicates (Column expressions). Groups that
   *   are missing from the map are not filtered.
   * @return
   *   A GraphFrame built from the union of the selected and filtered property groups
   * @throws IllegalArgumentException
   *   if a selection is empty or any specified property group name doesn't exist
   */
  def toGraphFrame(
      vertexPropertyGroups: Seq[String],
      edgePropertyGroups: Seq[String],
      edgeGroupFilters: Map[String, Column],
      vertexGroupFilters: Map[String, Column]): GraphFrame = {
    // `reduce` below has no neutral element, so an empty selection must be rejected up front.
    require(vertexPropertyGroups.nonEmpty, "At least one vertex property group must be selected")
    require(edgePropertyGroups.nonEmpty, "At least one edge property group must be selected")
    vertexPropertyGroups.foreach(name =>
      require(vertexGroups.contains(name), s"Vertex property group $name does not exist"))
    edgePropertyGroups.foreach(name =>
      require(edgeGroups.contains(name), s"Edge property group $name does not exist"))

    // Predicate used for groups the caller did not provide a filter for: keeps every row.
    val keepAll = lit(true)

    val vertices = vertexPropertyGroups
      .map(name => vertexGroups(name).getData(vertexGroupFilters.getOrElse(name, keepAll)))
      .reduce(_ union _)

    val edges = edgePropertyGroups
      .map(name => edgeGroups(name).getData(edgeGroupFilters.getOrElse(name, keepAll)))
      .reduce(_ union _)

    GraphFrame(vertices, edges)
  }

  /**
   * Projects a bipartite graph onto one of its parts, creating edges between vertices that share
   * a neighbor in the other part. The vertex property group that is projected through and the
   * original edge property group are dropped; a new undirected edge property group named
   * `projected_<edgeGroup>` is appended.
   *
   * The projected edges are computed by self-joining the edge data on the destination column and
   * keeping only pairs whose first source is strictly smaller than the second, so every
   * unordered pair is emitted once per shared neighbor and self-pairs are excluded.
   *
   * @param leftBiGraphPart
   *   Name of the vertex property group to project onto
   * @param rightBiGraphPart
   *   Name of the vertex property group to project through
   * @param edgeGroup
   *   Name of the edge property group connecting the two parts
   * @param newEdgeWeight
   *   Optional function that takes the weight columns of the two joined edges and returns the
   *   weight column of the projected edge. If None, a constant weight of 1.0 is used.
   * @return
   *   A new PropertyGraphFrame containing the projected graph
   * @throws IllegalArgumentException
   *   if the edge group or the left vertex group does not exist, or if the edge group does not
   *   connect `leftBiGraphPart` (source) to `rightBiGraphPart` (destination)
   */
  def projectionBy(
      leftBiGraphPart: String,
      rightBiGraphPart: String,
      edgeGroup: String,
      newEdgeWeight: Option[(Column, Column) => Column] = None): PropertyGraphFrame = {
    require(edgeGroups.contains(edgeGroup), s"Edge property group $edgeGroup does not exist")
    require(
      vertexGroups.contains(leftBiGraphPart),
      s"Vertex property group $leftBiGraphPart does not exist")

    val oldGroup = edgeGroups(edgeGroup)
    require(
      oldGroup.srcPropertyGroup.name == leftBiGraphPart,
      s"Edge Property Group should have $leftBiGraphPart source group " +
        s"but has ${oldGroup.srcPropertyGroup.name}")
    require(
      oldGroup.dstPropertyGroup.name == rightBiGraphPart,
      s"Edge Property Group should have $rightBiGraphPart destination group " +
        s"but has ${oldGroup.dstPropertyGroup.name}")

    val keptVPropertyGroups = vertexPropertyGroups.filterNot(g => g.name == rightBiGraphPart)
    val keptEPropertyGroups = edgesPropertyGroups.filterNot(g => g.name == edgeGroup)
    val oldEdgesData = oldGroup.data

    // The raw edge data uses the column names configured on the group, which are not
    // necessarily the GraphFrame defaults, so they must be looked up rather than hard-coded.
    val srcName = oldGroup.srcColumnName
    val dstName = oldGroup.dstColumnName
    val weightName = oldGroup.weightColumnName

    val projectedWeight = newEdgeWeight
      .map(combine => combine(col(s"e1.$weightName"), col(s"e2.$weightName")))
      .getOrElse(lit(1.0))
      .alias(GraphFrame.WEIGHT)

    // Create new edges by joining vertices through their common neighbors
    val projectedEdges = oldEdgesData
      .as("e1")
      .join(oldEdgesData.as("e2"), col(s"e1.$dstName") === col(s"e2.$dstName"))
      .where(col(s"e1.$srcName") < col(s"e2.$srcName"))
      .select(
        col(s"e1.$srcName").alias(GraphFrame.SRC),
        col(s"e2.$srcName").alias(GraphFrame.DST),
        projectedWeight)

    val newEdgeGroup = EdgePropertyGroup(
      name = s"projected_$edgeGroup",
      data = projectedEdges,
      srcPropertyGroup = vertexGroups(leftBiGraphPart),
      dstPropertyGroup = vertexGroups(leftBiGraphPart),
      isDirected = false,
      srcColumnName = GraphFrame.SRC,
      dstColumnName = GraphFrame.DST,
      weightColumnName = GraphFrame.WEIGHT)

    PropertyGraphFrame(keptVPropertyGroups, keptEPropertyGroups :+ newEdgeGroup)
  }

  /**
   * Joins per-vertex results back to the data of the given vertex property groups.
   *
   * For each requested group, the rows of `verticesData` whose `property_group` column equals
   * the group name are selected and left-joined (on the GraphFrame id column) to the group:
   *   - if the group masks its ids (`applyMaskOnId`), the join is made against the group's
   *     `internalIdMapping` and the id column is dropped afterwards;
   *   - otherwise the join is made against the group's `getData()` and the id column is renamed
   *     to `external_id`.
   *
   * The per-group results are combined with `union`.
   *
   * @param verticesData
   *   The DataFrame to join. It must contain the GraphFrame id column and the `property_group`
   *   column; it is expected to be the output of a graph algorithm run on a GraphFrame produced
   *   by `toGraphFrame`.
   * @param vertexGroups
   *   Names of the vertex property groups to join; must not be empty. Each name must correspond
   *   to a vertex property group of this PropertyGraphFrame.
   * @return
   *   The union of the per-group join results
   * @throws IllegalArgumentException
   *   if no group is given or any of the specified vertex group names does not exist
   */
  def joinVertices(verticesData: DataFrame, vertexGroups: Seq[String]): DataFrame = {
    // `reduce` below has no neutral element, so an empty selection must be rejected up front.
    require(vertexGroups.nonEmpty, "At least one vertex property group must be specified")
    vertexGroups.foreach(name =>
      require(this.vertexGroups.contains(name), s"Vertex property group $name does not exist"))

    // NOTE(review): `union` matches columns by position; the two branches below may produce
    // different column sets when masked and unmasked groups are mixed - confirm with callers.
    vertexGroups
      .map { groupName =>
        val associatedGroup = this.vertexGroups(groupName)
        val filteredForGroup =
          verticesData.filter(col(PROPERTY_GROUP_COL_NAME) === lit(groupName))
        if (associatedGroup.applyMaskOnId) {
          associatedGroup.internalIdMapping
            .join(filteredForGroup, Seq(GraphFrame.ID), "left")
            .drop(GraphFrame.ID)
        } else {
          associatedGroup
            .getData()
            .join(filteredForGroup, Seq(GraphFrame.ID), "left")
            .withColumnRenamed(GraphFrame.ID, EXTERNAL_ID)
        }
      }
      .reduce(_ union _)
  }
}
| 196 | + |
/** Column-name constants shared by the property graph API. */
object PropertyGraphFrame {

  /**
   * Name of the column that carries the name of the property group a row belongs to.
   */
  val PROPERTY_GROUP_COL_NAME: String = "property_group"

  /**
   * Name of the column that carries the external (user-facing) identifier of a vertex.
   */
  val EXTERNAL_ID: String = "external_id"
}
0 commit comments