From be302dbaaa42fa0194d94793abd73f9680959e7e Mon Sep 17 00:00:00 2001 From: joelrobin18 Date: Sat, 27 Dec 2025 23:23:55 +0530 Subject: [PATCH 1/3] feat(pregel): add requiredSrcColumns/requiredDstColumns API for triplet memory optimization Signed-off-by: joelrobin18 --- connect/src/main/protobuf/graphframes.proto | 4 + .../graphframes/GraphFramesConnectUtils.scala | 13 ++ .../org/graphframes/lib/DetectingCycles.scala | 4 + .../graphframes/lib/LabelPropagation.scala | 3 + .../scala/org/graphframes/lib/Pregel.scala | 66 +++++++++- .../org/graphframes/lib/ShortestPaths.scala | 3 + .../org/graphframes/lib/PregelSuite.scala | 123 ++++++++++++++++++ .../graphframes/connect/graphframes_client.py | 40 ++++++ .../connect/proto/graphframes_pb2.py | 32 ++--- .../connect/proto/graphframes_pb2.pyi | 22 ++-- python/graphframes/lib/pregel.py | 47 +++++++ 11 files changed, 329 insertions(+), 28 deletions(-) diff --git a/connect/src/main/protobuf/graphframes.proto b/connect/src/main/protobuf/graphframes.proto index 0e3e9322a..3ec6ba934 100644 --- a/connect/src/main/protobuf/graphframes.proto +++ b/connect/src/main/protobuf/graphframes.proto @@ -158,6 +158,10 @@ message Pregel { optional ColumnOrExpression initial_active_expr = 13; optional ColumnOrExpression update_active_expr = 14; optional bool skip_messages_from_non_active = 15; + // Required columns for triplet construction (memory optimization) + // Column names separated by comma + optional string required_src_columns = 16; + optional string required_dst_columns = 17; } message ShortestPaths { diff --git a/connect/src/main/scala/org/apache/spark/sql/graphframes/GraphFramesConnectUtils.scala b/connect/src/main/scala/org/apache/spark/sql/graphframes/GraphFramesConnectUtils.scala index c37abb9ad..cbb5203ba 100644 --- a/connect/src/main/scala/org/apache/spark/sql/graphframes/GraphFramesConnectUtils.scala +++ b/connect/src/main/scala/org/apache/spark/sql/graphframes/GraphFramesConnectUtils.scala @@ -346,6 +346,19 @@ object GraphFramesConnectUtils { pregel = pregel.setEarlyStopping(pregelProto.getEarlyStopping) } + // Handle required columns for triplet optimization (comma-separated) + if (pregelProto.hasRequiredSrcColumns) { + val cols = + pregelProto.getRequiredSrcColumns.split(",").map(_.trim).filter(_.nonEmpty).toSeq + if (cols.nonEmpty) pregel = pregel.requiredSrcColumns(cols.head, cols.tail: _*) + } + + if (pregelProto.hasRequiredDstColumns) { + val cols = + pregelProto.getRequiredDstColumns.split(",").map(_.trim).filter(_.nonEmpty).toSeq + if (cols.nonEmpty) pregel = pregel.requiredDstColumns(cols.head, cols.tail: _*) + } + pregel.run() } case proto.GraphFramesAPI.MethodCase.SHORTEST_PATHS => { diff --git a/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala b/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala index c68884f30..8809df7ee 100644 --- a/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala +++ b/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala @@ -96,6 +96,10 @@ object DetectingCycles { .withVertexColumn(storedSeqCol, initSequences, updateSequences) .withVertexColumn(foundSeqCol, foundSequences, updateFound) .aggMsgs(filterOutSequences) + // Memory optimization: only include required columns in triplets + // For cycle detection, we only need the sequences from source vertex + // and just the ID from destination vertex (ID is always included) + .requiredSrcColumns(storedSeqCol) .run() } } diff --git a/core/src/main/scala/org/graphframes/lib/LabelPropagation.scala b/core/src/main/scala/org/graphframes/lib/LabelPropagation.scala index a50fa1a63..6cc0866e4 100644 --- a/core/src/main/scala/org/graphframes/lib/LabelPropagation.scala +++ b/core/src/main/scala/org/graphframes/lib/LabelPropagation.scala @@ -119,6 +119,9 @@ private object LabelPropagation { .setUpdateActiveVertexExpression(col(LABEL_ID) =!= keyWithMaxValue(Pregel.msg)) .setUseLocalCheckpoints(useLocalCheckpoints) .setIntermediateStorageLevel(intermediateStorageLevel) + // Memory optimization: only include required columns in triplets + .requiredSrcColumns(LABEL_ID) + .requiredDstColumns(LABEL_ID) if (isDirected) { pregel = pregel.sendMsgToDst(Pregel.src(LABEL_ID)) diff --git a/core/src/main/scala/org/graphframes/lib/Pregel.scala b/core/src/main/scala/org/graphframes/lib/Pregel.scala index 0d63fe8f8..9232b1f82 100644 --- a/core/src/main/scala/org/graphframes/lib/Pregel.scala +++ b/core/src/main/scala/org/graphframes/lib/Pregel.scala @@ -100,6 +100,11 @@ class Pregel(val graph: GraphFrame) private val sendMsgs = collection.mutable.ListBuffer.empty[(Column, Column)] private var aggMsgsCol: Column = null + // Required columns for source and destination vertices in triplets + // When empty, all columns are selected (default behavior) + private val requiredSrcColumnsList = collection.mutable.ListBuffer.empty[String] + private val requiredDstColumnsList = collection.mutable.ListBuffer.empty[String] + /** Sets the max number of iterations (default: 10). */ def setMaxIter(value: Int): this.type = { maxIter = value @@ -291,6 +296,54 @@ class Pregel(val graph: GraphFrame) this } + /** + * Specifies which source vertex columns are required when constructing triplets. + * + * By default, all source vertex columns are included in triplets, which can create large + * intermediate datasets for algorithms with significant state (e.g., cycle detection, random + * walks). Use this method to reduce memory usage by specifying only the columns that are + * actually needed by the sendMsgToSrc and sendMsgToDst expressions. + * + * The ID column and the active flag column (if used) are always included automatically. + * + * @param colName + * the first required source vertex column name + * @param colNames + * additional required source vertex column names + * @see + * [[requiredDstColumns]] + */ + def requiredSrcColumns(colName: String, colNames: String*): this.type = { + requiredSrcColumnsList.clear() + requiredSrcColumnsList += colName + requiredSrcColumnsList ++= colNames + this + } + + /** + * Specifies which destination vertex columns are required when constructing triplets. + * + * By default, all destination vertex columns are included in triplets, which can create large + * intermediate datasets for algorithms with significant state (e.g., cycle detection, random + * walks). Use this method to reduce memory usage by specifying only the columns that are + * actually needed by the sendMsgToSrc and sendMsgToDst expressions. + * + * The ID column and the active flag column (if used) are always included automatically. + * + * @param colName + * the first required destination vertex column name + * @param colNames + * additional required destination vertex column names + * @see + * [[requiredSrcColumns]] + */ + def requiredDstColumns(colName: String, colNames: String*): this.type = { + requiredDstColumnsList.clear() + requiredDstColumnsList += colName + requiredDstColumnsList ++= colNames + this + } + /** * Defines how messages are aggregated after grouped by target vertex IDs. * @@ -364,16 +417,25 @@ class Pregel(val graph: GraphFrame) } } + // Columns to include in triplet structs (ID + active flag always included if specified) + val srcCols = + if (requiredSrcColumnsList.isEmpty) Seq(col("*")) + else (Seq(ID, Pregel.ACTIVE_FLAG_COL) ++ requiredSrcColumnsList).distinct.map(col) + val dstCols = + if (requiredDstColumnsList.isEmpty) Seq(col("*")) + else (Seq(ID, Pregel.ACTIVE_FLAG_COL) ++ requiredDstColumnsList).distinct.map(col) + breakable { while (iteration <= maxIter) { logInfo(s"start Pregel iteration $iteration / $maxIter") val currRoundPersistent = scala.collection.mutable.Queue[DataFrame]() currRoundPersistent.enqueue(currentVertices.persist(intermediateStorageLevel)) + var tripletsDF = currentVertices - .select(struct(col("*")).as(SRC)) + .select(struct(srcCols: _*).as(SRC)) .join(edges, Pregel.src(ID) === col("edge_src")) .join( - currentVertices.select(struct(col("*")).as(DST)), + currentVertices.select(struct(dstCols: _*).as(DST)), col("edge_dst") === Pregel.dst(ID)) .drop(col("edge_src"), col("edge_dst")) diff --git a/core/src/main/scala/org/graphframes/lib/ShortestPaths.scala b/core/src/main/scala/org/graphframes/lib/ShortestPaths.scala index 09288c2bd..ef3ff77d0 100644 --- a/core/src/main/scala/org/graphframes/lib/ShortestPaths.scala +++ b/core/src/main/scala/org/graphframes/lib/ShortestPaths.scala @@ -233,6 +233,9 @@ private object ShortestPaths extends Logging { .setSkipMessagesFromNonActiveVertices(true) .setCheckpointInterval(checkpointInterval) .setUseLocalCheckpoints(useLocalCheckpoints) + // Memory optimization: only include required columns in triplets + .requiredSrcColumns(DISTANCE_ID) + .requiredDstColumns(DISTANCE_ID) // Experimental feature if (isDirected) { diff --git a/core/src/test/scala/org/graphframes/lib/PregelSuite.scala b/core/src/test/scala/org/graphframes/lib/PregelSuite.scala index 64d4aee0f..a1e7ca00a 100644 --- a/core/src/test/scala/org/graphframes/lib/PregelSuite.scala +++ b/core/src/test/scala/org/graphframes/lib/PregelSuite.scala @@ -170,4 +170,127 @@ class PregelSuite extends SparkFunSuite with GraphFrameTestSparkContext { .map(r => r.getAs[Long]("id") -> r.getAs[Int]("newColumn")) .toMap === Map(1L -> 2, 2L -> 1, 3L -> 2, 4L -> 1)) } + + test("requiredSrcColumns - only specified columns are used in triplets") { + // Test that requiredSrcColumns correctly limits the columns in triplets + // This is a memory optimization test - we verify the result is correct + // with only required source columns + + val edges = Seq((0L, 1L), (1L, 2L), (2L, 4L), (2L, 0L), (3L, 4L), (4L, 0L), (4L, 2L)) + .toDF("src", "dst") + .cache() + val vertices = GraphFrame.fromEdges(edges).outDegrees.cache() + val numVertices = vertices.count() + val graph = GraphFrame(vertices, edges) + + val alpha = 0.15 + // PageRank only needs "rank" and "outDegree" from source vertex + val ranks = graph.pregel + .setMaxIter(5) + .withVertexColumn( + "rank", + lit(1.0 / numVertices), + coalesce(Pregel.msg, lit(0.0)) * (1.0 - alpha) + alpha / numVertices) + .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree")) + .aggMsgs(sum(Pregel.msg)) + .requiredSrcColumns("rank", "outDegree") + .run() + + val result = ranks + .sort(col("id")) + .select("rank") + .as[Double] + .collect() + assert(result.sum === 1.0 +- 1e-6) + val expected = Seq(0.245, 0.224, 0.303, 0.03, 0.197) + result.zip(expected).foreach { case (r, e) => + assert(r === e +- 1e-3) + } + } + + test("requiredDstColumns - only specified columns are used in triplets") { + // Test that requiredDstColumns correctly limits the columns in triplets + // Reverse chain propagation where we only need dst("value") from destination + + val n = 5 + val verDF = (1 to n).toDF("id").repartition(3) + val edgeDF = (1 until n) + .map(x => (x + 1, x)) + .toDF("src", "dst") + .repartition(3) + + val graph = GraphFrame(verDF, edgeDF) + + val resultDF = graph.pregel + .setMaxIter(n - 1) + .withVertexColumn( + "value", + when(col("id") === lit(1), lit(1)).otherwise(lit(0)), + when(Pregel.msg > col("value"), Pregel.msg).otherwise(col("value"))) + .sendMsgToSrc(when(Pregel.dst("value") =!= Pregel.src("value"), Pregel.dst("value"))) + .aggMsgs(max(Pregel.msg)) + .requiredDstColumns("value") // Only need "value" from destination + .run() + + assert(resultDF.sort("id").select("value").as[Int].collect() === Array.fill(n)(1)) + } + + test("requiredSrcColumns and requiredDstColumns together") { + // Test using both requiredSrcColumns and requiredDstColumns + // Chain propagation where we need "value" from both src and dst + + val n = 5 + val verDF = (1 to n).toDF("id").repartition(3) + val edgeDF = (1 until n) + .map(x => (x, x + 1)) + .toDF("src", "dst") + .repartition(3) + + val graph = GraphFrame(verDF, edgeDF) + + val resultDF = graph.pregel + .setMaxIter(n - 1) + .withVertexColumn( + "value", + when(col("id") === lit(1), lit(1)).otherwise(lit(0)), + when(Pregel.msg > col("value"), Pregel.msg).otherwise(col("value"))) + .sendMsgToDst(when(Pregel.dst("value") =!= Pregel.src("value"), Pregel.src("value"))) + .aggMsgs(max(Pregel.msg)) + .requiredSrcColumns("value") // Only need "value" from source + .requiredDstColumns("value") // Only need "value" from destination + .run() + + assert(resultDF.sort("id").select("value").as[Int].collect() === Array.fill(n)(1)) + } + + test("requiredSrcColumns with empty list uses all columns (default behavior)") { + // Verify that not calling requiredSrcColumns means all columns are used + // This is the same as the original page rank test + + val edges = Seq((0L, 1L), (1L, 2L), (2L, 4L), (2L, 0L), (3L, 4L), (4L, 0L), (4L, 2L)) + .toDF("src", "dst") + .cache() + val vertices = GraphFrame.fromEdges(edges).outDegrees.cache() + val numVertices = vertices.count() + val graph = GraphFrame(vertices, edges) + + val alpha = 0.15 + val ranks = graph.pregel + .setMaxIter(5) + .withVertexColumn( + "rank", + lit(1.0 / numVertices), + coalesce(Pregel.msg, lit(0.0)) * (1.0 - alpha) + alpha / numVertices) + .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree")) + .aggMsgs(sum(Pregel.msg)) + // No requiredSrcColumns or requiredDstColumns - should use all columns + .run() + + val result = ranks + .sort(col("id")) + .select("rank") + .as[Double] + .collect() + assert(result.sum === 1.0 +- 1e-6) + } } diff --git a/python/graphframes/connect/graphframes_client.py b/python/graphframes/connect/graphframes_client.py index 8a983dc2e..a3ef0f965 100644 --- a/python/graphframes/connect/graphframes_client.py +++ b/python/graphframes/connect/graphframes_client.py @@ -57,6 +57,8 @@ def __init__(self, graph: "GraphFrameConnect") -> None: self._update_active_expr: Column | str | None = None self._stop_if_all_non_active = False self._skip_messages_from_non_active = False + self._required_src_columns: list[str] = [] + self._required_dst_columns: list[str] = [] def setMaxIter(self, value: int) -> Self: self._max_iter = value @@ -117,6 +119,32 @@ def setIntermediateStorageLevel(self, storage_level: StorageLevel) -> Self: self._storage_level = storage_level return self + def requiredSrcColumns(self, colName: str, *colNames: str) -> Self: + """Specifies which source vertex columns are required when constructing triplets. + + By default, all source vertex columns are included in triplets, which can create large + intermediate datasets for algorithms with significant state. Use this method to reduce + memory usage by specifying only the columns that are actually needed. + + :param colName: the first required source vertex column name + :param colNames: additional required source vertex column names + """ + self._required_src_columns = [colName] + list(colNames) + return self + + def requiredDstColumns(self, colName: str, *colNames: str) -> Self: + """Specifies which destination vertex columns are required when constructing triplets. + + By default, all destination vertex columns are included in triplets, which can create large + intermediate datasets for algorithms with significant state. Use this method to reduce + memory usage by specifying only the columns that are actually needed. + + :param colName: the first required destination vertex column name + :param colNames: additional required destination vertex column names + """ + self._required_dst_columns = [colName] + list(colNames) + return self + def run(self) -> DataFrame: @final class Pregel(LogicalPlan): @@ -137,6 +165,8 @@ def __init__( update_active_col: Column | str | None, stop_if_all_non_active: bool, skip_message_from_non_active: bool, + required_src_columns: list[str], + required_dst_columns: list[str], vertices: DataFrame, edges: DataFrame, ) -> None: @@ -156,6 +186,8 @@ def __init__( self.update_active_expr = update_active_col self.stop_if_all_non_active = stop_if_all_non_active self.skip_message_from_non_active = skip_message_from_non_active + self.required_src_columns = required_src_columns + self.required_dst_columns = required_dst_columns self.vertices = vertices self.edges = edges @@ -185,6 +217,12 @@ def plan(self, session: SparkConnectClient) -> proto.Relation: update_active_expr=make_column_or_expr(self.update_active_expr, session) if self.update_active_expr is not None else None, + required_src_columns=",".join(self.required_src_columns) + if self.required_src_columns + else None, + required_dst_columns=",".join(self.required_dst_columns) + if self.required_dst_columns + else None, ) pb_message = pb.GraphFramesAPI( vertices=dataframe_to_proto(self.vertices, session), @@ -221,6 +259,8 @@ def plan(self, session: SparkConnectClient) -> proto.Relation: update_active_col=self._update_active_expr, stop_if_all_non_active=self._stop_if_all_non_active, skip_message_from_non_active=self._skip_messages_from_non_active, + required_src_columns=self._required_src_columns, + required_dst_columns=self._required_dst_columns, storage_level=self._storage_level, vertices=self.graph._vertices, edges=self.graph._edges, diff --git a/python/graphframes/connect/proto/graphframes_pb2.py b/python/graphframes/connect/proto/graphframes_pb2.py index a5a9b7d33..fdb3aabab 100644 --- a/python/graphframes/connect/proto/graphframes_pb2.py +++ b/python/graphframes/connect/proto/graphframes_pb2.py @@ -19,7 +19,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x11graphframes.proto\x12\x1dorg.graphframes.connect.proto"\xbb\x0e\n\x0eGraphFramesAPI\x12\x1a\n\x08vertices\x18\x01 \x01(\x0cR\x08vertices\x12\x14\n\x05\x65\x64ges\x18\x02 \x01(\x0cR\x05\x65\x64ges\x12\x61\n\x12\x61ggregate_messages\x18\x03 \x01(\x0b\x32\x30.org.graphframes.connect.proto.AggregateMessagesH\x00R\x11\x61ggregateMessages\x12\x36\n\x03\x62\x66s\x18\x04 \x01(\x0b\x32".org.graphframes.connect.proto.BFSH\x00R\x03\x62\x66s\x12g\n\x14\x63onnected_components\x18\x05 \x01(\x0b\x32\x32.org.graphframes.connect.proto.ConnectedComponentsH\x00R\x13\x63onnectedComponents\x12k\n\x16\x64rop_isolated_vertices\x18\x06 \x01(\x0b\x32\x33.org.graphframes.connect.proto.DropIsolatedVerticesH\x00R\x14\x64ropIsolatedVertices\x12[\n\x10\x64\x65tecting_cycles\x18\x07 \x01(\x0b\x32..org.graphframes.connect.proto.DetectingCyclesH\x00R\x0f\x64\x65tectingCycles\x12O\n\x0c\x66ilter_edges\x18\x08 \x01(\x0b\x32*.org.graphframes.connect.proto.FilterEdgesH\x00R\x0b\x66ilterEdges\x12X\n\x0f\x66ilter_vertices\x18\t \x01(\x0b\x32-.org.graphframes.connect.proto.FilterVerticesH\x00R\x0e\x66ilterVertices\x12\x39\n\x04\x66ind\x18\n \x01(\x0b\x32#.org.graphframes.connect.proto.FindH\x00R\x04\x66ind\x12^\n\x11label_propagation\x18\x0b \x01(\x0b\x32/.org.graphframes.connect.proto.LabelPropagationH\x00R\x10labelPropagation\x12\x46\n\tpage_rank\x18\x0c \x01(\x0b\x32\'.org.graphframes.connect.proto.PageRankH\x00R\x08pageRank\x12\x84\x01\n\x1fparallel_personalized_page_rank\x18\r \x01(\x0b\x32;.org.graphframes.connect.proto.ParallelPersonalizedPageRankH\x00R\x1cparallelPersonalizedPageRank\x12w\n\x1apower_iteration_clustering\x18\x0e \x01(\x0b\x32\x37.org.graphframes.connect.proto.PowerIterationClusteringH\x00R\x18powerIterationClustering\x12?\n\x06pregel\x18\x0f \x01(\x0b\x32%.org.graphframes.connect.proto.PregelH\x00R\x06pregel\x12U\n\x0eshortest_paths\x18\x10 \x01(\x0b\x32,.org.graphframes.connect.proto.ShortestPathsH\x00R\rshortestPaths\x12\x80\x01\n\x1dstrongly_connected_components\x18\x11 \x01(\x0b\x32:.org.graphframes.connect.proto.StronglyConnectedComponentsH\x00R\x1bstronglyConnectedComponents\x12P\n\rsvd_plus_plus\x18\x12 \x01(\x0b\x32*.org.graphframes.connect.proto.SVDPlusPlusH\x00R\x0bsvdPlusPlus\x12U\n\x0etriangle_count\x18\x13 \x01(\x0b\x32,.org.graphframes.connect.proto.TriangleCountH\x00R\rtriangleCount\x12\x45\n\x08triplets\x18\x14 \x01(\x0b\x32\'.org.graphframes.connect.proto.TripletsH\x00R\x08triplets\x12<\n\x05kcore\x18\x15 \x01(\x0b\x32$.org.graphframes.connect.proto.KCoreH\x00R\x05kcore\x12H\n\x03mis\x18\x16 \x01(\x0b\x32\x34.org.graphframes.connect.proto.MaximalIndependentSetH\x00R\x03misB\x08\n\x06method"\xd7\x02\n\x0cStorageLevel\x12\x1d\n\tdisk_only\x18\x01 \x01(\x08H\x00R\x08\x64iskOnly\x12 \n\x0b\x64isk_only_2\x18\x02 \x01(\x08H\x00R\tdiskOnly2\x12 \n\x0b\x64isk_only_3\x18\x03 \x01(\x08H\x00R\tdiskOnly3\x12(\n\x0fmemory_and_disk\x18\x04 \x01(\x08H\x00R\rmemoryAndDisk\x12+\n\x11memory_and_disk_2\x18\x05 \x01(\x08H\x00R\x0ememoryAndDisk2\x12\x33\n\x15memory_and_disk_deser\x18\x06 \x01(\x08H\x00R\x12memoryAndDiskDeser\x12!\n\x0bmemory_only\x18\x07 \x01(\x08H\x00R\nmemoryOnly\x12$\n\rmemory_only_2\x18\x08 \x01(\x08H\x00R\x0bmemoryOnly2B\x0f\n\rstorage_level"M\n\x12\x43olumnOrExpression\x12\x12\n\x03\x63ol\x18\x01 \x01(\x0cH\x00R\x03\x63ol\x12\x14\n\x04\x65xpr\x18\x02 \x01(\tH\x00R\x04\x65xprB\r\n\x0b\x63ol_or_expr"P\n\x0eStringOrLongID\x12\x19\n\x07long_id\x18\x01 \x01(\x03H\x00R\x06longId\x12\x1d\n\tstring_id\x18\x02 \x01(\tH\x00R\x08stringIdB\x04\n\x02id"\xee\x02\n\x11\x41ggregateMessages\x12J\n\x07\x61gg_col\x18\x01 \x03(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x06\x61ggCol\x12Q\n\x0bsend_to_src\x18\x02 \x03(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\tsendToSrc\x12Q\n\x0bsend_to_dst\x18\x03 \x03(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\tsendToDst\x12U\n\rstorage_level\x18\x04 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\x9d\x02\n\x03\x42\x46S\x12N\n\tfrom_expr\x18\x01 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x08\x66romExpr\x12J\n\x07to_expr\x18\x02 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x06toExpr\x12R\n\x0b\x65\x64ge_filter\x18\x03 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\nedgeFilter\x12&\n\x0fmax_path_length\x18\x04 \x01(\x05R\rmaxPathLength"\x86\x03\n\x13\x43onnectedComponents\x12\x1c\n\talgorithm\x18\x01 \x01(\tR\talgorithm\x12/\n\x13\x63heckpoint_interval\x18\x02 \x01(\x05R\x12\x63heckpointInterval\x12/\n\x13\x62roadcast_threshold\x18\x03 \x01(\x05R\x12\x62roadcastThreshold\x12\x37\n\x18use_labels_as_components\x18\x04 \x01(\x08R\x15useLabelsAsComponents\x12\x32\n\x15use_local_checkpoints\x18\x05 \x01(\x08R\x13useLocalCheckpoints\x12\x19\n\x08max_iter\x18\x06 \x01(\x05R\x07maxIter\x12U\n\rstorage_level\x18\x07 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\xdf\x01\n\x0f\x44\x65tectingCycles\x12\x32\n\x15use_local_checkpoints\x18\x01 \x01(\x08R\x13useLocalCheckpoints\x12/\n\x13\x63heckpoint_interval\x18\x02 \x01(\x05R\x12\x63heckpointInterval\x12U\n\rstorage_level\x18\x03 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\x16\n\x14\x44ropIsolatedVertices"^\n\x0b\x46ilterEdges\x12O\n\tcondition\x18\x01 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\tcondition"a\n\x0e\x46ilterVertices\x12O\n\tcondition\x18\x02 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\tcondition" \n\x04\x46ind\x12\x18\n\x07pattern\x18\x01 \x01(\tR\x07pattern"\x99\x02\n\x10LabelPropagation\x12\x1c\n\talgorithm\x18\x01 \x01(\tR\talgorithm\x12\x19\n\x08max_iter\x18\x02 \x01(\x05R\x07maxIter\x12\x32\n\x15use_local_checkpoints\x18\x03 \x01(\x08R\x13useLocalCheckpoints\x12/\n\x13\x63heckpoint_interval\x18\x04 \x01(\x05R\x12\x63heckpointInterval\x12U\n\rstorage_level\x18\x05 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\xe2\x01\n\x08PageRank\x12+\n\x11reset_probability\x18\x01 \x01(\x01R\x10resetProbability\x12O\n\tsource_id\x18\x02 \x01(\x0b\x32-.org.graphframes.connect.proto.StringOrLongIDH\x00R\x08sourceId\x88\x01\x01\x12\x1e\n\x08max_iter\x18\x03 \x01(\x05H\x01R\x07maxIter\x88\x01\x01\x12\x15\n\x03tol\x18\x04 \x01(\x01H\x02R\x03tol\x88\x01\x01\x42\x0c\n\n_source_idB\x0b\n\t_max_iterB\x06\n\x04_tol"\xb4\x01\n\x1cParallelPersonalizedPageRank\x12+\n\x11reset_probability\x18\x01 \x01(\x01R\x10resetProbability\x12L\n\nsource_ids\x18\x02 \x03(\x0b\x32-.org.graphframes.connect.proto.StringOrLongIDR\tsourceIds\x12\x19\n\x08max_iter\x18\x03 \x01(\x05R\x07maxIter"v\n\x18PowerIterationClustering\x12\x0c\n\x01k\x18\x01 \x01(\x05R\x01k\x12\x19\n\x08max_iter\x18\x02 \x01(\x05R\x07maxIter\x12"\n\nweight_col\x18\x03 \x01(\tH\x00R\tweightCol\x88\x01\x01\x42\r\n\x0b_weight_col"\xe6\t\n\x06Pregel\x12L\n\x08\x61gg_msgs\x18\x01 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x07\x61ggMsgs\x12X\n\x0fsend_msg_to_dst\x18\x02 \x03(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x0csendMsgToDst\x12X\n\x0fsend_msg_to_src\x18\x03 \x03(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x0csendMsgToSrc\x12/\n\x13\x63heckpoint_interval\x18\x04 \x01(\x05R\x12\x63heckpointInterval\x12\x19\n\x08max_iter\x18\x05 \x01(\x05R\x07maxIter\x12.\n\x13\x61\x64\x64itional_col_name\x18\x06 \x01(\tR\x11\x61\x64\x64itionalColName\x12g\n\x16\x61\x64\x64itional_col_initial\x18\x07 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x14\x61\x64\x64itionalColInitial\x12_\n\x12\x61\x64\x64itional_col_upd\x18\x08 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x10\x61\x64\x64itionalColUpd\x12*\n\x0e\x65\x61rly_stopping\x18\t \x01(\x08H\x00R\rearlyStopping\x88\x01\x01\x12\x32\n\x15use_local_checkpoints\x18\n \x01(\x08R\x13useLocalCheckpoints\x12U\n\rstorage_level\x18\x0b \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x01R\x0cstorageLevel\x88\x01\x01\x12\x37\n\x16stop_if_all_non_active\x18\x0c \x01(\x08H\x02R\x12stopIfAllNonActive\x88\x01\x01\x12\x66\n\x13initial_active_expr\x18\r \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionH\x03R\x11initialActiveExpr\x88\x01\x01\x12\x64\n\x12update_active_expr\x18\x0e \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionH\x04R\x10updateActiveExpr\x88\x01\x01\x12\x45\n\x1dskip_messages_from_non_active\x18\x0f \x01(\x08H\x05R\x19skipMessagesFromNonActive\x88\x01\x01\x42\x11\n\x0f_early_stoppingB\x10\n\x0e_storage_levelB\x19\n\x17_stop_if_all_non_activeB\x16\n\x14_initial_active_exprB\x15\n\x13_update_active_exprB \n\x1e_skip_messages_from_non_active"\xfe\x02\n\rShortestPaths\x12K\n\tlandmarks\x18\x01 \x03(\x0b\x32-.org.graphframes.connect.proto.StringOrLongIDR\tlandmarks\x12\x1c\n\talgorithm\x18\x02 \x01(\tR\talgorithm\x12\x32\n\x15use_local_checkpoints\x18\x03 \x01(\x08R\x13useLocalCheckpoints\x12/\n\x13\x63heckpoint_interval\x18\x04 \x01(\x05R\x12\x63heckpointInterval\x12U\n\rstorage_level\x18\x05 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x12$\n\x0bis_directed\x18\x06 \x01(\x08H\x01R\nisDirected\x88\x01\x01\x42\x10\n\x0e_storage_levelB\x0e\n\x0c_is_directed"8\n\x1bStronglyConnectedComponents\x12\x19\n\x08max_iter\x18\x01 \x01(\x05R\x07maxIter"\xd6\x01\n\x0bSVDPlusPlus\x12\x12\n\x04rank\x18\x01 \x01(\x05R\x04rank\x12\x19\n\x08max_iter\x18\x02 \x01(\x05R\x07maxIter\x12\x1b\n\tmin_value\x18\x03 \x01(\x01R\x08minValue\x12\x1b\n\tmax_value\x18\x04 \x01(\x01R\x08maxValue\x12\x16\n\x06gamma1\x18\x05 \x01(\x01R\x06gamma1\x12\x16\n\x06gamma2\x18\x06 \x01(\x01R\x06gamma2\x12\x16\n\x06gamma6\x18\x07 \x01(\x01R\x06gamma6\x12\x16\n\x06gamma7\x18\x08 \x01(\x01R\x06gamma7"x\n\rTriangleCount\x12U\n\rstorage_level\x18\x01 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\n\n\x08Triplets"\xf9\x01\n\x15MaximalIndependentSet\x12/\n\x13\x63heckpoint_interval\x18\x01 \x01(\x05R\x12\x63heckpointInterval\x12U\n\rstorage_level\x18\x02 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x12\x32\n\x15use_local_checkpoints\x18\x03 \x01(\x08R\x13useLocalCheckpoints\x12\x12\n\x04seed\x18\x04 \x01(\x03R\x04seedB\x10\n\x0e_storage_level"\xd5\x01\n\x05KCore\x12\x32\n\x15use_local_checkpoints\x18\x01 \x01(\x08R\x13useLocalCheckpoints\x12/\n\x13\x63heckpoint_interval\x18\x02 \x01(\x05R\x12\x63heckpointInterval\x12U\n\rstorage_level\x18\x03 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_levelB\xd2\x01\n!com.org.graphframes.connect.protoB\x10GraphframesProtoH\x01P\x01\xa0\x01\x01\xa2\x02\x04OGCP\xaa\x02\x1dOrg.Graphframes.Connect.Proto\xca\x02\x1dOrg\\Graphframes\\Connect\\Proto\xe2\x02)Org\\Graphframes\\Connect\\Proto\\GPBMetadata\xea\x02 Org::Graphframes::Connect::Protob\x06proto3' + b'\n\x11graphframes.proto\x12\x1dorg.graphframes.connect.proto"\xbb\x0e\n\x0eGraphFramesAPI\x12\x1a\n\x08vertices\x18\x01 \x01(\x0cR\x08vertices\x12\x14\n\x05\x65\x64ges\x18\x02 \x01(\x0cR\x05\x65\x64ges\x12\x61\n\x12\x61ggregate_messages\x18\x03 \x01(\x0b\x32\x30.org.graphframes.connect.proto.AggregateMessagesH\x00R\x11\x61ggregateMessages\x12\x36\n\x03\x62\x66s\x18\x04 \x01(\x0b\x32".org.graphframes.connect.proto.BFSH\x00R\x03\x62\x66s\x12g\n\x14\x63onnected_components\x18\x05 \x01(\x0b\x32\x32.org.graphframes.connect.proto.ConnectedComponentsH\x00R\x13\x63onnectedComponents\x12k\n\x16\x64rop_isolated_vertices\x18\x06 \x01(\x0b\x32\x33.org.graphframes.connect.proto.DropIsolatedVerticesH\x00R\x14\x64ropIsolatedVertices\x12[\n\x10\x64\x65tecting_cycles\x18\x07 \x01(\x0b\x32..org.graphframes.connect.proto.DetectingCyclesH\x00R\x0f\x64\x65tectingCycles\x12O\n\x0c\x66ilter_edges\x18\x08 \x01(\x0b\x32*.org.graphframes.connect.proto.FilterEdgesH\x00R\x0b\x66ilterEdges\x12X\n\x0f\x66ilter_vertices\x18\t \x01(\x0b\x32-.org.graphframes.connect.proto.FilterVerticesH\x00R\x0e\x66ilterVertices\x12\x39\n\x04\x66ind\x18\n \x01(\x0b\x32#.org.graphframes.connect.proto.FindH\x00R\x04\x66ind\x12^\n\x11label_propagation\x18\x0b \x01(\x0b\x32/.org.graphframes.connect.proto.LabelPropagationH\x00R\x10labelPropagation\x12\x46\n\tpage_rank\x18\x0c \x01(\x0b\x32\'.org.graphframes.connect.proto.PageRankH\x00R\x08pageRank\x12\x84\x01\n\x1fparallel_personalized_page_rank\x18\r \x01(\x0b\x32;.org.graphframes.connect.proto.ParallelPersonalizedPageRankH\x00R\x1cparallelPersonalizedPageRank\x12w\n\x1apower_iteration_clustering\x18\x0e \x01(\x0b\x32\x37.org.graphframes.connect.proto.PowerIterationClusteringH\x00R\x18powerIterationClustering\x12?\n\x06pregel\x18\x0f \x01(\x0b\x32%.org.graphframes.connect.proto.PregelH\x00R\x06pregel\x12U\n\x0eshortest_paths\x18\x10 \x01(\x0b\x32,.org.graphframes.connect.proto.ShortestPathsH\x00R\rshortestPaths\x12\x80\x01\n\x1dstrongly_connected_components\x18\x11 \x01(\x0b\x32:.org.graphframes.connect.proto.StronglyConnectedComponentsH\x00R\x1bstronglyConnectedComponents\x12P\n\rsvd_plus_plus\x18\x12 \x01(\x0b\x32*.org.graphframes.connect.proto.SVDPlusPlusH\x00R\x0bsvdPlusPlus\x12U\n\x0etriangle_count\x18\x13 \x01(\x0b\x32,.org.graphframes.connect.proto.TriangleCountH\x00R\rtriangleCount\x12\x45\n\x08triplets\x18\x14 \x01(\x0b\x32\'.org.graphframes.connect.proto.TripletsH\x00R\x08triplets\x12H\n\x03mis\x18\x16 \x01(\x0b\x32\x34.org.graphframes.connect.proto.MaximalIndependentSetH\x00R\x03mis\x12<\n\x05kcore\x18\x15 \x01(\x0b\x32$.org.graphframes.connect.proto.KCoreH\x00R\x05kcoreB\x08\n\x06method"\xd7\x02\n\x0cStorageLevel\x12\x1d\n\tdisk_only\x18\x01 \x01(\x08H\x00R\x08\x64iskOnly\x12 \n\x0b\x64isk_only_2\x18\x02 \x01(\x08H\x00R\tdiskOnly2\x12 \n\x0b\x64isk_only_3\x18\x03 \x01(\x08H\x00R\tdiskOnly3\x12(\n\x0fmemory_and_disk\x18\x04 \x01(\x08H\x00R\rmemoryAndDisk\x12+\n\x11memory_and_disk_2\x18\x05 \x01(\x08H\x00R\x0ememoryAndDisk2\x12\x33\n\x15memory_and_disk_deser\x18\x06 \x01(\x08H\x00R\x12memoryAndDiskDeser\x12!\n\x0bmemory_only\x18\x07 \x01(\x08H\x00R\nmemoryOnly\x12$\n\rmemory_only_2\x18\x08 \x01(\x08H\x00R\x0bmemoryOnly2B\x0f\n\rstorage_level"M\n\x12\x43olumnOrExpression\x12\x12\n\x03\x63ol\x18\x01 \x01(\x0cH\x00R\x03\x63ol\x12\x14\n\x04\x65xpr\x18\x02 \x01(\tH\x00R\x04\x65xprB\r\n\x0b\x63ol_or_expr"P\n\x0eStringOrLongID\x12\x19\n\x07long_id\x18\x01 \x01(\x03H\x00R\x06longId\x12\x1d\n\tstring_id\x18\x02 \x01(\tH\x00R\x08stringIdB\x04\n\x02id"\xee\x02\n\x11\x41ggregateMessages\x12J\n\x07\x61gg_col\x18\x01 \x03(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x06\x61ggCol\x12Q\n\x0bsend_to_src\x18\x02 \x03(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\tsendToSrc\x12Q\n\x0bsend_to_dst\x18\x03 \x03(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\tsendToDst\x12U\n\rstorage_level\x18\x04 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\x9d\x02\n\x03\x42\x46S\x12N\n\tfrom_expr\x18\x01 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x08\x66romExpr\x12J\n\x07to_expr\x18\x02 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x06toExpr\x12R\n\x0b\x65\x64ge_filter\x18\x03 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\nedgeFilter\x12&\n\x0fmax_path_length\x18\x04 \x01(\x05R\rmaxPathLength"\x86\x03\n\x13\x43onnectedComponents\x12\x1c\n\talgorithm\x18\x01 \x01(\tR\talgorithm\x12/\n\x13\x63heckpoint_interval\x18\x02 \x01(\x05R\x12\x63heckpointInterval\x12/\n\x13\x62roadcast_threshold\x18\x03 \x01(\x05R\x12\x62roadcastThreshold\x12\x37\n\x18use_labels_as_components\x18\x04 \x01(\x08R\x15useLabelsAsComponents\x12\x32\n\x15use_local_checkpoints\x18\x05 \x01(\x08R\x13useLocalCheckpoints\x12\x19\n\x08max_iter\x18\x06 \x01(\x05R\x07maxIter\x12U\n\rstorage_level\x18\x07 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\xdf\x01\n\x0f\x44\x65tectingCycles\x12\x32\n\x15use_local_checkpoints\x18\x01 \x01(\x08R\x13useLocalCheckpoints\x12/\n\x13\x63heckpoint_interval\x18\x02 \x01(\x05R\x12\x63heckpointInterval\x12U\n\rstorage_level\x18\x03 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\x16\n\x14\x44ropIsolatedVertices"^\n\x0b\x46ilterEdges\x12O\n\tcondition\x18\x01 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\tcondition"a\n\x0e\x46ilterVertices\x12O\n\tcondition\x18\x02 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\tcondition" \n\x04\x46ind\x12\x18\n\x07pattern\x18\x01 \x01(\tR\x07pattern"\x99\x02\n\x10LabelPropagation\x12\x1c\n\talgorithm\x18\x01 \x01(\tR\talgorithm\x12\x19\n\x08max_iter\x18\x02 \x01(\x05R\x07maxIter\x12\x32\n\x15use_local_checkpoints\x18\x03 \x01(\x08R\x13useLocalCheckpoints\x12/\n\x13\x63heckpoint_interval\x18\x04 \x01(\x05R\x12\x63heckpointInterval\x12U\n\rstorage_level\x18\x05 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\xe2\x01\n\x08PageRank\x12+\n\x11reset_probability\x18\x01 \x01(\x01R\x10resetProbability\x12O\n\tsource_id\x18\x02 \x01(\x0b\x32-.org.graphframes.connect.proto.StringOrLongIDH\x00R\x08sourceId\x88\x01\x01\x12\x1e\n\x08max_iter\x18\x03 \x01(\x05H\x01R\x07maxIter\x88\x01\x01\x12\x15\n\x03tol\x18\x04 \x01(\x01H\x02R\x03tol\x88\x01\x01\x42\x0c\n\n_source_idB\x0b\n\t_max_iterB\x06\n\x04_tol"\xb4\x01\n\x1cParallelPersonalizedPageRank\x12+\n\x11reset_probability\x18\x01 \x01(\x01R\x10resetProbability\x12L\n\nsource_ids\x18\x02 \x03(\x0b\x32-.org.graphframes.connect.proto.StringOrLongIDR\tsourceIds\x12\x19\n\x08max_iter\x18\x03 \x01(\x05R\x07maxIter"v\n\x18PowerIterationClustering\x12\x0c\n\x01k\x18\x01 \x01(\x05R\x01k\x12\x19\n\x08max_iter\x18\x02 \x01(\x05R\x07maxIter\x12"\n\nweight_col\x18\x03 \x01(\tH\x00R\tweightCol\x88\x01\x01\x42\r\n\x0b_weight_col"\x86\x0b\n\x06Pregel\x12L\n\x08\x61gg_msgs\x18\x01 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x07\x61ggMsgs\x12X\n\x0fsend_msg_to_dst\x18\x02 \x03(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x0csendMsgToDst\x12X\n\x0fsend_msg_to_src\x18\x03 \x03(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x0csendMsgToSrc\x12/\n\x13\x63heckpoint_interval\x18\x04 \x01(\x05R\x12\x63heckpointInterval\x12\x19\n\x08max_iter\x18\x05 \x01(\x05R\x07maxIter\x12.\n\x13\x61\x64\x64itional_col_name\x18\x06 \x01(\tR\x11\x61\x64\x64itionalColName\x12g\n\x16\x61\x64\x64itional_col_initial\x18\x07 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x14\x61\x64\x64itionalColInitial\x12_\n\x12\x61\x64\x64itional_col_upd\x18\x08 \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionR\x10\x61\x64\x64itionalColUpd\x12*\n\x0e\x65\x61rly_stopping\x18\t \x01(\x08H\x00R\rearlyStopping\x88\x01\x01\x12\x32\n\x15use_local_checkpoints\x18\n \x01(\x08R\x13useLocalCheckpoints\x12U\n\rstorage_level\x18\x0b \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x01R\x0cstorageLevel\x88\x01\x01\x12\x37\n\x16stop_if_all_non_active\x18\x0c \x01(\x08H\x02R\x12stopIfAllNonActive\x88\x01\x01\x12\x66\n\x13initial_active_expr\x18\r \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionH\x03R\x11initialActiveExpr\x88\x01\x01\x12\x64\n\x12update_active_expr\x18\x0e \x01(\x0b\x32\x31.org.graphframes.connect.proto.ColumnOrExpressionH\x04R\x10updateActiveExpr\x88\x01\x01\x12\x45\n\x1dskip_messages_from_non_active\x18\x0f \x01(\x08H\x05R\x19skipMessagesFromNonActive\x88\x01\x01\x12\x35\n\x14required_src_columns\x18\x10 \x01(\tH\x06R\x12requiredSrcColumns\x88\x01\x01\x12\x35\n\x14required_dst_columns\x18\x11 \x01(\tH\x07R\x12requiredDstColumns\x88\x01\x01\x42\x11\n\x0f_early_stoppingB\x10\n\x0e_storage_levelB\x19\n\x17_stop_if_all_non_activeB\x16\n\x14_initial_active_exprB\x15\n\x13_update_active_exprB \n\x1e_skip_messages_from_non_activeB\x17\n\x15_required_src_columnsB\x17\n\x15_required_dst_columns"\xfe\x02\n\rShortestPaths\x12K\n\tlandmarks\x18\x01 \x03(\x0b\x32-.org.graphframes.connect.proto.StringOrLongIDR\tlandmarks\x12\x1c\n\talgorithm\x18\x02 \x01(\tR\talgorithm\x12\x32\n\x15use_local_checkpoints\x18\x03 \x01(\x08R\x13useLocalCheckpoints\x12/\n\x13\x63heckpoint_interval\x18\x04 \x01(\x05R\x12\x63heckpointInterval\x12U\n\rstorage_level\x18\x05 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x12$\n\x0bis_directed\x18\x06 \x01(\x08H\x01R\nisDirected\x88\x01\x01\x42\x10\n\x0e_storage_levelB\x0e\n\x0c_is_directed"8\n\x1bStronglyConnectedComponents\x12\x19\n\x08max_iter\x18\x01 \x01(\x05R\x07maxIter"\xd6\x01\n\x0bSVDPlusPlus\x12\x12\n\x04rank\x18\x01 \x01(\x05R\x04rank\x12\x19\n\x08max_iter\x18\x02 \x01(\x05R\x07maxIter\x12\x1b\n\tmin_value\x18\x03 \x01(\x01R\x08minValue\x12\x1b\n\tmax_value\x18\x04 \x01(\x01R\x08maxValue\x12\x16\n\x06gamma1\x18\x05 \x01(\x01R\x06gamma1\x12\x16\n\x06gamma2\x18\x06 \x01(\x01R\x06gamma2\x12\x16\n\x06gamma6\x18\x07 \x01(\x01R\x06gamma6\x12\x16\n\x06gamma7\x18\x08 \x01(\x01R\x06gamma7"x\n\rTriangleCount\x12U\n\rstorage_level\x18\x01 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\n\n\x08Triplets"\xf9\x01\n\x15MaximalIndependentSet\x12/\n\x13\x63heckpoint_interval\x18\x01 \x01(\x05R\x12\x63heckpointInterval\x12U\n\rstorage_level\x18\x02 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x12\x32\n\x15use_local_checkpoints\x18\x03 \x01(\x08R\x13useLocalCheckpoints\x12\x12\n\x04seed\x18\x04 \x01(\x03R\x04seedB\x10\n\x0e_storage_level"\xd5\x01\n\x05KCore\x12\x32\n\x15use_local_checkpoints\x18\x01 \x01(\x08R\x13useLocalCheckpoints\x12/\n\x13\x63heckpoint_interval\x18\x02 \x01(\x05R\x12\x63heckpointInterval\x12U\n\rstorage_level\x18\x03 \x01(\x0b\x32+.org.graphframes.connect.proto.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_levelB\xd2\x01\n!com.org.graphframes.connect.protoB\x10GraphframesProtoH\x01P\x01\xa0\x01\x01\xa2\x02\x04OGCP\xaa\x02\x1dOrg.Graphframes.Connect.Proto\xca\x02\x1dOrg\\Graphframes\\Connect\\Proto\xe2\x02)Org\\Graphframes\\Connect\\Proto\\GPBMetadata\xea\x02 Org::Graphframes::Connect::Protob\x06proto3' ) _globals = globals() @@ -63,19 +63,19 @@ _globals["_POWERITERATIONCLUSTERING"]._serialized_start = 4638 _globals["_POWERITERATIONCLUSTERING"]._serialized_end = 4756 _globals["_PREGEL"]._serialized_start = 4759 - _globals["_PREGEL"]._serialized_end = 6013 - _globals["_SHORTESTPATHS"]._serialized_start = 6016 - _globals["_SHORTESTPATHS"]._serialized_end = 6398 - _globals["_STRONGLYCONNECTEDCOMPONENTS"]._serialized_start = 6400 - _globals["_STRONGLYCONNECTEDCOMPONENTS"]._serialized_end = 6456 - _globals["_SVDPLUSPLUS"]._serialized_start = 6459 - _globals["_SVDPLUSPLUS"]._serialized_end = 6673 - _globals["_TRIANGLECOUNT"]._serialized_start = 6675 - _globals["_TRIANGLECOUNT"]._serialized_end = 6795 - _globals["_TRIPLETS"]._serialized_start = 6797 - _globals["_TRIPLETS"]._serialized_end = 6807 - _globals["_MAXIMALINDEPENDENTSET"]._serialized_start = 6810 - _globals["_MAXIMALINDEPENDENTSET"]._serialized_end = 7059 - _globals["_KCORE"]._serialized_start = 7062 - _globals["_KCORE"]._serialized_end = 7275 + _globals["_PREGEL"]._serialized_end = 6173 + _globals["_SHORTESTPATHS"]._serialized_start = 6176 + _globals["_SHORTESTPATHS"]._serialized_end = 6558 + _globals["_STRONGLYCONNECTEDCOMPONENTS"]._serialized_start = 6560 + _globals["_STRONGLYCONNECTEDCOMPONENTS"]._serialized_end = 6616 + _globals["_SVDPLUSPLUS"]._serialized_start = 6619 + _globals["_SVDPLUSPLUS"]._serialized_end = 6833 + _globals["_TRIANGLECOUNT"]._serialized_start = 6835 + _globals["_TRIANGLECOUNT"]._serialized_end = 6955 + _globals["_TRIPLETS"]._serialized_start = 6957 + _globals["_TRIPLETS"]._serialized_end = 6967 + _globals["_MAXIMALINDEPENDENTSET"]._serialized_start = 6970 + _globals["_MAXIMALINDEPENDENTSET"]._serialized_end = 7219 + _globals["_KCORE"]._serialized_start = 7222 + _globals["_KCORE"]._serialized_end = 7435 # @@protoc_insertion_point(module_scope) diff --git a/python/graphframes/connect/proto/graphframes_pb2.pyi b/python/graphframes/connect/proto/graphframes_pb2.pyi index d772606ea..da06f806f 100644 --- a/python/graphframes/connect/proto/graphframes_pb2.pyi +++ b/python/graphframes/connect/proto/graphframes_pb2.pyi @@ -1,12 +1,8 @@ -from collections.abc import Iterable as _Iterable -from collections.abc import Mapping as _Mapping -from typing import ClassVar as _ClassVar -from typing import Optional as _Optional -from typing import Union as _Union - +from google.protobuf.internal import containers as _containers from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message -from google.protobuf.internal import containers as _containers +from collections.abc import Iterable as _Iterable, Mapping as _Mapping +from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor @@ -32,8 +28,8 @@ class GraphFramesAPI(_message.Message): SVD_PLUS_PLUS_FIELD_NUMBER: _ClassVar[int] TRIANGLE_COUNT_FIELD_NUMBER: _ClassVar[int] TRIPLETS_FIELD_NUMBER: _ClassVar[int] - KCORE_FIELD_NUMBER: _ClassVar[int] MIS_FIELD_NUMBER: _ClassVar[int] + KCORE_FIELD_NUMBER: _ClassVar[int] vertices: bytes edges: bytes aggregate_messages: AggregateMessages @@ -54,8 +50,8 @@ class GraphFramesAPI(_message.Message): svd_plus_plus: SVDPlusPlus triangle_count: TriangleCount triplets: Triplets - kcore: KCore mis: MaximalIndependentSet + kcore: KCore def __init__( self, vertices: _Optional[bytes] = ..., @@ -82,8 +78,8 @@ class GraphFramesAPI(_message.Message): svd_plus_plus: _Optional[_Union[SVDPlusPlus, _Mapping]] = ..., triangle_count: _Optional[_Union[TriangleCount, _Mapping]] = ..., triplets: _Optional[_Union[Triplets, _Mapping]] = ..., - kcore: _Optional[_Union[KCore, _Mapping]] = ..., mis: _Optional[_Union[MaximalIndependentSet, _Mapping]] = ..., + kcore: _Optional[_Union[KCore, _Mapping]] = ..., ) -> None: ... class StorageLevel(_message.Message): @@ -322,6 +318,8 @@ class Pregel(_message.Message): INITIAL_ACTIVE_EXPR_FIELD_NUMBER: _ClassVar[int] UPDATE_ACTIVE_EXPR_FIELD_NUMBER: _ClassVar[int] SKIP_MESSAGES_FROM_NON_ACTIVE_FIELD_NUMBER: _ClassVar[int] + REQUIRED_SRC_COLUMNS_FIELD_NUMBER: _ClassVar[int] + REQUIRED_DST_COLUMNS_FIELD_NUMBER: _ClassVar[int] agg_msgs: ColumnOrExpression send_msg_to_dst: _containers.RepeatedCompositeFieldContainer[ColumnOrExpression] send_msg_to_src: _containers.RepeatedCompositeFieldContainer[ColumnOrExpression] @@ -337,6 +335,8 @@ class Pregel(_message.Message): initial_active_expr: ColumnOrExpression update_active_expr: ColumnOrExpression skip_messages_from_non_active: bool + required_src_columns: str + required_dst_columns: str def __init__( self, agg_msgs: _Optional[_Union[ColumnOrExpression, _Mapping]] = ..., @@ -354,6 +354,8 @@ class Pregel(_message.Message): initial_active_expr: _Optional[_Union[ColumnOrExpression, _Mapping]] = ..., update_active_expr: _Optional[_Union[ColumnOrExpression, _Mapping]] = ..., skip_messages_from_non_active: _Optional[bool] = ..., + required_src_columns: _Optional[str] = ..., + required_dst_columns: _Optional[str] = ..., ) -> None: ... class ShortestPaths(_message.Message): diff --git a/python/graphframes/lib/pregel.py b/python/graphframes/lib/pregel.py index 205b347e9..3d7f7f2d0 100644 --- a/python/graphframes/lib/pregel.py +++ b/python/graphframes/lib/pregel.py @@ -23,6 +23,13 @@ from pyspark.storagelevel import StorageLevel from typing_extensions import Self +try: + # Spark 4 + from pyspark.sql.classic.column import _to_seq +except ImportError: + # Spark 3 + from pyspark.sql.column import _to_seq + from graphframes.classic.utils import storage_level_to_jvm @@ -229,6 +236,46 @@ def setIntermediateStorageLevel(self, storage_level: StorageLevel) -> Self: ) return self + def requiredSrcColumns(self, colName: str, *colNames: str) -> Self: + """Specifies which source vertex columns are required when constructing triplets. + + By default, all source vertex columns are included in triplets, which can create large + intermediate datasets for algorithms with significant state (e.g., cycle detection, + random walks). Use this method to reduce memory usage by specifying only the columns + that are actually needed by the sendMsgToSrc and sendMsgToDst expressions. + + The ID column and the active flag column (if used) are always included automatically. + + :param colName: the first required source vertex column name + :param colNames: additional required source vertex column names + + See also :func:`requiredDstColumns` + """ + self._java_obj.requiredSrcColumns( + colName, _to_seq(self.graph._spark.sparkContext, colNames) + ) + return self + + def requiredDstColumns(self, colName: str, *colNames: str) -> Self: + """Specifies which destination vertex columns are required when constructing triplets. + + By default, all destination vertex columns are included in triplets, which can create large + intermediate datasets for algorithms with significant state (e.g., cycle detection, + random walks). Use this method to reduce memory usage by specifying only the columns + that are actually needed by the sendMsgToSrc and sendMsgToDst expressions. + + The ID column and the active flag column (if used) are always included automatically. + + :param colName: the first required destination vertex column name + :param colNames: additional required destination vertex column names + + See also :func:`requiredSrcColumns` + """ + self._java_obj.requiredDstColumns( + colName, _to_seq(self.graph._spark.sparkContext, colNames) + ) + return self + def run(self) -> DataFrame: """Runs the defined Pregel algorithm. From c3fd80970f77a09defed5334d562c0f496e7bd98 Mon Sep 17 00:00:00 2001 From: joelrobin18 Date: Sat, 27 Dec 2025 23:29:10 +0530 Subject: [PATCH 2/3] lint Signed-off-by: joelrobin18 --- python/graphframes/connect/proto/graphframes_pb2.pyi | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python/graphframes/connect/proto/graphframes_pb2.pyi b/python/graphframes/connect/proto/graphframes_pb2.pyi index da06f806f..f4dbc87c0 100644 --- a/python/graphframes/connect/proto/graphframes_pb2.pyi +++ b/python/graphframes/connect/proto/graphframes_pb2.pyi @@ -1,8 +1,12 @@ -from google.protobuf.internal import containers as _containers +from collections.abc import Iterable as _Iterable +from collections.abc import Mapping as _Mapping +from typing import ClassVar as _ClassVar +from typing import Optional as _Optional +from typing import Union as _Union + from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message -from collections.abc import Iterable as _Iterable, Mapping as _Mapping -from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union +from google.protobuf.internal import containers as _containers DESCRIPTOR: _descriptor.FileDescriptor From b77545704585f49584e861f09822fa9ad223c415 Mon Sep 17 00:00:00 2001 From: joelrobin18 Date: Mon, 29 Dec 2025 22:02:06 +0530 Subject: [PATCH 3/3] Feedbacks + Doc update Signed-off-by: joelrobin18 --- docs/src/04-user-guide/10-pregel.md | 30 +++++++++++++++++++ .../graphframes/connect/graphframes_client.py | 16 +++++----- python/graphframes/lib/pregel.py | 20 ++++++------- 3 files changed, 48 insertions(+), 18 deletions(-) diff --git a/docs/src/04-user-guide/10-pregel.md b/docs/src/04-user-guide/10-pregel.md index 6dc618a01..c62f2ffd0 100644 --- a/docs/src/04-user-guide/10-pregel.md +++ b/docs/src/04-user-guide/10-pregel.md @@ -48,6 +48,36 @@ val edgeWeight = Pregel.edge("weight") Under the hood, the passed name of the column will be resolved to get the corresponding element of the triplet structs. +### Memory Optimization for Triplets + +By default, all vertex columns are included when constructing triplets. For algorithms with large per-vertex state (e.g., cycle detection storing sequences, random walks), this can create huge intermediate datasets in memory. + +To reduce memory usage, you can specify only the columns that are actually needed using `requiredSrcColumns` and `requiredDstColumns`: + +```scala +graph.pregel + .withVertexColumn("distances", ...) + .sendMsgToDst(Pregel.src("distances")) // Only needs "distances" from source + .requiredSrcColumns("distances") // Only include "distances" in src struct + .requiredDstColumns("distances") // Only include "distances" in dst struct + .aggMsgs(...) + .run() +``` + +In Python: + +```python +graph.pregel \ + .withVertexColumn("distances", ...) \ + .sendMsgToDst(Pregel.src("distances")) \ + .required_src_columns("distances") \ + .required_dst_columns("distances") \ + .aggMsgs(...) \ + .run() +``` + +The `id` column and the active flag column (if used) are always included automatically, so you don't need to specify them. + ### Sending Messages GraphFrames Pregel API support arbitrary number of messages per vertex. Inside the Pregel API **graphs are always considered directed**. This means that if a vertex has an outgoing edge to another vertex, then the message will be sent to the destination vertex. To emulate the behavior of the undirected graph, the user can send the same message to both the source and the destination vertex. diff --git a/python/graphframes/connect/graphframes_client.py b/python/graphframes/connect/graphframes_client.py index a3ef0f965..da342ea22 100644 --- a/python/graphframes/connect/graphframes_client.py +++ b/python/graphframes/connect/graphframes_client.py @@ -119,30 +119,30 @@ def setIntermediateStorageLevel(self, storage_level: StorageLevel) -> Self: self._storage_level = storage_level return self - def requiredSrcColumns(self, colName: str, *colNames: str) -> Self: + def required_src_columns(self, col_name: str, *col_names: str) -> Self: """Specifies which source vertex columns are required when constructing triplets. By default, all source vertex columns are included in triplets, which can create large intermediate datasets for algorithms with significant state. Use this method to reduce memory usage by specifying only the columns that are actually needed. - :param colName: the first required source vertex column name - :param colNames: additional required source vertex column names + :param col_name: the first required source vertex column name + :param col_names: additional required source vertex column names """ - self._required_src_columns = [colName] + list(colNames) + self._required_src_columns = [col_name] + list(col_names) return self - def requiredDstColumns(self, colName: str, *colNames: str) -> Self: + def required_dst_columns(self, col_name: str, *col_names: str) -> Self: """Specifies which destination vertex columns are required when constructing triplets. By default, all destination vertex columns are included in triplets, which can create large intermediate datasets for algorithms with significant state. Use this method to reduce memory usage by specifying only the columns that are actually needed. - :param colName: the first required destination vertex column name - :param colNames: additional required destination vertex column names + :param col_name: the first required destination vertex column name + :param col_names: additional required destination vertex column names """ - self._required_dst_columns = [colName] + list(colNames) + self._required_dst_columns = [col_name] + list(col_names) return self def run(self) -> DataFrame: diff --git a/python/graphframes/lib/pregel.py b/python/graphframes/lib/pregel.py index 3d7f7f2d0..bf4029225 100644 --- a/python/graphframes/lib/pregel.py +++ b/python/graphframes/lib/pregel.py @@ -236,7 +236,7 @@ def setIntermediateStorageLevel(self, storage_level: StorageLevel) -> Self: ) return self - def requiredSrcColumns(self, colName: str, *colNames: str) -> Self: + def required_src_columns(self, col_name: str, *col_names: str) -> Self: """Specifies which source vertex columns are required when constructing triplets. By default, all source vertex columns are included in triplets, which can create large @@ -246,17 +246,17 @@ def requiredSrcColumns(self, colName: str, *colNames: str) -> Self: The ID column and the active flag column (if used) are always included automatically. - :param colName: the first required source vertex column name - :param colNames: additional required source vertex column names + :param col_name: the first required source vertex column name + :param col_names: additional required source vertex column names - See also :func:`requiredDstColumns` + See also :func:`required_dst_columns` """ self._java_obj.requiredSrcColumns( - colName, _to_seq(self.graph._spark.sparkContext, colNames) + col_name, _to_seq(self.graph._spark.sparkContext, col_names) ) return self - def requiredDstColumns(self, colName: str, *colNames: str) -> Self: + def required_dst_columns(self, col_name: str, *col_names: str) -> Self: """Specifies which destination vertex columns are required when constructing triplets. By default, all destination vertex columns are included in triplets, which can create large @@ -266,13 +266,13 @@ def requiredDstColumns(self, colName: str, *colNames: str) -> Self: The ID column and the active flag column (if used) are always included automatically. - :param colName: the first required destination vertex column name - :param colNames: additional required destination vertex column names + :param col_name: the first required destination vertex column name + :param col_names: additional required destination vertex column names - See also :func:`requiredSrcColumns` + See also :func:`required_src_columns` """ self._java_obj.requiredDstColumns( - colName, _to_seq(self.graph._spark.sparkContext, colNames) + col_name, _to_seq(self.graph._spark.sparkContext, col_names) ) return self