Problem
At the moment we do a left outer join of the non-null messages with all the vertices in the graph:
val verticesWithMsg = currentVertices.join(newAggMsgDF, Seq(ID), "left_outer")
On each iteration we generate all the triplets and try to generate messages for all the vertices, without checking whether a vertex is active. This causes a huge performance degradation, especially in the first iterations of Pregel and in iterations where the algorithm has almost converged.
An implementation of Shortest Paths with GraphFrames Pregel may look like:

But even on an iteration where we have only 1 non-null message, we still try to generate messages for all the triplets!
Possible Solution
- Use a left outer join instead of an inner one when updating currentVertices (currentVertices = graph.vertices.join(vertexUpdateColDF, ID) -> currentVertices = graph.vertices.join(vertexUpdateColDF, ID, "left_outer")).
- Add a flag "isActive = Pregel.msg.isNotNull" to currentVertices. Use this flag to select only active vertices when generating triplets and messages (val tripletsDF = currentVertices.select(...).join(...) -> val tripletsDF = currentVertices.filter(col("isActive")).select(...).join(...)).
In that case we will generate new messages only for triplets where there was an update in the last iteration, instead of regenerating all the messages for all the triplets only to find that they are empty.
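The two steps above can be sketched on plain Spark DataFrames. This is a simplified illustration, not the GraphFrames Pregel implementation: the object name `ActiveVertexSketch`, the helpers `withActiveFlag` and `activeTriplets`, and the column names `id`, `msg`, `src`, `dst` are assumptions made for the example, and it only considers messages flowing from `src` to `dst`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper object; names and column layout are assumptions.
object ActiveVertexSketch {

  /** Attach last-iteration messages and flag vertices that received one. */
  def withActiveFlag(vertices: DataFrame, aggMsgs: DataFrame): DataFrame =
    vertices
      .join(aggMsgs, Seq("id"), "left_outer") // keep vertices without a message
      .withColumn("isActive", col("msg").isNotNull)

  /** Join edges only to active source vertices (simplified triplets). */
  def activeTriplets(current: DataFrame, edges: DataFrame): DataFrame =
    current
      .filter(col("isActive"))
      .join(edges, col("id") === col("src"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("active-vertex-sketch")
      .getOrCreate()
    import spark.implicits._

    // A chain 1 -> 2 -> 3 -> 4; only vertex 2 got a message last iteration.
    val vertices = Seq(1L, 2L, 3L, 4L).toDF("id")
    val edges = Seq((1L, 2L), (2L, 3L), (3L, 4L)).toDF("src", "dst")
    val aggMsgs = Seq((2L, 1.0)).toDF("id", "msg")

    val current = withActiveFlag(vertices, aggMsgs)
    val all = current.join(edges, col("id") === col("src"))
    val active = activeTriplets(current, edges)

    println(s"triplets without filter: ${all.count()}")
    println(s"triplets with isActive filter: ${active.count()}")
    spark.stop()
  }
}
```

With the filter, the work per iteration scales with the number of edges leaving active vertices rather than with the total number of edges.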