Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 74 additions & 10 deletions python/graphframes/graphframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,10 @@ def aggregateMessages(self, aggCol, sendToSrc=None, sendToDst=None):
# Standard algorithms

def connectedComponents(self, algorithm = "graphframes", checkpointInterval = 2,
broadcastThreshold = 1000000):
broadcastThreshold = 1000000,
intermediateStorageLevel = StorageLevel.MEMORY_AND_DISK,
intermediateGraphxVertexStorageLevel = StorageLevel.MEMORY_ONLY,
intermediateGraphxEdgeStorageLevel = StorageLevel.MEMORY_ONLY):
"""
Computes the connected components of the graph.

Expand All @@ -270,30 +273,53 @@ def connectedComponents(self, algorithm = "graphframes", checkpointInterval = 2,
:param checkpointInterval: checkpoint interval in terms of number of iterations (default: 2)
:param broadcastThreshold: broadcast threshold in propagating component assignments
(default: 1000000)
:param intermediateStorageLevel: storage level for intermediate datasets that require
multiple passes (default: ``MEMORY_AND_DISK``)
:param intermediateGraphxVertexStorageLevel: storage level for intermediate `Graph` vertices that
require multiple passes (default: `MEMORY_ONLY`). This parameter is only used when the
algorithm is set to "graphx"
:param intermediateGraphxEdgeStorageLevel: storage level for intermediate `Graph` edges that
require multiple passes (default: `MEMORY_ONLY`). This parameter is only used when the
algorithm is set to "graphx"

:return: DataFrame with new vertices column "component"
"""
jdf = self._jvm_graph.connectedComponents() \
.setAlgorithm(algorithm) \
.setCheckpointInterval(checkpointInterval) \
.setBroadcastThreshold(broadcastThreshold) \
.setIntermediateStorageLevel(self._sc._getJavaStorageLevel(intermediateStorageLevel)) \
.setIntermediateGraphxVertexStorageLevel(self._sc._getJavaStorageLevel(intermediateGraphxVertexStorageLevel)) \
.setIntermediateGraphxEdgeStorageLevel(self._sc._getJavaStorageLevel(intermediateGraphxEdgeStorageLevel)) \
.run()
return DataFrame(jdf, self._sqlContext)

def labelPropagation(self, maxIter):
def labelPropagation(self, maxIter, intermediateVertexStorageLevel = StorageLevel.MEMORY_ONLY,
intermediateEdgeStorageLevel = StorageLevel.MEMORY_ONLY):
"""
Runs static label propagation for detecting communities in networks.

The computation is delegated to the JVM ``labelPropagation()`` builder of the
underlying graph. See Scala documentation for more details.

:param maxIter: the number of iterations to be performed
:param intermediateVertexStorageLevel: PySpark ``StorageLevel`` for intermediate `Graph`
vertices that require multiple passes (default: `MEMORY_ONLY`).
:param intermediateEdgeStorageLevel: PySpark ``StorageLevel`` for intermediate `Graph`
edges that require multiple passes (default: `MEMORY_ONLY`).
:return: DataFrame with new vertices column "label"
"""
jdf = self._jvm_graph.labelPropagation().maxIter(maxIter).run()

# The JVM builder is given Java storage levels, so convert the Python-side values first.
javaVertexStorageLevel = self._sc._getJavaStorageLevel(intermediateVertexStorageLevel)
javaEdgeStorageLevel = self._sc._getJavaStorageLevel(intermediateEdgeStorageLevel)
jdf = self._jvm_graph.labelPropagation().maxIter(maxIter) \
.setIntermediateVertexStorageLevel(javaVertexStorageLevel) \
.setIntermediateEdgeStorageLevel(javaEdgeStorageLevel) \
.run()
# Wrap the JVM result in a Python DataFrame bound to this graph's SQL context.
return DataFrame(jdf, self._sqlContext)

