Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ recursive-exclude * __pycache__
recursive-exclude * *.pyc
include README.md
include LICENSE
include graphframes/tutorials/data/.exists
19 changes: 19 additions & 0 deletions python/graphframes/console.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import click
from graphframes.tutorials import download


@click.group()
def cli():
    """GraphFrames CLI: a collection of commands for graphframes."""
    # The docstring is the whole body: click displays it as the group's help text,
    # so it is kept user-facing and unchanged.


# Register the Stack Exchange downloader as a subcommand of the group.
cli.add_command(download.stackexchange)


def main() -> None:
    """Invoke the ``cli`` click group; used as the script entry point below."""
    cli()


# Run the CLI when this module is executed directly rather than imported.
if __name__ == "__main__":
    main()
88 changes: 88 additions & 0 deletions python/graphframes/tutorials/download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python

"""Download and decompress the Stack Exchange data dump from the Internet Archive."""

import os

import click
import py7zr
import requests # type: ignore


def _open_archive_stream(
    url: str, max_retries: int = 3, timeout: float = 30.0
) -> "requests.Response":
    """Open a streaming GET request to ``url``, retrying on request failures.

    Args:
        url: Address of the archive to download.
        max_retries: Total number of attempts before giving up.
        timeout: Seconds to wait for the connection and for each read, so a
            stalled server cannot hang the command forever.

    Returns:
        The open, successful response. The caller is responsible for closing it.

    Raises:
        click.Abort: After ``max_retries`` attempts have failed.
    """
    for attempt in range(1, max_retries + 1):
        response = None
        try:
            response = requests.get(url, stream=True, timeout=timeout)
            response.raise_for_status()  # Raise exception for bad status codes
            return response
        # RequestException is the base class of ConnectionError, HTTPError and Timeout
        except requests.exceptions.RequestException as e:
            # Release the connection of a failed attempt before retrying or aborting
            if response is not None:
                response.close()
            if attempt == max_retries:
                click.echo(f"Failed to download after {max_retries} attempts: {e}", err=True)
                raise click.Abort() from e
            click.echo(f"Download attempt {attempt} failed, retrying...")
    # Only reachable when max_retries < 1, i.e. no attempt was made at all
    raise click.Abort()