def pageRank(self, resetProbability = 0.15, sourceId = None, maxIter = None,
tol = None):
tol = None, intermediateVertexStorageLevel = StorageLevel.MEMORY_ONLY,
intermediateEdgeStorageLevel = StorageLevel.MEMORY_ONLY):
"""
Runs the PageRank algorithm on the graph.
Note: Exactly one of `maxIter` or `tol` must be set.
Expand All @@ -306,8 +332,14 @@ def pageRank(self, resetProbability = 0.15, sourceId = None, maxIter = None,
of iterations. This may not be set if the `tol` parameter is set.
:param tol: If set, the algorithm is run until the given tolerance.
This may not be set if the `maxIter` parameter is set.
:param intermediateVertexStorageLevel: storage level for intermediate `Graph` vertices that
require multiple passes (default: `MEMORY_ONLY`).
:param intermediateEdgeStorageLevel: storage level for intermediate `Graph` edges that
require multiple passes (default: `MEMORY_ONLY`).
:return: GraphFrame with new vertices column "pagerank" and new edges column "weight"
"""
javaVertexStorageLevel = self._sc._getJavaStorageLevel(intermediateVertexStorageLevel)
javaEdgeStorageLevel = self._sc._getJavaStorageLevel(intermediateEdgeStorageLevel)
builder = self._jvm_graph.pageRank().resetProbability(resetProbability)
if sourceId is not None:
builder = builder.sourceId(sourceId)
Expand All @@ -317,11 +349,15 @@ def pageRank(self, resetProbability = 0.15, sourceId = None, maxIter = None,
else:
assert tol is not None, "Exactly one of maxIter or tol should be set."
builder = builder.tol(tol)
jgf = builder.run()
jgf = builder.setIntermediateVertexStorageLevel(javaVertexStorageLevel) \
.setIntermediateEdgeStorageLevel(javaEdgeStorageLevel) \
.run()
return _from_java_gf(jgf, self._sqlContext)

def parallelPersonalizedPageRank(self, resetProbability = 0.15, sourceIds = None,
maxIter = None):
maxIter = None,
intermediateVertexStorageLevel = StorageLevel.MEMORY_ONLY,
intermediateEdgeStorageLevel = StorageLevel.MEMORY_ONLY):
"""
Run the personalized PageRank algorithm on the graph,
from the provided list of sources in parallel for a fixed number of iterations.
Expand All @@ -331,6 +367,10 @@ def parallelPersonalizedPageRank(self, resetProbability = 0.15, sourceIds = None
:param resetProbability: Probability of resetting to a random vertex
:param sourceIds: the source vertices for a personalized PageRank
:param maxIter: the fixed number of iterations this algorithm runs
:param intermediateVertexStorageLevel: storage level for intermediate `Graph` vertices that
require multiple passes (default: `MEMORY_ONLY`).
:param intermediateEdgeStorageLevel: storage level for intermediate `Graph` edges that
require multiple passes (default: `MEMORY_ONLY`).
:return: GraphFrame with new vertices column "pageranks" and new edges column "weight"
"""
assert sourceIds is not None and len(sourceIds) > 0, "Source vertices Ids sourceIds must be provided"
Expand All @@ -340,31 +380,55 @@ def parallelPersonalizedPageRank(self, resetProbability = 0.15, sourceIds = None
builder = builder.resetProbability(resetProbability)
builder = builder.sourceIds(sourceIds)
builder = builder.maxIter(maxIter)
javaVertexStorageLevel = self._sc._getJavaStorageLevel(intermediateVertexStorageLevel)
builder = builder.setIntermediateVertexStorageLevel(javaVertexStorageLevel)
javaEdgeStorageLevel = self._sc._getJavaStorageLevel(intermediateEdgeStorageLevel)
builder = builder.setIntermediateEdgeStorageLevel(javaEdgeStorageLevel)
jgf = builder.run()
return _from_java_gf(jgf, self._sqlContext)

def shortestPaths(self, landmarks):
def shortestPaths(self, landmarks, intermediateVertexStorageLevel = StorageLevel.MEMORY_ONLY,
intermediateEdgeStorageLevel = StorageLevel.MEMORY_ONLY):
"""
Runs the shortest path algorithm from a set of landmark vertices in the graph.

The computation is delegated to the JVM ``shortestPaths()`` builder of the
underlying graph. See Scala documentation for more details.

:param landmarks: a set of one or more landmarks
:param intermediateVertexStorageLevel: PySpark ``StorageLevel`` for intermediate `Graph`
vertices that require multiple passes (default: `MEMORY_ONLY`).
:param intermediateEdgeStorageLevel: PySpark ``StorageLevel`` for intermediate `Graph`
edges that require multiple passes (default: `MEMORY_ONLY`).
:return: DataFrame with new vertices column "distances"
"""
jdf = self._jvm_graph.shortestPaths().landmarks(landmarks).run()
# The JVM builder is given Java storage levels, so convert the Python-side values first.
javaVertexStorageLevel = self._sc._getJavaStorageLevel(intermediateVertexStorageLevel)
javaEdgeStorageLevel = self._sc._getJavaStorageLevel(intermediateEdgeStorageLevel)
jdf = self._jvm_graph.shortestPaths().landmarks(landmarks) \
.setIntermediateVertexStorageLevel(javaVertexStorageLevel) \
.setIntermediateEdgeStorageLevel(javaEdgeStorageLevel) \
.run()
# Wrap the JVM result in a Python DataFrame bound to this graph's SQL context.
return DataFrame(jdf, self._sqlContext)

def stronglyConnectedComponents(self, maxIter):
def stronglyConnectedComponents(self, maxIter, intermediateVertexStorageLevel = StorageLevel.MEMORY_ONLY,
intermediateEdgeStorageLevel = StorageLevel.MEMORY_ONLY):
"""
Runs the strongly connected components algorithm on this graph.

The computation is delegated to the JVM ``stronglyConnectedComponents()`` builder of
the underlying graph. See Scala documentation for more details.

:param maxIter: the number of iterations to run
:param intermediateVertexStorageLevel: PySpark ``StorageLevel`` for intermediate `Graph`
vertices that require multiple passes (default: `MEMORY_ONLY`).
:param intermediateEdgeStorageLevel: PySpark ``StorageLevel`` for intermediate `Graph`
edges that require multiple passes (default: `MEMORY_ONLY`).
:return: DataFrame with new vertex column "component"
"""
jdf = self._jvm_graph.stronglyConnectedComponents().maxIter(maxIter).run()
# The JVM builder is given Java storage levels, so convert the Python-side values first.
javaVertexStorageLevel = self._sc._getJavaStorageLevel(intermediateVertexStorageLevel)
javaEdgeStorageLevel = self._sc._getJavaStorageLevel(intermediateEdgeStorageLevel)
jdf = self._jvm_graph.stronglyConnectedComponents().maxIter(maxIter) \
.setIntermediateVertexStorageLevel(javaVertexStorageLevel) \
.setIntermediateEdgeStorageLevel(javaEdgeStorageLevel) \
.run()
# Wrap the JVM result in a Python DataFrame bound to this graph's SQL context.
return DataFrame(jdf, self._sqlContext)

def svdPlusPlus(self, rank = 10, maxIter = 2, minValue = 0.0, maxValue = 5.0,
Expand Down
77 changes: 77 additions & 0 deletions python/graphframes/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from .graphframe import GraphFrame, _java_api, _from_java_gf
from .lib import AggregateMessages as AM
from .examples import Graphs, BeliefPropagation
from pyspark.storagelevel import StorageLevel

class GraphFrameTestUtils(object):

Expand Down Expand Up @@ -273,6 +274,23 @@ def test_connected_components_friends(self):
for c in comps_tests:
self.assertEqual(c.groupBy("component").count().count(), 2)

def test_connected_components_intermediate_storage_level(self):
    # Connected components must give the same rows whatever storage level is
    # requested for the intermediate data, for both implementations.
    graph = self._graph("friends")

    # GraphX implementation: every vertex/edge storage-level combination is
    # compared against a run that uses the default levels.
    baseline = graph.connectedComponents(algorithm="graphx").collect()
    storage_levels = (StorageLevel.DISK_ONLY, StorageLevel.MEMORY_AND_DISK)
    for vertex_level in storage_levels:
        for edge_level in storage_levels:
            components = graph.connectedComponents(
                algorithm="graphx",
                intermediateGraphxVertexStorageLevel=vertex_level,
                intermediateGraphxEdgeStorageLevel=edge_level)
            self.assertEqual(components.collect(), baseline)

    # GraphFrames implementation: only the single intermediate storage level
    # is varied here.
    baseline = graph.connectedComponents().collect()
    storage_levels = (StorageLevel.DISK_ONLY, StorageLevel.MEMORY_AND_DISK)
    for storage_level in storage_levels:
        components = graph.connectedComponents(intermediateStorageLevel=storage_level)
        self.assertEqual(components.collect(), baseline)

def test_label_progagation(self):
n = 5
g = self._graph("twoBlobs", n)
Expand All @@ -284,6 +302,16 @@ def test_label_progagation(self):
all2 = set([x.label for x in labels2])
self.assertEqual(all2, set([n]))

def test_label_progagation_intermediate_storage_level(self):
    # Label propagation must give the same rows for every combination of
    # intermediate vertex/edge storage levels as it does with the defaults.
    graph = self._graph("friends")
    baseline = graph.labelPropagation(maxIter=1).collect()
    storage_levels = (StorageLevel.DISK_ONLY, StorageLevel.MEMORY_AND_DISK)
    for vertex_level in storage_levels:
        for edge_level in storage_levels:
            labels = graph.labelPropagation(
                maxIter=1,
                intermediateVertexStorageLevel=vertex_level,
                intermediateEdgeStorageLevel=edge_level)
            self.assertEqual(labels.collect(), baseline)


def test_page_rank(self):
n = 100
g = self._graph("star", n)
Expand All @@ -292,6 +320,18 @@ def test_page_rank(self):
pr = g.pageRank(resetProb, tol=errorTol)
self._hasCols(pr, vcols=['id', 'pagerank'], ecols=['src', 'dst', 'weight'])

def test_page_rank_intermediate_storage_level(self):
    # PageRank must give the same vertices and edges for every combination of
    # intermediate vertex/edge storage levels as it does with the defaults.
    graph = self._graph("friends")
    reference = graph.pageRank(maxIter=1)
    baseline_vertices = reference.vertices.collect()
    baseline_edges = reference.edges.collect()
    storage_levels = (StorageLevel.DISK_ONLY, StorageLevel.MEMORY_AND_DISK)
    for vertex_level in storage_levels:
        for edge_level in storage_levels:
            ranked = graph.pageRank(
                maxIter=1,
                intermediateVertexStorageLevel=vertex_level,
                intermediateEdgeStorageLevel=edge_level)
            self.assertEqual(ranked.vertices.collect(), baseline_vertices)
            self.assertEqual(ranked.edges.collect(), baseline_edges)

def test_parallel_personalized_page_rank(self):
if not GraphFrameTestUtils.spark_at_least_of_version("2.1"):
self.skipTest("Parallel Personalized PageRank is only available in Apache Spark 2.1+")
Expand All @@ -303,6 +343,24 @@ def test_parallel_personalized_page_rank(self):
pr = g.parallelPersonalizedPageRank(resetProb, sourceIds=sourceIds, maxIter=maxIter)
self._hasCols(pr, vcols=['id', 'pageranks'], ecols=['src', 'dst', 'weight'])

def test_parallel_personalized_page_rank_intermediate_storage_level(self):
    # The two version guards below leave this test running only on Spark
    # releases that are at least 2.1 and below 2.2.
    if GraphFrameTestUtils.spark_at_least_of_version("2.2"):
        self.skipTest(
            "in Spark 2.2, sourceIds must be smaller than Int.MaxValue "
            "which might not be the case for LONG_ID in graph.indexedVertices")
    if not GraphFrameTestUtils.spark_at_least_of_version("2.1"):
        self.skipTest("Parallel Personalized PageRank is only available in Apache Spark 2.1+")

    # Results with the default storage levels are the reference for every
    # vertex/edge storage-level combination.
    graph = self._graph("friends")
    sources = ["a", "b"]
    reference = graph.parallelPersonalizedPageRank(maxIter=1, sourceIds=sources)
    baseline_vertices = reference.vertices.collect()
    baseline_edges = reference.edges.collect()
    storage_levels = (StorageLevel.DISK_ONLY, StorageLevel.MEMORY_AND_DISK)
    for vertex_level in storage_levels:
        for edge_level in storage_levels:
            ranked = graph.parallelPersonalizedPageRank(
                maxIter=1,
                sourceIds=sources,
                intermediateVertexStorageLevel=vertex_level,
                intermediateEdgeStorageLevel=edge_level)
            self.assertEqual(ranked.vertices.collect(), baseline_vertices)
            self.assertEqual(ranked.edges.collect(), baseline_edges)

def test_shortest_paths(self):
edges = [(1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)]
all_edges = [z for (a, b) in edges for z in [(a, b), (b, a)]]
Expand All @@ -313,6 +371,16 @@ def test_shortest_paths(self):
v2 = g.shortestPaths(landmarks)
self._df_hasCols(v2, vcols=["id", "distances"])

def test_shortest_paths_intermediate_storage_level(self):
    # Shortest paths must give the same rows for every combination of
    # intermediate vertex/edge storage levels as it does with the defaults.
    graph = self._graph("friends")
    targets = ["a", "d"]
    baseline = graph.shortestPaths(landmarks=targets).collect()
    storage_levels = (StorageLevel.DISK_ONLY, StorageLevel.MEMORY_AND_DISK)
    for vertex_level in storage_levels:
        for edge_level in storage_levels:
            distances = graph.shortestPaths(
                landmarks=targets,
                intermediateVertexStorageLevel=vertex_level,
                intermediateEdgeStorageLevel=edge_level)
            self.assertEqual(distances.collect(), baseline)

def test_svd_plus_plus(self):
g = self._graph("ALSSyntheticData")
(v2, cost) = g.svdPlusPlus()
Expand All @@ -327,6 +395,15 @@ def test_strongly_connected_components(self):
for row in c.collect():
self.assertEqual(row.id, row.component)

def test_strongly_connected_components_intermediate_storage_level(self):
    # Strongly connected components must give the same rows for every
    # combination of intermediate vertex/edge storage levels as with the defaults.
    graph = self._graph("friends")
    baseline = graph.stronglyConnectedComponents(maxIter=1).collect()
    storage_levels = (StorageLevel.DISK_ONLY, StorageLevel.MEMORY_AND_DISK)
    for vertex_level in storage_levels:
        for edge_level in storage_levels:
            components = graph.stronglyConnectedComponents(
                maxIter=1,
                intermediateVertexStorageLevel=vertex_level,
                intermediateEdgeStorageLevel=edge_level)
            self.assertEqual(components.collect(), baseline)

def test_triangle_counts(self):
edges = self.sqlContext.createDataFrame([(0, 1), (1, 2), (2, 0)], ["src", "dst"])
vertices = self.sqlContext.createDataFrame([(0,), (1,), (2,)], ["id"])
Expand Down
24 changes: 24 additions & 0 deletions src/main/scala/org/graphframes/GraphFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,30 @@ class GraphFrame private(
cachedGraphX.mapVertices((_, _) => ()).mapEdges(e => ())
}

/**
 * Returns the topology-only GraphX `Graph` (`cachedTopologyGraphX`) configured with the requested
 * vertex and edge storage levels.
 *
 * When both levels are `MEMORY_ONLY`, the existing `cachedTopologyGraphX` instance is returned
 * as is. Otherwise a new `Graph` is built from the vertices and edges of `cachedTopologyGraphX`
 * using the given storage levels.
 *
 * @param vertexStorageLevel vertex storage level set in the returned GraphX `Graph` instance
 * @param edgeStorageLevel edge storage level set in the returned GraphX `Graph` instance
 * @return `cachedTopologyGraphX` itself when both levels are `MEMORY_ONLY`, otherwise a new
 *         `Graph` over the same vertices and edges with the provided storage levels
 */
private[graphframes] def cachedTopologyGraphXWithStorageLevel(
    vertexStorageLevel: StorageLevel,
    edgeStorageLevel: StorageLevel) = {
  val memoryOnlyRequested =
    vertexStorageLevel == StorageLevel.MEMORY_ONLY && edgeStorageLevel == StorageLevel.MEMORY_ONLY
  if (!memoryOnlyRequested) {
    // A different level was requested for at least one side, so build a new Graph with it.
    Graph(
      vertices = cachedTopologyGraphX.vertices,
      edges = cachedTopologyGraphX.edges,
      defaultVertexAttr = null.asInstanceOf[Unit],
      vertexStorageLevel = vertexStorageLevel,
      edgeStorageLevel = edgeStorageLevel
    )
  } else {
    cachedTopologyGraphX
  }
}

/**
* A cached conversion of this graph to the GraphX structure, with the data stored for each edge and vertex.
*/
Expand Down
Loading