@click.command()
@click.argument("subdomain")
@click.option(
    "--data-dir",
    default="python/graphframes/tutorials/data",
    help="Directory to store downloaded files",
)
@click.option(
    "--extract/--no-extract", default=True, help="Whether to extract the archive after download"
)
def stackexchange(subdomain: str, data_dir: str, extract: bool) -> None:
    """Download Stack Exchange archive for a given SUBDOMAIN.

    Example: python/graphframes/tutorials/download.py stats.meta

    Note: This won't work for stackoverflow.com archives due to size.
    """
    # The docstring above is shown by click as the command's help text, so it is
    # kept verbatim; implementation notes live in comments instead.

    # Create data directory if it doesn't exist
    os.makedirs(data_dir, exist_ok=True)

    # Construct archive URL and filename
    archive_url = f"https://archive.org/download/stackexchange/{subdomain}.stackexchange.com.7z"
    archive_path = os.path.join(data_dir, f"{subdomain}.stackexchange.com.7z")

    click.echo(f"Downloading archive from {archive_url}")

    try:
        # Download the file with retries (aborts by itself once the attempts run out)
        response = _open_archive_stream(archive_url)

        # Closing the response returns the streamed connection even if writing fails
        with response:
            # A missing content-length header yields 0, i.e. a progress bar of unknown length
            total_size = int(response.headers.get("content-length", 0))

            with click.progressbar(length=total_size, label="Downloading") as bar:  # type: ignore
                with open(archive_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                            bar.update(len(chunk))

        click.echo(f"Download complete: {archive_path}")

        # Extract if requested
        if extract:
            click.echo("Extracting archive...")
            output_dir = f"{subdomain}.stackexchange.com"
            with py7zr.SevenZipFile(archive_path, mode="r") as z:
                z.extractall(path=os.path.join(data_dir, output_dir))
            click.echo(f"Extraction complete: {output_dir}")

    except requests.exceptions.RequestException as e:
        # Errors raised while streaming the body (after the request itself succeeded)
        click.echo(f"Error downloading archive: {e}", err=True)
        raise click.Abort()
    except py7zr.Bad7zFile as e:
        click.echo(f"Error extracting archive: {e}", err=True)
        raise click.Abort()


# Run the click command when this file is executed as a script; click parses
# the SUBDOMAIN argument and the options from the command line.
if __name__ == "__main__":
    stackexchange()
203 changes: 203 additions & 0 deletions python/graphframes/tutorials/motif.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""Demonstrate GraphFrames network motif finding capabilities. Code from the Network Motif Finding Tutorial."""

#
# Interactive Usage: pyspark --packages graphframes:graphframes:0.8.4-spark3.5-s_2.12
#
# Batch Usage: spark-submit --packages graphframes:graphframes:0.8.4-spark3.5-s_2.12 python/graphframes/tutorials/motif.py
#

import click
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

from graphframes import GraphFrame

# Create (or reuse) the SparkSession that every query in this script runs on
_session_builder = SparkSession.builder.appName("Stack Overflow Motif Analysis")
spark: SparkSession = _session_builder.getOrCreate()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about passing the checkpoint dir during session init and avoiding any use of SparkContext altogether? (The PySpark docs recommend using SparkSession instead of SparkContext.)

# DataFrames checkpointed below are written under this directory
spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints/motif")

# Change me if you download a different stackexchange site
STACKEXCHANGE_SITE = "stats.meta.stackexchange.com"
BASE_PATH = f"python/graphframes/tutorials/data/{STACKEXCHANGE_SITE}"


def _load_for_motifs(parquet_path: str) -> DataFrame:
    """Read a Parquet dataset, then repartition, checkpoint and cache it.

    The repartition gives the motif searches parallelism; the checkpoint was
    added because the query plan got long.
    """
    raw_df = spark.read.parquet(parquet_path)
    return raw_df.repartition(50).checkpoint().cache()


#
# Load the nodes and edges from disk. Both were created in stackexchange.py
# from the Stack Exchange data dump XML files.
#
NODES_PATH: str = f"{BASE_PATH}/Nodes.parquet"
nodes_df: DataFrame = _load_for_motifs(NODES_PATH)

EDGES_PATH: str = f"{BASE_PATH}/Edges.parquet"
edges_df: DataFrame = _load_for_motifs(EDGES_PATH)

# What kinds of nodes do we have to work with?
_node_types_df = nodes_df.select("id", F.col("Type").alias("Node Type"))
_node_type_totals_df = _node_types_df.groupBy("Node Type").count()
node_counts = (
    # Most frequent type first
    _node_type_totals_df.orderBy(F.col("count").desc())
    # Replace the numeric count with a comma formatted column for display
    .withColumn("count", F.format_number(F.col("count"), 0))
)
node_counts.show()

# What kinds of edges do we have to work with?
_edge_types_df = edges_df.select("src", "dst", F.col("relationship").alias("Edge Type"))
_edge_type_totals_df = _edge_types_df.groupBy("Edge Type").count()
edge_counts = (
    # Most frequent type first
    _edge_type_totals_df.orderBy(F.col("count").desc())
    # Replace the numeric count with a comma formatted column for display
    .withColumn("count", F.format_number(F.col("count"), 0))
)
edge_counts.show()

# Build the property graph that all motif searches below run against
g = GraphFrame(nodes_df, edges_df)

g.vertices.show(10)
click.echo(f"Node columns: {g.vertices.columns}")

# Peek at a small random sample of the edges
g.edges.sample(0.0001).show(10)

# Sanity test that all edges have valid ids: an edge only survives both inner
# joins when its src and its dst each match a vertex id
edge_count = g.edges.count()
valid_edge_count = (
    g.edges.join(g.vertices, on=g.edges.src == g.vertices.id)
    .select("src", "dst", "relationship")
    .join(g.vertices, on=g.edges.dst == g.vertices.id)
    .count()
)

# Just up and die if we have edges that point to non-existent nodes. This is an
# explicit raise rather than an `assert` statement so the check still runs when
# Python is started with -O / PYTHONOPTIMIZE, which strips asserts.
if edge_count != valid_edge_count:
    raise AssertionError(f"Edge count {edge_count} != valid edge count {valid_edge_count}")
click.echo(f"Edge count: {edge_count:,} == Valid edge count: {valid_edge_count:,}")

# G4: Continuous Triangles
paths = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(a)")

# Show the first three paths
paths.show(3)

# (motif field, display column) pairs, in the order they appear in the output
_g4_fields = [
    ("a.Type", "A_Type"),
    ("e1.relationship", "(a)-[e1]->(b)"),
    ("b.Type", "B_Type"),
    ("e2.relationship", "(b)-[e2]->(c)"),
    ("c.Type", "C_Type"),
    ("e3.relationship", "(c)-[e3]->(a)"),
]
_g4_labels = [label for _, label in _g4_fields]

graphlet_type_df = paths.select(*[F.col(field).alias(label) for field, label in _g4_fields])

# Count the instances of each node type / edge type combination
graphlet_count_df = (
    graphlet_type_df.groupby(*_g4_labels)
    .count()
    .orderBy(F.col("count").desc())
    # Replace the numeric count with a comma formatted column for display
    .withColumn("count", F.format_number(F.col("count"), 0))
)
graphlet_count_df.show()

# G5: Divergent Triangles
paths = g.find("(a)-[e1]->(b); (a)-[e2]->(c); (c)-[e3]->(b)")

# (motif field, display column) pairs, in the order they appear in the output
_g5_fields = [
    ("a.Type", "A_Type"),
    ("e1.relationship", "(a)-[e1]->(b)"),
    ("b.Type", "B_Type"),
    ("e2.relationship", "(a)-[e2]->(c)"),
    ("c.Type", "C_Type"),
    ("e3.relationship", "(c)-[e3]->(b)"),
]
_g5_labels = [label for _, label in _g5_fields]

graphlet_type_df = paths.select(*[F.col(field).alias(label) for field, label in _g5_fields])

# Count the instances of each node type / edge type combination
graphlet_count_df = (
    graphlet_type_df.groupby(*_g5_labels)
    .count()
    .orderBy(F.col("count").desc())
    # Replace the numeric count with a comma formatted column for display
    .withColumn("count", F.format_number(F.col("count"), 0))
)
graphlet_count_df.show()

# G17: A directed 3-path (three edges across four nodes) is a surprisingly diverse graphlet
paths = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (d)-[e3]->(c)")

# (motif field, display column) pairs, in the order they appear in the output
_g17_fields = [
    ("a.Type", "A_Type"),
    ("e1.relationship", "(a)-[e1]->(b)"),
    ("b.Type", "B_Type"),
    ("e2.relationship", "(b)-[e2]->(c)"),
    ("c.Type", "C_Type"),
    ("e3.relationship", "(d)-[e3]->(c)"),
    ("d.Type", "D_Type"),
]
_g17_labels = [label for _, label in _g17_fields]

# Visualize the path by counting instances of paths by node / edge type
graphlet_type_df = paths.select(*[F.col(field).alias(label) for field, label in _g17_fields])
graphlet_count_df = (
    graphlet_type_df.groupby(*_g17_labels)
    .count()
    .orderBy(F.col("count").desc())
    # Replace the numeric count with a comma formatted column for display
    .withColumn("count", F.format_number(F.col("count"), 0))
)
graphlet_count_df.show()

# Same table again, this time sorted by the type columns (descending) instead of by count
graphlet_count_df.orderBy(_g17_labels, ascending=False).show(104)

# Keep only the G17 paths where a vote (a) was cast for a question (b) that links
# to another question (c), which in turn had a vote (d) cast for it.
linked_vote_paths = paths.filter(
    (F.col("a.Type") == "Vote")
    & (F.col("e1.relationship") == "CastFor")
    & (F.col("b.Type") == "Question")
    & (F.col("e2.relationship") == "Links")
    & (F.col("c.Type") == "Question")
    & (F.col("e3.relationship") == "CastFor")
    & (F.col("d.Type") == "Vote")
)

# Sanity check the count - it should match the table above. Echo it so the value
# is also visible in a spark-submit batch run, where a bare expression prints nothing.
linked_vote_path_count = linked_vote_paths.count()
click.echo(f"Linked vote path count: {linked_vote_path_count:,}")

# Number of distinct votes cast for each linking question (b) and each linked question (c)
b_vote_counts = linked_vote_paths.select("a", "b").distinct().groupBy("b").count()
c_vote_counts = linked_vote_paths.select("c", "d").distinct().groupBy("c").count()

# NOTE(review): the filter below keeps VoteTypeId == 2 (presumably upvotes in the
# Stack Exchange schema - confirm), while the counts joined in above include votes
# of every type. Verify that this mix is intended.
linked_vote_counts = (
    linked_vote_paths.filter((F.col("a.VoteTypeId") == 2) & (F.col("d.VoteTypeId") == 2))
    .select("b", "c")
    .join(b_vote_counts, on="b", how="inner")
    .withColumnRenamed("count", "b_count")
    .join(c_vote_counts, on="c", how="inner")
    .withColumnRenamed("count", "c_count")
)

# Correlation between the vote counts of linking and linked questions
vote_count_correlation = linked_vote_counts.stat.corr("b_count", "c_count")
click.echo(f"Correlation between b_count and c_count: {vote_count_correlation}")
Loading