diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index 8b84d6d82..36b6b97e7 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -7,8 +7,8 @@ jobs: matrix: include: - spark-version: 3.5.4 - scala-version: 2.12.18 - python-version: 3.9.19 + scala-version: 2.12.20 + python-version: 3.11.11 runs-on: ubuntu-22.04 env: # define Java options for both official sbt and sbt-extras @@ -35,8 +35,11 @@ jobs: run: | python -m pip install --upgrade pip wheel pip install -r ./python/requirements.txt + pip install -r ./python/requirements-dev.txt pip install pyspark==${{ matrix.spark-version }} - name: Test run: | + python python/setup.py install + python python/setup.py bdist_wheel export SPARK_HOME=$(python -c "import os; from importlib.util import find_spec; print(os.path.join(os.path.dirname(find_spec('pyspark').origin)))") ./python/run-tests.sh diff --git a/.gitignore b/.gitignore index a07973c1e..dcbde8186 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,13 @@ project/plugins/project/ # Mac *.DS_Store +.vscode + +# Python specific +python/build +python/dist +build/lib +python/graphframes.egg-info +python/graphframes/tutorials/data +python/docs/_build +python/docs/_site diff --git a/Dockerfile b/Dockerfile index 1c4430912..b9fe8c528 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,16 +1,16 @@ FROM ubuntu:22.04 -ARG PYTHON_VERSION=3.8 +ARG PYTHON_VERSION=3.9 ARG DEBIAN_FRONTEND=noninteractive RUN apt-get update && \ - apt-get install -y wget bzip2 build-essential openjdk-8-jdk ssh sudo && \ + apt-get install -y wget bzip2 build-essential openjdk-11-jdk ssh sudo && \ apt-get clean # Install Spark and update env variables. 
-ENV SCALA_VERSION 2.12.17 -ENV SPARK_VERSION "3.4.1" -ENV SPARK_BUILD "spark-${SPARK_VERSION}-bin-hadoop3.2" +ENV SCALA_VERSION 2.12.20 +ENV SPARK_VERSION "3.5.4" +ENV SPARK_BUILD "spark-${SPARK_VERSION}-bin-hadoop3" ENV SPARK_BUILD_URL "https://dist.apache.org/repos/dist/release/spark/spark-${SPARK_VERSION}/${SPARK_BUILD}.tgz" RUN wget --quiet "$SPARK_BUILD_URL" -O /tmp/spark.tgz && \ tar -C /opt -xf /tmp/spark.tgz && \ diff --git a/README.md b/README.md index 69c3a8eac..fa55a5bb3 100644 --- a/README.md +++ b/README.md @@ -6,34 +6,165 @@ # GraphFrames: DataFrame-based Graphs -This is a package for DataFrame-based graphs on top of Apache Spark. -Users can write highly expressive queries by leveraging the DataFrame API, combined with a new -API for motif finding. The user also benefits from DataFrame performance optimizations -within the Spark SQL engine. +This is a package for DataFrame-based graphs on top of Apache Spark. Users can write highly expressive queries by leveraging the DataFrame API, combined with a new API for network motif finding. The user also benefits from DataFrame performance optimizations within the Spark SQL engine. GraphFrames works in Java, Scala, and Python. -You can find user guide and API docs at https://graphframes.github.io/graphframes. +You can find user guide and API docs at https://graphframes.github.io/graphframes + +## Installation and Quick-Start + +The easiest way to start using GraphFrames is through the [Spark Packages system](https://spark-packages.org/package/graphframes/graphframes). Just run the following command: + +```bash +# Interactive Scala/Java +$ spark-shell --packages graphframes:graphframes:0.8.3-spark3.5-s_2.12 + +# Interactive Python +$ pyspark --packages graphframes:graphframes:0.8.3-spark3.5-s_2.12 + +# Submit a script in Scala/Java/Python +$ spark-submit --packages graphframes:graphframes:0.8.3-spark3.5-s_2.12 script.py +``` + +Now you can create a GraphFrame as follows. 
+ +In Python: + +```python +from pyspark.sql import SparkSession +from graphframes import GraphFrame + +spark = SparkSession.builder.getOrCreate() + +nodes = [ + (1, "Alice", 30), + (2, "Bob", 25), + (3, "Charlie", 35) +] +nodes_df = spark.createDataFrame(nodes, ["id", "name", "age"]) + +edges = [ + (1, 2, "friend"), + (2, 1, "friend"), + (2, 3, "friend"), + (3, 2, "enemy") # eek! +] +edges_df = spark.createDataFrame(edges, ["src", "dst", "relationship"]) + +g = GraphFrame(nodes_df, edges_df) +``` + +Now let's run some graph algorithms at scale! + +```python +g.inDegrees.show() + +# +---+--------+ +# | id|inDegree| +# +---+--------+ +# | 2| 2| +# | 1| 1| +# | 3| 1| +# +---+--------+ + +g.outDegrees.show() + +# +---+---------+ +# | id|outDegree| +# +---+---------+ +# | 1| 1| +# | 2| 2| +# | 3| 1| +# +---+---------+ + +g.degrees.show() + +# +---+------+ +# | id|degree| +# +---+------+ +# | 1| 2| +# | 2| 4| +# | 3| 2| +# +---+------+ + +g2 = g.pageRank(resetProbability=0.15, tol=0.01) +g2.vertices.show() + +# +---+-------+---+------------------+ +# | id| name|age| pagerank| +# +---+-------+---+------------------+ +# | 1| Alice| 30|0.7758750474847483| +# | 2| Bob| 25|1.4482499050305027| +# | 3|Charlie| 35|0.7758750474847483| +# +---+-------+---+------------------+ + +# GraphFrames' most used feature... +# Connected components can do big data entity resolution on billions or even trillions of records! +# First connect records with a similarity metric, then run connectedComponents. +# This gives you groups of identical records, which you then link by same_as edges or merge into list-based master records. +spark.sparkContext.setCheckpointDir("/tmp/graphframes-example-connected-components") # required by GraphFrame.connectedComponents +g.connectedComponents().show() + +# +---+-------+---+---------+ +# | id| name|age|component| +# +---+-------+---+---------+ +# | 1| Alice| 30| 1| +# | 2| Bob| 25| 1| +# | 3|Charlie| 35| 1| +# +---+-------+---+---------+ + +# Find frenemies with network motif finding!
See how graph and relational queries are combined? +( + g.find("(a)-[e]->(b); (b)-[e2]->(a)") + .filter("e.relationship = 'friend' and e2.relationship = 'enemy'") + .show() +) + +# These are paths, which you can aggregate and count to find complex patterns. +# +------------+--------------+----------------+-------------+ +# | a| e| b| e2| +# +------------+--------------+----------------+-------------+ +# |{2, Bob, 25}|{2, 3, friend}|{3, Charlie, 35}|{3, 2, enemy}| +# +------------+--------------+----------------+-------------+ +``` + +## Learn GraphFrames + +To learn more about GraphFrames, check out these resources: + +* [GraphFrames Network Motif Finding Tutorial](https://graphframes.github.io/graphframes/docs/_site/motif-tutorial.html) +* [Introducing GraphFrames](https://databricks.com/blog/2016/03/03/introducing-graphframes.html) +* [On-Time Flight Performance with GraphFrames for Apache Spark](https://databricks.com/blog/2016/03/16/on-time-flight-performance-with-graphframes-for-apache-spark.html) + +## GraphFrames on PyPI is Unofficial + +The project is not in ownership or control of the [graphframes PyPI package](https://pypi.org/project/graphframes/) (installs 0.6.0) or [graphframes-latest PyPI package](https://pypi.org/project/graphframes-latest/) (installs 0.8.3). We recommend using the Spark Packages system to install the latest version of GraphFrames. The PyPI packages are not maintained by the GraphFrames project. + +If you are in control of one of these packages, please reach out to us to discuss how we can work together to keep them up to date. Hopefully this situation will be addressed in the near future. + +See [Installation and Quick-Start](#installation-and-quick-start) for the best way to install and use GraphFrames. + +## GraphFrames Internals + +To learn how GraphFrames works internally to combine graph and relational queries, check out the paper [GraphFrames: An Integrated API for Mixing Graph and +Relational Queries, Dave et al. 
2016](https://people.eecs.berkeley.edu/~matei/papers/2016/grades_graphframes.pdf). ## Building and running unit tests -To compile this project, run `build/sbt assembly` from the project home directory. -This will also run the Scala unit tests. +To compile this project, run `build/sbt assembly` from the project home directory. This will also run the Scala unit tests. -To run the Python unit tests, run the `run-tests.sh` script from the `python/` directory. -You will need to set `SPARK_HOME` to your local Spark installation directory. +To run the Python unit tests, run the `run-tests.sh` script from the `python/` directory. You will need to set `SPARK_HOME` to your local Spark installation directory. ## Release new version + Please see guide `dev/release_guide.md`. ## Spark version compatibility -This project is compatible with Spark 2.4+. However, significant speed improvements have been -made to DataFrames in more recent versions of Spark, so you may see speedups from using the latest -Spark version. +This project is compatible with Spark 2.4+. However, significant speed improvements have been made to DataFrames in more recent versions of Spark, so you may see speedups from using the latest Spark version. ## Contributing -GraphFrames is collaborative effort among UC Berkeley, MIT, and Databricks. -We welcome open source contributions as well! +GraphFrames is a collaborative effort among UC Berkeley, MIT, Databricks and the open source community. We welcome open source contributions as well!
## Releases: diff --git a/build.sbt b/build.sbt index 061901717..63168c57d 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,7 @@ import ReleaseTransformations._ lazy val sparkVer = sys.props.getOrElse("spark.version", "3.5.4") lazy val sparkBranch = sparkVer.substring(0, 3) lazy val defaultScalaVer = sparkBranch match { - case "3.5" => "2.12.18" + case "3.5" => "2.12.20" case _ => throw new IllegalArgumentException(s"Unsupported Spark version: $sparkVer.") } lazy val scalaVer = sys.props.getOrElse("scala.version", defaultScalaVer) diff --git a/dev/release_guide.md b/dev/release_guide.md index 19be87366..f89708c9c 100644 --- a/dev/release_guide.md +++ b/dev/release_guide.md @@ -1,8 +1,8 @@ -# Guild for releasing a new Graphframe version +# Guide for releasing a new GraphFrames version -## How to build GraphFrame package ? +## How to build GraphFrames package ? -To build a GraphFrame package for releasing, you only need to run the following command: +To build a GraphFrames package for releasing, you only need to run the following command: ``` cd graphframe_repo @@ -30,10 +30,9 @@ then upload the zip file generated by instructions in "How to build GraphFrame p ## How to publish the GraphFrame doc ? -GraphFrame doc is hosted in 'https://graphframes.github.io/graphframes/', to publish doc, -you just need to build doc content, then push the doc content to gh-pages branch of https://github.com/graphframes/graphframes project. +GraphFrames docs are hosted in 'https://graphframes.github.io/graphframes/'. To publish the docs, you just need to build the doc content, then push the doc content to gh-pages branch of the https://github.com/graphframes/graphframes project. -Before building doc, you need to install jekyll, please refer to 'docs/README.md' for details. +Before building the docs, you need to install jekyll, please refer to 'docs/README.md' for details.
The following command is for building and publishing doc: ``` diff --git a/docs/README.md b/docs/README.md index 769c1cd3f..3d7da67d5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,28 +1,18 @@ Welcome to the GraphFrames Spark Package documentation! -This readme will walk you through navigating and building the GraphFrames documentation, which is -included here with the source code. +This readme will walk you through navigating and building the GraphFrames documentation, which is included here with the source code. -Read on to learn more about viewing documentation in plain text (i.e., markdown) or building the -documentation yourself. Why build it yourself? So that you have the docs that correspond to -whichever version of GraphFrames you currently have checked out of revision control. +Read on to learn more about viewing documentation in plain text (i.e., markdown) or building the documentation yourself. Why build it yourself? So that you have the docs that correspond to whichever version of GraphFrames you currently have checked out of revision control. ## Generating the Documentation HTML -We include the GraphFrames documentation as part of the source (as opposed to using a hosted wiki, such as -the github wiki, as the definitive documentation) to enable the documentation to evolve along with -the source code and be captured by revision control (currently git). This way the code automatically -includes the version of the documentation that is relevant regardless of which version or release -you have checked out or downloaded. +We include the GraphFrames documentation as part of the source (as opposed to using a hosted wiki, such as the github wiki, as the definitive documentation) to enable the documentation to evolve along with the source code and be captured by revision control (currently git). 
This way the code automatically +includes the version of the documentation that is relevant regardless of which version or release you have checked out or downloaded. -In this directory you will find textfiles formatted using Markdown, with an ".md" suffix. You can -read those text files directly if you want. Start with index.md. +In this directory you will find textfiles formatted using Markdown, with an ".md" suffix. You can read those text files directly if you want. Start with index.md. The markdown code can be compiled to HTML using the [Jekyll tool](http://jekyllrb.com). -`Jekyll` and a few dependencies must be installed for this to work. We recommend -installing via the Ruby Gem dependency manager. Since the exact HTML output -varies between versions of Jekyll and its dependencies, we list specific versions here -in some cases: +`Jekyll` and a few dependencies must be installed for this to work. We recommend installing via the Ruby Gem dependency manager. Since the exact HTML output varies between versions of Jekyll and its dependencies, we list specific versions here in some cases: $ sudo gem install jekyll $ sudo gem install jekyll-redirect-from @@ -32,8 +22,7 @@ On macOS, with the default Ruby, please install Jekyll with Bundler as [instruct $ sudo gem install jekyll bundler $ sudo gem install jekyll-redirect-from -Execute `jekyll build` from the `docs/` directory to compile the site. Compiling the site with Jekyll will create a directory -called `_site` containing index.html as well as the rest of the compiled files. +Execute `jekyll build` from the `docs/` directory to compile the site. Compiling the site with Jekyll will create a directory called `_site` containing index.html as well as the rest of the compiled files. 
You can modify the default Jekyll build as follows: @@ -45,27 +34,23 @@ You can modify the default Jekyll build as follows: $ PRODUCTION=1 jekyll build Note that `SPARK_HOME` must be set to your local Spark installation in order to generate the docs. + To manually point to a specific `Spark` installation, $ SPARK_HOME= PRODUCTION=1 jekyll build ## Sphinx We use Sphinx to generate Python API docs, so you will need to install it by running -`sudo pip install sphinx`. + + sudo pip install sphinx ## API Docs (Scaladoc, Sphinx) You can build just the scaladoc by running `build/sbt unidoc` from the GRAPHFRAMES_PROJECT_ROOT directory. -Similarly, you can build just the Python docs by running `make html` from the -GRAPHFRAMES_PROJECT_ROOT/python/docs directory. Documentation is only generated for classes that are listed as -public in `__init__.py`. +Similarly, you can build just the Python docs by running `make html` from the GRAPHFRAMES_PROJECT_ROOT/python/docs directory. Documentation is only generated for classes that are listed as public in `__init__.py`. -When you run `jekyll` in the `docs` directory, it will also copy over the scaladoc for the various -subprojects into the `docs` directory (and then also into the `_site` directory). We use a -jekyll plugin to run `build/sbt unidoc` before building the site so if you haven't run it (recently) it -may take some time as it generates all of the scaladoc. The jekyll plugin also generates the +When you run `jekyll` in the `docs` directory, it will also copy over the scaladoc for the various subprojects into the `docs` directory (and then also into the `_site` directory). We use a jekyll plugin to run `build/sbt unidoc` before building the site so if you haven't run it (recently) it may take some time as it generates all of the scaladoc. The jekyll plugin also generates the Python docs [Sphinx](http://sphinx-doc.org/). 
-NOTE: To skip the step of building and copying over the Scala, Python API docs, run `SKIP_API=1 -jekyll build`. To skip building Scala API docs, run `SKIP_SCALADOC=1 jekyll build`; to skip building Python API docs, run `SKIP_PYTHONDOC=1 jekyll build`. +NOTE: To skip the step of building and copying over the Scala, Python API docs, run `SKIP_API=1 jekyll build`. To skip building Scala API docs, run `SKIP_SCALADOC=1 jekyll build`; to skip building Python API docs, run `SKIP_PYTHONDOC=1 jekyll build`. diff --git a/docs/_config.yml b/docs/_config.yml index 4c1ab075c..379fc242f 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -13,7 +13,7 @@ include: # These allow the documentation to be updated with newer releases # of Spark, Scala, and Mesos. -GRAPHFRAMES_VERSION: 0.8.4 +GRAPHFRAMES_VERSION: 0.8.5 #SCALA_BINARY_VERSION: "2.10" #SCALA_VERSION: "2.10.4" #MESOS_VERSION: 0.21.0 diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index 3ee1a85ee..51bc021ea 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -74,6 +74,7 @@ diff --git a/docs/img/4-node-directed-graphlets.png b/docs/img/4-node-directed-graphlets.png new file mode 100644 index 000000000..74d8e1806 Binary files /dev/null and b/docs/img/4-node-directed-graphlets.png differ diff --git a/docs/img/Directed-Graphlet-G17.png b/docs/img/Directed-Graphlet-G17.png new file mode 100644 index 000000000..4feea4f37 Binary files /dev/null and b/docs/img/Directed-Graphlet-G17.png differ diff --git a/docs/img/Directed-Graphlet-G22.png b/docs/img/Directed-Graphlet-G22.png new file mode 100644 index 000000000..1778e56fe Binary files /dev/null and b/docs/img/Directed-Graphlet-G22.png differ diff --git a/docs/img/G11_motif.png b/docs/img/G11_motif.png new file mode 100644 index 000000000..1e2524093 Binary files /dev/null and b/docs/img/G11_motif.png differ diff --git a/docs/img/G4_and_G5_directed_network_motif.png b/docs/img/G4_and_G5_directed_network_motif.png new file mode 100644 index 
000000000..83d34c901 Binary files /dev/null and b/docs/img/G4_and_G5_directed_network_motif.png differ diff --git a/docs/img/GraphFrames-Logo-Dark-Small.png b/docs/img/GraphFrames-Logo-Dark-Small.png new file mode 100644 index 000000000..05b0c3bc3 Binary files /dev/null and b/docs/img/GraphFrames-Logo-Dark-Small.png differ diff --git a/docs/img/GraphFrames-Logo-Large.png b/docs/img/GraphFrames-Logo-Large.png new file mode 100644 index 000000000..bfac7ebcb Binary files /dev/null and b/docs/img/GraphFrames-Logo-Large.png differ diff --git a/docs/img/GraphFrames-Logo-Small.png b/docs/img/GraphFrames-Logo-Small.png new file mode 100644 index 000000000..1e052e776 Binary files /dev/null and b/docs/img/GraphFrames-Logo-Small.png differ diff --git a/docs/img/directed_graphlets.webp b/docs/img/directed_graphlets.webp new file mode 100644 index 000000000..caa02321c Binary files /dev/null and b/docs/img/directed_graphlets.webp differ diff --git a/docs/index.md b/docs/index.md index 9bd2ccb82..b9d8917bc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -63,14 +63,21 @@ GraphFrames supplied as a package. * [Quick Start](quick-start.html): a quick introduction to the GraphFrames API; start here! 
* [GraphFrames User Guide](user-guide.html): detailed overview of GraphFrames in all supported languages (Scala, Java, Python) +* [Motif Finding Tutorial](motif-tutorial.html): learn to perform pattern recognition with GraphFrames using a technique called network motif finding over the knowledge graph for the `stackexchange.com` subdomain [data dump](https://archive.org/details/stackexchange) **API Docs:** * [GraphFrames Scala API (Scaladoc)](api/scala/index.html#org.graphframes.package) * [GraphFrames Python API (Sphinx)](api/python/index.html) +**Community Forums:** + +* [GraphFrames Mailing List](https://groups.google.com/g/graphframes/): ask questions about GraphFrames here +* [#graphframes Discord Channel on GraphGeeks](https://discord.com/channels/1162999022819225631/1326257052368113674) + **External Resources:** * [Apache Spark Homepage](http://spark.apache.org) * [Apache Spark Wiki](https://cwiki.apache.org/confluence/display/SPARK) -* [Mailing Lists](http://spark.apache.org/mailing-lists.html): Ask questions about Spark here +* [Apache Spark Mailing Lists](http://spark.apache.org/mailing-lists.html) +* [GraphFrames on Stack Overflow](https://stackoverflow.com/questions/tagged/graphframes) diff --git a/docs/motif-tutorial.md b/docs/motif-tutorial.md new file mode 100644 index 000000000..2f95952ee --- /dev/null +++ b/docs/motif-tutorial.md @@ -0,0 +1,768 @@ +--- +layout: global +displayTitle: GraphFrames Network Motif Finding Tutorial +title: Network Motif Finding Tutorial +description: GraphFrames GRAPHFRAMES_VERSION motif finding tutorial - teaches you to find motifs using Stack Exchange data +--- + +This tutorial covers GraphFrames' motif finding feature. We perform pattern matching on a property graph representing a Stack Exchange site using Apache Spark and [GraphFrames' motif finding](user-guide.html#motif-finding) feature. 
We will download the `stats.meta` archive from the [Stack Exchange Data Dump at the Internet Archive](https://archive.org/details/stackexchange), use PySpark to build a property graph and then mine it for property graph network motifs by combining both graph and relational queries. + +* Table of contents (This text will be scraped.) + {:toc} + +# What are graphlets and network motifs? + +Graphlets are small, connected subgraphs of a larger graph. Network motifs are recurring patterns in complex networks that are significantly more frequent than in random networks. They are the building blocks of complex networks and can be used to understand the structure and function of networks. Network motifs can be used to identify functional modules in biological networks, detect anomalies in social networks, detect money laundering and terrorism financing in financial networks, and predict the behavior of complex systems. + +
+
+ Directed network motifs for up to Four nodes +
Extending the Applicability of +Graphlets to Directed Networks, Aparicio et al. 2017
+
+
+ +We are going to mine motifs using Stack Exchange data. The Stack Exchange network is a complex network of users, posts, votes, badges, and tags. We will use GraphFrames to build a property graph from the Stack Exchange data dump and then use GraphFrames' motif finding feature to find network motifs in the graph. You'll see how to combine graph and relational queries to find complex patterns in the graph. + +# Download the Stack Exchange Dump for [stats.meta](https://stats.meta.stackexchange.com) + +The Python tutorials include a CLI utility at [`python/graphframes/tutorials/download.py`](python/graphframes/tutorials/download.py) for downloading any site's [Stack Exchange Data Dump](https://archive.org/details/stackexchange) from the Internet Archive. The script takes the subdomain as an argument, downloads the corresponding 7zip archive and expands it into the `python/graphframes/tutorials/data` folder. + +
+{% highlight bash %} +Usage: download.py [OPTIONS] SUBDOMAIN + + Download Stack Exchange archive for a given SUBDOMAIN. + + Example: python/graphframes/tutorials/download.py stats.meta + + Note: This won't work for stackoverflow.com archives due to size. + +Options: + --data-dir TEXT Directory to store downloaded files + --extract / --no-extract Whether to extract the archive after download + --help Show this message and exit. +{% endhighlight %} +
+ +Use `download.py` to download the Stack Exchange Data Dump for `stats.meta.stackexchange.com`. + +
+{% highlight bash %} +$ python python/graphframes/tutorials/download.py stats.meta + +Downloading archive from +Downloading [####################################] 100% +Download complete: python/graphframes/tutorials/data/stats.meta.stackexchange.com.7z +Extracting archive... +Extraction complete: stats.meta.stackexchange.com +{% endhighlight %} +
+ +# Build the Graph + +We will build a property graph from the Stack Exchange data dump using PySpark in the [python/graphframes/tutorials/stackexchange.py](python/graphframes/tutorials/stackexchange.py) script. The data comes as a single XML file, so we use [spark-xml](https://github.com/databricks/spark-xml) (moving inside Spark as of 4.0) to load the data, extract the relevant fields and build the nodes and edges of the graph. For some reason Spark XML uses a lot of RAM, so we need to increase the driver and executor memory to at least 4GB. + +
+{% highlight bash %} +$ spark-submit --packages com.databricks:spark-xml_2.12:0.18.0 --driver-memory 4g --executor-memory 4g python/graphframes/tutorials/stackexchange.py +{% endhighlight %} +
+ +The script will output the nodes and edges of the graph in the `python/graphframes/tutorials/data` folder. We can now use GraphFrames to load the graph and perform motif finding. + +# Motif Finding + +We will use GraphFrames to find motifs in the Stack Exchange property graph. The script [python/graphframes/tutorials/motif.py](python/graphframes/tutorials/motif.py) demonstrates how to load the graph, define various motifs and find all instances of the motif in the graph. + +NOTE: I use the term `node` interchangeably with `vertex`, and `edge` interchangeably with `link` or `relationship`. The API is [GraphFrame.vertices](https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html#graphframes.GraphFrame.vertices) and [GraphFrame.edges](https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html#graphframes.GraphFrame.edges) but some documentation says `relationships`. We need to add an alias from `g.vertices` to `g.nodes` and `g.edges` to both `g.relationships` and `g.links`. + +For a quick run-through of the script, use the following command: + +
+{% highlight bash %} +spark-submit --packages com.databricks:spark-xml_2.12:0.18.0 python/graphframes/tutorials/motif.py +{% endhighlight %} +
+ +Let's walk through what it does, line-by-line. The script starts by importing the necessary modules and defining some utility functions for visualizing paths returned by [g.find()](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#motif-finding). Note that if you give `python/graphframes/tutorials/download.py` CLI a different subdomain, you will need to change the `STACKEXCHANGE_SITE` variable. + +
+{% highlight python %} +import pyspark.sql.functions as F +from graphframes import GraphFrame +from pyspark import SparkContext +from pyspark.sql import DataFrame, SparkSession + + +# Initialize a SparkSession +spark: SparkSession = ( + SparkSession.builder.appName("Stack Overflow Motif Analysis") + # Lets the Id:(Stack Overflow int) and id:(GraphFrames ULID) coexist + .config("spark.sql.caseSensitive", True) + .getOrCreate() +) +sc: SparkContext = spark.sparkContext +sc.setCheckpointDir("/tmp/graphframes-checkpoints") + +# Change me if you download a different stackexchange site +STACKEXCHANGE_SITE = "stats.meta.stackexchange.com" +BASE_PATH = f"python/graphframes/tutorials/data/{STACKEXCHANGE_SITE}" +{% endhighlight %} +
+ +Load the nodes and edges of the graph from the `data` folder and count the types of node and edge. We repartition the nodes and edges to give our motif searches parallelism. GraphFrames likes nodes/vertices and edges/relationships to be cached. + +
+{% highlight python %} +# +# Load the nodes and edges from disk, repartition, checkpoint [plan got long for some reason] and cache. +# + +# We created these in stackexchange.py from Stack Exchange data dump XML files +NODES_PATH: str = f"{BASE_PATH}/Nodes.parquet" +nodes_df: DataFrame = spark.read.parquet(NODES_PATH) + +# Repartition the nodes to give our motif searches parallelism +nodes_df = nodes_df.repartition(50).checkpoint().cache() + +# We created these in stackexchange.py from Stack Exchange data dump XML files +EDGES_PATH: str = f"{BASE_PATH}/Edges.parquet" +edges_df: DataFrame = spark.read.parquet(EDGES_PATH) + +# Repartition the edges to give our motif searches parallelism +edges_df = edges_df.repartition(50).checkpoint().cache() +{% endhighlight %} +
+ +Check out the node types we have to work with: + +
+{% highlight python %} +# What kind of nodes do we have to work with? +node_counts = ( + nodes_df + .select("id", F.col("Type").alias("Node Type")) + .groupBy("Node Type") + .count() + .orderBy(F.col("count").desc()) + # Add a comma formatted column for display + .withColumn("count", F.format_number(F.col("count"), 0)) +) +node_counts.show() +{% endhighlight %} +
+ +
+{% highlight python %} ++---------+------+ +|Node Type| count| ++---------+------+ +| Badge|43,029| +| Vote|42,593| +| User|37,709| +| Answer| 2,978| +| Question| 2,025| +|PostLinks| 1,274| +| Tag| 143| ++---------+------+ +{% endhighlight %} +
+ +Check out the edge types we have to work with: + +
+{% highlight python %} +# What kind of edges do we have to work with? +edge_counts = ( + edges_df + .select("src", "dst", F.col("relationship").alias("Edge Type")) + .groupBy("Edge Type") + .count() + .orderBy(F.col("count").desc()) + # Add a comma formatted column for display + .withColumn("count", F.format_number(F.col("count"), 0)) +) +edge_counts.show() +{% endhighlight %} +
+ +
+{% highlight python %} ++----------+------+ +| Edge Type| count| ++----------+------+ +| Earns|43,029| +| CastFor|40,701| +| Tags| 4,427| +| Answers| 2,978| +| Posts| 2,767| +| Asks| 1,934| +| Links| 1,180| +|Duplicates| 88| ++----------+------+ +{% endhighlight %} +
+ +

Combining Node Types

+ +Note: you don't need to run the code in this section, it is just for reference. The data we loaded above is already prepared for use. Jump ahead to Creating GraphFrames and run that next :) + +At the moment, GraphFrames has a limitation: there is only one node and edge type (for now). There are many fields in the nodes of our `GraphFrame` because only one node type is available. I have combined different types of node into a single type by including all properties of all types in one class of node. I created a `Type` field for each type of node, then merged all fields into a single, global `nodes_df` `DataFrame`. This `Type` column can then be used in relational [DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html) operations to distinguish between types of nodes. + +This limitation is an annoyance that should be fixed in the future, with the ability to have multiple node types in a `GraphFrame`. In practice it isn't a big hit in productivity, but it means you have to [DataFrame.select](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.select.html) certain columns for each node `Type` when you do a [DataFrame.show()](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.show.html) or the DataFrame will be too wide to read easily. + +Here is how that was accomplished in python/graphframes/tutorials/stackexchange.py. + +
+{% highlight python %} +# +# Form the nodes from the UNION of posts, users, votes and their combined schemas +# + +all_cols: List[Tuple[str, T.StructField]] = list( + set( + list(zip(posts_df.columns, posts_df.schema)) + + list(zip(post_links_df.columns, post_links_df.schema)) + + list(zip(comments_df.columns, comments_df.schema)) + + list(zip(users_df.columns, users_df.schema)) + + list(zip(votes_df.columns, votes_df.schema)) + + list(zip(tags_df.columns, tags_df.schema)) + + list(zip(badges_df.columns, badges_df.schema)) + ) +) +all_column_names: List[str] = sorted([x[0] for x in all_cols]) + + +def add_missing_columns(df: DataFrame, all_cols: List[Tuple[str, T.StructField]]) -> DataFrame: + """Add any missing columns from any DataFrame among several we want to merge.""" + for col_name, schema_field in all_cols: + if col_name not in df.columns: + df = df.withColumn(col_name, F.lit(None).cast(schema_field.dataType)) + return df + + +# Now apply this function to each of your DataFrames to get a consistent schema +posts_df = add_missing_columns(posts_df, all_cols).select(all_column_names) +post_links_df = add_missing_columns(post_links_df, all_cols).select(all_column_names) +users_df = add_missing_columns(users_df, all_cols).select(all_column_names) +votes_df = add_missing_columns(votes_df, all_cols).select(all_column_names) +tags_df = add_missing_columns(tags_df, all_cols).select(all_column_names) +badges_df = add_missing_columns(badges_df, all_cols).select(all_column_names) +assert ( + set(posts_df.columns) + == set(post_links_df.columns) + == set(users_df.columns) + == set(votes_df.columns) + == set(all_column_names) + == set(tags_df.columns) + == set(badges_df.columns) +) +{% endhighlight %} +
+ +

Creating GraphFrames

+ +Now we create a [GraphFrame object](https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html#graphframes.GraphFrame) from the `nodes_df` and `edges_df` `DataFrames`. We will use this object to find motifs in the graph. + +Back to our motifs :) It is time to create our GraphFrame object. It has a number of powerful APIs, including the [GraphFrame.find()](https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html#graphframes.GraphFrame.find) method for finding motifs in the graph. + +
+{% highlight python %} +g = GraphFrame(nodes_df, edges_df) + +g.vertices.show(10) +print(f"Node columns: {g.vertices.columns}") + +g.edges.sample(0.0001).show(10) +{% endhighlight %} +
+ +The `GraphFrame` object is created and the node columns and edges are displayed. + +
+{% highlight python %} +# Node DataFrame is too wide to display here... because it has this many columns. +Node columns: ['id', 'AboutMe', 'AcceptedAnswerId', 'AccountId', 'AnswerCount', 'Body', 'Class', 'ClosedDate', 'CommentCount', 'CommunityOwnedDate', 'ContentLicense', 'Count', 'CreationDate', 'Date', 'DisplayName', 'DownVotes', 'ExcerptPostId', 'FavoriteCount', 'Id', 'IsModeratorOnly', 'IsRequired', 'LastAccessDate', 'LastActivityDate', 'LastEditDate', 'LastEditorDisplayName', 'LastEditorUserId', 'LinkTypeId', 'Location', 'Name', 'OwnerDisplayName', 'OwnerUserId', 'ParentId', 'PostId', 'PostTypeId', 'RelatedPostId', 'Reputation', 'Score', 'TagBased', 'TagName', 'Tags', 'Text', 'Title', 'Type', 'UpVotes', 'UserDisplayName', 'UserId', 'ViewCount', 'Views', 'VoteType', 'VoteTypeId', 'WebsiteUrl', 'WikiPostId', 'degree'] + +# Edge DataFrame is simpler ++--------------------+--------------------+------------+ +| src| dst|relationship| ++--------------------+--------------------+------------+ +|b0d39443-5b0b-42e...|3e84a1ed-1c20-413...| Answers| +|4c781826-3112-4b2...|07665936-d759-4f6...| Earns| +|a11d77a7-da09-4b0...|a9ea6e7d-7cc1-408...| CastFor| +|bd42f75a-b3ee-4d0...|fe216e41-1ae0-4c0...| Earns| +|4dd3c6be-b103-4ab...|2aa18136-59a7-498...| Earns| +|13540451-5823-417...|37966108-de38-4aa...| CastFor| +|f60ed1aa-5361-4ab...|1c5352c1-d084-47c...| Earns| +|9cb948f8-c7d5-40d...|71bc77c4-dfe7-47e...| Earns| +|03980309-e97e-402...|d0b4c366-c8d0-458...| Asks| +|b3736001-b654-419...|14920c81-232b-479...| Earns| ++--------------------+--------------------+------------+ +only showing top 10 rows +{% endhighlight %} +
+ +

Validating GraphFrames

+ +Let's validate that all edges in our `GraphFrame` object have valid IDs - it is common to make mistakes in ETL for knowledge graph construction and have edges that point nowhere. GraphFrames tries to validate itself but can sometimes accept bogus edges. + +
+{% highlight python %} +# Sanity test that all edges have valid ids +edge_count = g.edges.count() +valid_edge_count = ( + g.edges.join(g.vertices, on=g.edges.src == g.vertices.id) + .select("src", "dst", "relationship") + .join(g.vertices, on=g.edges.dst == g.vertices.id) + .count() +) + +# Just up and die if we have edges that point to non-existent nodes +assert ( + edge_count == valid_edge_count +), f"Edge count {edge_count} != valid edge count {valid_edge_count}" +print(f"Edge count: {edge_count:,} == Valid edge count: {valid_edge_count:,}") +{% endhighlight %} +
+ +
+{% highlight python %} +Edge count: 97,104 == Valid edge count: 97,104 +{% endhighlight %} +
+ +

Structural Motifs

+ +Let's look for a simple motif: a directed triangle. We will find all instances of a directed triangle in the graph. The [`GraphFrame.find()`](https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html#graphframes.GraphFrame.find) method takes a string as an argument that specifies the structure of a motif one edge at a time, in the same syntax as Cypher, with a semi-colon between edges. For a triangle motif, that works out to: `(a)-[e]->(b); (b)-[e2]->(c); (c)-[e3]->(a)`. Edge labels are optional; this is a valid graph query: `(a)-[]->(b)`. + +The `g.find()` method returns a `DataFrame` with fields for each of the node and edge labels in the pattern. To further express the motif you're interested in, you can now use relational `DataFrame` operations to filter, group, and aggregate the results. This makes the network motif finding in GraphFrames very powerful, and this type of property graph motif was originally defined in the [graphframes paper](https://people.eecs.berkeley.edu/~matei/papers/2016/grades_graphframes.pdf). + +A complete description of the graph query language is in the [GraphFrames User Guide](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#motif-finding). Let's look at an example: a directed triangle. We will find all instances of a directed triangle in the graph. + +
+
+ G4 and G5 Directed Network Motifs +
+ G4 is a continuous triangle. G5 is a divergent triangle. +
+
+
+ +
+{% highlight python %} +# G4: Continuous Triangles +paths = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(a)") + +# Show the first path +paths.show(3) +{% endhighlight %} +
+ +The resulting path has a field for each step in the `find()`; each field has all properties of our nodes or edges. + +
+{% highlight python %} ++------------+------------+------------+------------+------------+------------+ +| a| e1| b| e2| c| e3| ++------------+------------+------------+------------+------------+------------+ +|{57198c52...|{57198c52...|{7fd044f5...|{7fd044f5...|{695b549b...|{695b549b...| +|{8f534b7c...|{8f534b7c...|{e65038cf...|{e65038cf...|{d5ea2a3d...|{d5ea2a3d...| +|{695b549b...|{695b549b...|{57198c52...|{57198c52...|{7fd044f5...|{7fd044f5...| ++------------+------------+------------+------------+------------+------------+ +only showing top 3 rows +{% endhighlight %} +
+ +This can be overwhelming to look at, so in practice you will `DataFrame.select()` (a path is just a pyspark.sql.DataFrame) the properties of interest. + +Aggregating paths can express powerful semantics. Let's count the types of paths of this triangle motif in the graph of each node and edge type. + +
+{% highlight python %} +graphlet_type_df = paths.select( + F.col("a.Type").alias("A_Type"), + F.col("e1.relationship").alias("(a)-[e1]->(b)"), + F.col("b.Type").alias("B_Type"), + F.col("e2.relationship").alias("(b)-[e2]->(c)"), + F.col("c.Type").alias("C_Type"), + F.col("e3.relationship").alias("(c)-[e3]->(a)"), +) + +graphlet_count_df = ( + graphlet_type_df.groupby( + "A_Type", "(a)-[e1]->(b)", "B_Type", "(b)-[e2]->(c)", "C_Type", "(c)-[e3]->(a)" + ) + .count() + .orderBy(F.col("count").desc()) + # Add a comma formatted column for display + .withColumn("count", F.format_number(F.col("count"), 0)) +) +graphlet_count_df.show() +{% endhighlight %} +
+ +The result shows the only continuous triangles in the graph are 39 question-link loops. Motif matching for simple motifs based on topology alone can be used for exploratory data analysis over a knowledge graph in the same way you might run GROUP BY / COUNT queries on a table in a relational database to start to understand its contents. + +
+{% highlight python %} ++--------+-------------+--------+-------------+--------+-------------+-----+ +| A_Type|(a)-[e1]->(b)| B_Type|(b)-[e2]->(c)| C_Type|(c)-[e3]->(a)|count| ++--------+-------------+--------+-------------+--------+-------------+-----+ +|Question| Links|Question| Links|Question| Links| 24| +|Question| Duplicates|Question| Links|Question| Links| 4| +|Question| Links|Question| Links|Question| Duplicates| 4| +|Question| Links|Question| Duplicates|Question| Links| 4| +|Question| Duplicates|Question| Links|Question| Duplicates| 1| +|Question| Duplicates|Question| Duplicates|Question| Links| 1| +|Question| Links|Question| Duplicates|Question| Duplicates| 1| ++--------+-------------+--------+-------------+--------+-------------+-----+ +{% endhighlight %} +
+ +Let's try a different triangle, a divergent triangle. The code to visualize a 3-edged motif is the same each time. + +
+{% highlight python %} +# G5: Divergent Triangles +paths = g.find("(a)-[e1]->(b); (a)-[e2]->(c); (c)-[e3]->(b)") + +graphlet_type_df = paths.select( + F.col("a.Type").alias("A_Type"), + F.col("e1.relationship").alias("(a)-[e1]->(b)"), + F.col("b.Type").alias("B_Type"), + F.col("e2.relationship").alias("(a)-[e2]->(c)"), + F.col("c.Type").alias("C_Type"), + F.col("e3.relationship").alias("(c)-[e3]->(b)"), +) + +graphlet_count_df = ( + graphlet_type_df.groupby( + "A_Type", "(a)-[e1]->(b)", "B_Type", "(a)-[e2]->(c)", "C_Type", "(c)-[e3]->(b)" + ) + .count() + .orderBy(F.col("count").desc()) + # Add a comma formatted column for display + .withColumn("count", F.format_number(F.col("count"), 0)) +) +graphlet_count_df.show() +{% endhighlight %} +
+ +The result is a count of the divergent triangles in the graph by type. + +
+{% highlight python %} ++--------+-------------+--------+-------------+--------+-------------+-----+ +| A_Type|(a)-[e1]->(b)| B_Type|(a)-[e2]->(c)| C_Type|(c)-[e3]->(b)|count| ++--------+-------------+--------+-------------+--------+-------------+-----+ +| Tag| Tags|Question| Tags|Question| Links|1,775| +| User| Asks|Question| Posts| Answer| Answers| 274| +|Question| Links|Question| Links|Question| Links| 236| +| Tag| Tags|Question| Tags|Question| Duplicates| 140| +| User| Asks|Question| Asks|Question| Links| 103| +|Question| Links|Question| Links|Question| Duplicates| 14| +|Question| Duplicates|Question| Links|Question| Links| 13| +|Question| Links|Question| Duplicates|Question| Links| 12| +| User| Asks|Question| Asks|Question| Duplicates| 8| +|Question| Duplicates|Question| Links|Question| Duplicates| 8| +|Question| Duplicates|Question| Duplicates|Question| Links| 7| +|Question| Duplicates|Question| Duplicates|Question| Duplicates| 2| +|Question| Links|Question| Duplicates|Question| Duplicates| 1| ++--------+-------------+--------+-------------+--------+-------------+-----+ +{% endhighlight %} +
+ +1. (Tag)-[Tags]->(Question B); (Tag)-[Tags]->(Question C); (Question C)-[Links]->(Question B), or "A tag is used on a question, that tag is used on another question, and the two questions are linked." It makes sense that questions sharing tags are often linked. +2. (User)-[Asks]->(Question B); (User)-[Posts]->(Answer C); (Answer C)-[Answers]->(Question B), or "A user answers their own question." +3. A triangle of linked questions. +4. (Tag)-[Tags]->(Question B); (Tag)-[Tags]->(Question C); (Question C)-[Duplicates]->(Question B), or "A tag appears on both questions of a duplicate pair." +5. A user asks linked questions. + +

Property Graph Motifs

+ +Simple motif finding can be used to explore a knowledge graph. It is also possible to use domain knowledge to define and match known patterns and then explore new variant motifs. This can be used to apply and then expand domain knowledge about a knowledge graph. It is powerful stuff! + +We can do more with the properties of paths than just count them by node and edge type. We can use the properties of the nodes and edges in the paths to filter, group, and aggregate the results to form property graph motifs. Such complex motifs were first defined (without being formally named) in the paper describing this project: GraphFrames: An Integrated API for Mixing Graph and Relational Queries, Dave et al. 2016. They are a combination of graph and relational queries. We can use them to find complex patterns in the graph. + +The larger motifs get, the more interesting they are. Five nodes is often the limit with a Spark cluster, depending on how large your graph is. In this instance I will limit myself to a 4-path pattern as you may not have a Spark cluster on which to learn. Keep in mind that I am talking about paths - through aggregation a motif might cover thousands of nodes! + +First let's express the structural logic of the motif we are looking for. Let's try G22 - a triangle with a fourth node pointing at the node with in-degree of 2. The pattern is (a)-[e1]->(b); (a)-[e2]->(c); (c)-[e3]->(b); (d)-[e4]->(b). + +Visually this pattern looks like this: + +
+
+ G30: an opposed 3-path +
+ G30: an opposed 3-path +
+
+
+ +The simplest pattern with four nodes is a 3-path, directed graphlet G30. Let's see how aggregation makes this a more powerful pattern than we might at first guess. + +
+{% highlight python %} +# G17: A directed 3-path is a surprisingly diverse graphlet +paths = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (d)-[e3]->(c)") +{% endhighlight %} +
+ +Let's count the number of instances of this path in the graph by node and edge type. To let you know of a hard-won tip: alias the edge with its pattern. This makes it easier to read the results, even when C points to A or B, not D. + +
+{% highlight python %} +# Visualize the four-path by counting instances of paths by node / edge type +graphlet_type_df = paths.select( + F.col("a.Type").alias("A_Type"), + F.col("e1.relationship").alias("(a)-[e1]->(b)"), + F.col("b.Type").alias("B_Type"), + F.col("e2.relationship").alias("(b)-[e2]->(c)"), + F.col("c.Type").alias("C_Type"), + F.col("e3.relationship").alias("(d)-[e3]->(c)"), + F.col("d.Type").alias("D_Type"), +) +graphlet_count_df = ( + graphlet_type_df.groupby( + "A_Type", + "(a)-[e1]->(b)", + "B_Type", + "(b)-[e2]->(c)", + "C_Type", + "(d)-[e3]->(c)", + "D_Type", + ) + .count() + .orderBy(F.col("count").desc()) + # Add a comma formatted column for display + .withColumn("count", F.format_number(F.col("count"), 0)) +) +graphlet_count_df.show() +{% endhighlight %} +
+ +
+{% highlight python %} ++--------+-------------+--------+-------------+--------+-------------+--------+-------+ +| A_Type|(a)-[e1]->(b)| B_Type|(b)-[e2]->(c)| C_Type|(d)-[e3]->(c)| D_Type| count| ++--------+-------------+--------+-------------+--------+-------------+--------+-------+ +| Vote| CastFor| Answer| Answers|Question| CastFor| Vote|445,707| +| Vote| CastFor|Question| Links|Question| CastFor| Vote|300,017| +| Vote| CastFor| Answer| Answers|Question| Answers| Answer|117,981| +| Vote| CastFor|Question| Links|Question| Links|Question| 73,227| +| Vote| CastFor| Answer| Answers|Question| Tags| Tag| 64,510| +| Tag| Tags|Question| Links|Question| CastFor| Vote| 62,203| +| Answer| Answers|Question| Links|Question| CastFor| Vote| 56,119| +| Vote| CastFor|Question| Links|Question| Answers| Answer| 55,938| +| Vote| CastFor| Answer| Answers|Question| Links|Question| 55,139| +| Vote| CastFor|Question| Links|Question| Tags| Tag| 38,633| +| User| Posts| Answer| Answers|Question| CastFor| Vote| 37,655| +|Question| Links|Question| Links|Question| CastFor| Vote| 33,747| +| Vote| CastFor| Answer| Answers|Question| Asks| User| 23,234| +| User| Asks|Question| Links|Question| CastFor| Vote| 22,243| +| Tag| Tags|Question| Links|Question| Links|Question| 17,266| +| Answer| Answers|Question| Links|Question| Links|Question| 16,362| +| Answer| Answers|Question| Links|Question| Answers| Answer| 14,013| +| Vote| CastFor|Question| Links|Question| Asks| User| 13,252| +| Tag| Tags|Question| Links|Question| Answers| Answer| 12,920| +| User| Posts| Answer| Answers|Question| Answers| Answer| 12,105| ++--------+-------------+--------+-------------+--------+-------------+--------+-------+ +only showing top 20 rows +{% endhighlight %} +
+ +How many of these motifs are there in the graph? Let's count them. + +
+{% highlight python %} +graphlet_count_df.count() +104 +{% endhighlight %} +
+ +104 - we hit the network motif jackpot with this pattern! Let's order by the successive elements of the pattern to group them logically. + +
+{% highlight python %} +graphlet_count_df.orderBy([ + "A_Type", + "(a)-[e1]->(b)", + "B_Type", + "(b)-[e2]->(c)", + "C_Type", + "(d)-[e3]->(c)", + "D_Type", +], ascending=False).show(104) +{% endhighlight %} +
+ +
+{% highlight python %} ++--------+-------------+--------+-------------+--------+-------------+--------+-------+ +| A_Type|(a)-[e1]->(b)| B_Type|(b)-[e2]->(c)| C_Type|(d)-[e3]->(c)| D_Type| count| ++--------+-------------+--------+-------------+--------+-------------+--------+-------+ +| Vote| CastFor|Question| Links|Question| Tags| Tag| 38,633| +| Vote| CastFor|Question| Links|Question| Links|Question| 73,227| +| Vote| CastFor|Question| Links|Question| Duplicates|Question| 3,337| +| Vote| CastFor|Question| Links|Question| CastFor| Vote|300,017| +| Vote| CastFor|Question| Links|Question| Asks| User| 13,252| +| Vote| CastFor|Question| Links|Question| Answers| Answer| 55,938| +| Vote| CastFor|Question| Links| Answer| Posts| User| 18| +| Vote| CastFor|Question| Links| Answer| Links|Question| 18| +| Vote| CastFor|Question| Links| Answer| CastFor| Vote| 36| +| Vote| CastFor|Question| Duplicates|Question| Tags| Tag| 1,292| +| Vote| CastFor|Question| Duplicates|Question| Links|Question| 1,556| +| Vote| CastFor|Question| Duplicates|Question| Duplicates|Question| 693| +| Vote| CastFor|Question| Duplicates|Question| CastFor| Vote| 8,205| +| Vote| CastFor|Question| Duplicates|Question| Asks| User| 418| +| Vote| CastFor|Question| Duplicates|Question| Answers| Answer| 2,423| +| Vote| CastFor| Answer| Answers|Question| Tags| Tag| 64,510| +| Vote| CastFor| Answer| Answers|Question| Links|Question| 55,139| +| Vote| CastFor| Answer| Answers|Question| Duplicates|Question| 3,941| +| Vote| CastFor| Answer| Answers|Question| CastFor| Vote|445,707| +| Vote| CastFor| Answer| Answers|Question| Asks| User| 23,234| +| Vote| CastFor| Answer| Answers|Question| Answers| Answer|117,981| +| User| Posts| Answer| Answers|Question| Tags| Tag| 7,164| +| User| Posts| Answer| Answers|Question| Links|Question| 4,494| +| User| Posts| Answer| Answers|Question| Duplicates|Question| 378| +| User| Posts| Answer| Answers|Question| CastFor| Vote| 37,655| +| User| Posts| Answer| Answers|Question| Asks| User| 
2,614| +| User| Posts| Answer| Answers|Question| Answers| Answer| 12,105| +| User| Asks|Question| Links|Question| Tags| Tag| 3,169| +| User| Asks|Question| Links|Question| Links|Question| 6,119| +| User| Asks|Question| Links|Question| Duplicates|Question| 331| +| User| Asks|Question| Links|Question| CastFor| Vote| 22,243| +| User| Asks|Question| Links|Question| Asks| User| 1,064| +| User| Asks|Question| Links|Question| Answers| Answer| 4,599| +| User| Asks|Question| Links| Answer| Posts| User| 1| +| User| Asks|Question| Links| Answer| Links|Question| 1| +| User| Asks|Question| Links| Answer| CastFor| Vote| 2| +| User| Asks|Question| Duplicates|Question| Tags| Tag| 264| +| User| Asks|Question| Duplicates|Question| Links|Question| 338| +| User| Asks|Question| Duplicates|Question| Duplicates|Question| 134| +| User| Asks|Question| Duplicates|Question| CastFor| Vote| 1,528| +| User| Asks|Question| Duplicates|Question| Asks| User| 86| +| User| Asks|Question| Duplicates|Question| Answers| Answer| 374| +| Tag| Tags|Question| Links|Question| Tags| Tag| 9,332| +| Tag| Tags|Question| Links|Question| Links|Question| 17,266| +| Tag| Tags|Question| Links|Question| Duplicates|Question| 931| +| Tag| Tags|Question| Links|Question| CastFor| Vote| 62,203| +| Tag| Tags|Question| Links|Question| Asks| User| 3,037| +| Tag| Tags|Question| Links|Question| Answers| Answer| 12,920| +| Tag| Tags|Question| Links| Answer| Posts| User| 8| +| Tag| Tags|Question| Links| Answer| Links|Question| 8| +| Tag| Tags|Question| Links| Answer| CastFor| Vote| 16| +| Tag| Tags|Question| Duplicates|Question| Tags| Tag| 666| +| Tag| Tags|Question| Duplicates|Question| Links|Question| 828| +| Tag| Tags|Question| Duplicates|Question| Duplicates|Question| 341| +| Tag| Tags|Question| Duplicates|Question| CastFor| Vote| 3,715| +| Tag| Tags|Question| Duplicates|Question| Asks| User| 215| +| Tag| Tags|Question| Duplicates|Question| Answers| Answer| 965| +|Question| Links|Question| Links|Question| Tags| Tag| 5,220| 
+|Question| Links|Question| Links|Question| Links|Question| 10,140| +|Question| Links|Question| Links|Question| Duplicates|Question| 387| +|Question| Links|Question| Links|Question| CastFor| Vote| 33,747| +|Question| Links|Question| Links|Question| Asks| User| 1,740| +|Question| Links|Question| Links|Question| Answers| Answer| 7,330| +|Question| Links|Question| Links| Answer| Posts| User| 2| +|Question| Links|Question| Links| Answer| Links|Question| 2| +|Question| Links|Question| Links| Answer| CastFor| Vote| 4| +|Question| Links|Question| Duplicates|Question| Tags| Tag| 102| +|Question| Links|Question| Duplicates|Question| Links|Question| 163| +|Question| Links|Question| Duplicates|Question| Duplicates|Question| 85| +|Question| Links|Question| Duplicates|Question| CastFor| Vote| 611| +|Question| Links|Question| Duplicates|Question| Asks| User| 45| +|Question| Links|Question| Duplicates|Question| Answers| Answer| 308| +|Question| Links| Answer| Answers|Question| Tags| Tag| 4| +|Question| Links| Answer| Answers|Question| Links|Question| 4| +|Question| Links| Answer| Answers|Question| CastFor| Vote| 10| +|Question| Links| Answer| Answers|Question| Asks| User| 2| +|Question| Links| Answer| Answers|Question| Answers| Answer| 17| +|Question| Duplicates|Question| Links|Question| Tags| Tag| 328| +|Question| Duplicates|Question| Links|Question| Links|Question| 511| +|Question| Duplicates|Question| Links|Question| Duplicates|Question| 38| +|Question| Duplicates|Question| Links|Question| CastFor| Vote| 2,019| +|Question| Duplicates|Question| Links|Question| Asks| User| 125| +|Question| Duplicates|Question| Links|Question| Answers| Answer| 559| +|Question| Duplicates|Question| Duplicates|Question| Tags| Tag| 19| +|Question| Duplicates|Question| Duplicates|Question| Links|Question| 20| +|Question| Duplicates|Question| Duplicates|Question| Duplicates|Question| 17| +|Question| Duplicates|Question| Duplicates|Question| CastFor| Vote| 98| +|Question| Duplicates|Question| 
Duplicates|Question| Asks| User| 9| +|Question| Duplicates|Question| Duplicates|Question| Answers| Answer| 67| +| Answer| Answers|Question| Links|Question| Tags| Tag| 8,187| +| Answer| Answers|Question| Links|Question| Links|Question| 16,362| +| Answer| Answers|Question| Links|Question| Duplicates|Question| 811| +| Answer| Answers|Question| Links|Question| CastFor| Vote| 56,119| +| Answer| Answers|Question| Links|Question| Asks| User| 2,758| +| Answer| Answers|Question| Links|Question| Answers| Answer| 14,013| +| Answer| Answers|Question| Links| Answer| Posts| User| 2| +| Answer| Answers|Question| Links| Answer| Links|Question| 2| +| Answer| Answers|Question| Links| Answer| CastFor| Vote| 4| +| Answer| Answers|Question| Duplicates|Question| Tags| Tag| 224| +| Answer| Answers|Question| Duplicates|Question| Links|Question| 316| +| Answer| Answers|Question| Duplicates|Question| Duplicates|Question| 198| +| Answer| Answers|Question| Duplicates|Question| CastFor| Vote| 1,330| +| Answer| Answers|Question| Duplicates|Question| Asks| User| 110| +| Answer| Answers|Question| Duplicates|Question| Answers| Answer| 1,174| ++--------+-------------+--------+-------------+--------+-------------+--------+-------+ +{% endhighlight %} +
+ +The fourth row catches my eye - there are 300,017 matches for the votes cast for linked questions: (Vote A)-[CastFor]->(Question B); (Question B)-[Links]->(Question C); (Vote D)-[CastFor]->(Question C). This gives a way to compare the popularity of linked questions! Let's calculate how correlated linked questions are. + +
+{% highlight python %} +# Votes cast for a question that links to another question which also has votes cast for it. +linked_vote_paths = paths.filter( + (F.col("a.Type") == "Vote") & + (F.col("e1.relationship") == "CastFor") & + (F.col("b.Type") == "Question") & + (F.col("e2.relationship") == "Links") & + (F.col("c.Type") == "Question") & + (F.col("e3.relationship") == "CastFor") & + (F.col("d.Type") == "Vote") +) + +# Sanity check the count - it should match the table above +linked_vote_paths.count() + +300017 +{% endhighlight %} +
+ +We start by using aggregation to count the total votes cast for the question at each end of a link. To get the count for Question B, get the distinct 3-paths, group by its ID and count the votes. + +
+{% highlight python %} +b_vote_counts = linked_vote_paths.select("a", "b").distinct().groupBy("b").count() +c_vote_counts = linked_vote_paths.select("c", "d").distinct().groupBy("c").count() +{% endhighlight %} +
+ +Now join the counts to the links to get the total votes for each pair of linked questions. Then run `pyspark.sql.DataFrame.stat.corr()` to get the correlation between the vote counts for linked questions. We'll use the `Vote.VoteTypeId` to ensure only positive votes are counted. + +
+{% highlight python %} +linked_vote_counts = ( + linked_vote_paths + .filter((F.col("a.VoteTypeId") == 2) & (F.col("d.VoteTypeId") == 2)) + .select("b", "c") + .join(b_vote_counts, on="b", how="inner") + .withColumnRenamed("count", "b_count") + .join(c_vote_counts, on="c", how="inner") + .withColumnRenamed("count", "c_count") +) +linked_vote_counts.stat.corr("b_count", "c_count") +0.4287709940689788 +{% endhighlight %} + +We conclude there is a moderate correlation in the vote counts of linked questions. This makes sense. Note that this is only the fourth row, there are many more patterns to be examined and considered. + +This is just one type of aggregation you can employ - but hopefully it illustrates the way properties and aggregation and other relational operators can transform simple pattern matching into a powerful tool for exploring a knowledge graph. + +

Conclusion

+ +In this tutorial, we learned to use GraphFrames to find network motifs in a property graph. We saw how to combine graph and relational queries to find complex patterns in the graph. We also saw how to use the properties of the nodes and edges in the paths to filter, group, and aggregate the results to form complex property graph motifs. Motif finding in GraphFrames is a powerful technique that can be used to explore and understand complex networks. Network motifs are the building blocks of complex networks. diff --git a/docs/user-guide.md b/docs/user-guide.md index 7e66f74ff..5d2a112a9 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -5,8 +5,7 @@ title: User Guide description: GraphFrames GRAPHFRAMES_VERSION user guide --- -This page gives examples of how to use GraphFrames for basic queries, motif finding, and -general graph algorithms. This includes code examples in Scala and Python. +This page gives examples of how to use GraphFrames for basic queries, motif finding, and general graph algorithms. This includes code examples in Scala and Python. * Table of contents (This text will be scraped.) {:toc} @@ -174,7 +173,9 @@ from graphframes.examples import Graphs g = Graphs(spark).friends() # Get example graph # Display the vertex and edge DataFrames + g.vertices.show() + # +--+-------+---+ # |id| name|age| # +--+-------+---+ @@ -188,6 +189,7 @@ g.vertices.show() # +--+-------+---+ g.edges.show() + # +---+---+------------+ # |src|dst|relationship| # +---+---+------------+ @@ -204,12 +206,12 @@ g.edges.show() # Get a DataFrame with columns "id" and "inDegree" (in-degree) vertexInDegrees = g.inDegrees -# Find the youngest user's age in the graph. -# This queries the vertex DataFrame. +# Find the youngest user's age in the graph +# This queries the vertex DataFrame g.vertices.groupBy().min("age").show() -# Count the number of "follows" in the graph. -# This queries the edge DataFrame. 
+# Count the number of "follows" in the graph +# This queries the edge DataFrame numFollows = g.edges.filter("relationship = 'follow'").count() {% endhighlight %}
@@ -218,13 +220,9 @@ numFollows = g.edges.filter("relationship = 'follow'").count() # Motif finding -Motif finding refers to searching for structural patterns in a graph. +Motif finding refers to searching for structural patterns in a graph. For an example of real-world use, check out the [Motif Finding Tutorial](motif-tutorial.html). -GraphFrame motif finding uses a simple Domain-Specific Language (DSL) for expressing structural -queries. For example, `graph.find("(a)-[e]->(b); (b)-[e2]->(a)")` will search for pairs of vertices -`a,b` connected by edges in both directions. It will return a `DataFrame` of all such -structures in the graph, with columns for each of the named elements (vertices or edges) -in the motif. In this case, the returned columns will be "a, b, e, e2." +GraphFrame motif finding uses a simple Domain-Specific Language (DSL) for expressing structural queries. For example, `graph.find("(a)-[e]->(b); (b)-[e2]->(a)")` will search for pairs of vertices `a,b` connected by edges in both directions. It will return a `DataFrame` of all such structures in the graph, with columns for each of the named elements (vertices or edges) in the motif. In this case, the returned columns will be "a, b, e, e2." DSL for expressing structural patterns: @@ -304,11 +302,11 @@ from graphframes.examples import Graphs g = Graphs(spark).friends() # Get example graph -# Search for pairs of vertices with edges in both directions between them. +# Search for pairs of vertices with edges in both directions between them motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)") motifs.show() -# More complex queries can be expressed by applying filters. 
+# More complex queries can be expressed by applying filters motifs.filter("b.age > 30").show() {% endhighlight %} @@ -375,14 +373,16 @@ g = Graphs(spark).friends() # Get example graph chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)") # Query on sequence, with state (cnt) -# (a) Define method for updating state given the next element of the motif. +# (a) Define method for updating state given the next element of the motif sumFriends =\ lambda cnt,relationship: when(relationship == "friend", cnt+1).otherwise(cnt) -# (b) Use sequence operation to apply method to sequence of elements in motif. -# In this case, the elements are the 3 edges. + +# (b) Use sequence operation to apply method to sequence of elements in motif +# In this case, the elements are the 3 edges condition =\ reduce(lambda cnt,e: sumFriends(cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0)) -# (c) Apply filter to DataFrame. + +# (c) Apply filter to DataFrame chainWith2Friends2 = chain4.where(condition >= 2) chainWith2Friends2.show() {% endhighlight %} @@ -428,8 +428,8 @@ from graphframes.examples import Graphs g = Graphs(spark).friends() # Get example graph -# Select subgraph of users older than 30, and relationships of type "friend". -# Drop isolated vertices (users) which are not contained in any edges (relationships). +# Select subgraph of users older than 30, and relationships of type "friend" +# Drop isolated vertices (users) which are not contained in any edges (relationships) g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices() {% endhighlight %} @@ -470,15 +470,17 @@ from graphframes.examples import Graphs g = Graphs(spark).friends() # Get example graph # Select subgraph based on edges "e" of type "follow" -# pointing from a younger user "a" to an older user "b". 
+# pointing from a younger user "a" to an older user "b" paths = g.find("(a)-[e]->(b)")\ .filter("e.relationship = 'follow'")\ .filter("a.age < b.age") -# "paths" contains vertex info. Extract the edges. + +# "paths" contains vertex info. Extract the edges + e2 = paths.select("e.src", "e.dst", "e.relationship") -# In Spark 1.5+, the user may simplify this call: -# val e2 = paths.select("e.*") +# In Spark 1.5+, the user may simplify this call: +# e2 = paths.select("e.*") # Construct the subgraph g2 = GraphFrame(g.vertices, e2) {% endhighlight %} @@ -539,11 +541,11 @@ from graphframes.examples import Graphs g = Graphs(spark).friends() # Get example graph -# Search from "Esther" for users of age < 32. +# Search from "Esther" for users of age < 32 paths = g.bfs("name = 'Esther'", "age < 32") paths.show() -# Specify edge filters or max path lengths. +# Specify edge filters or max path lengths g.bfs("name = 'Esther'", "age < 32",\ edgeFilter="relationship != 'friend'", maxPathLength=3) {% endhighlight %} @@ -741,15 +743,16 @@ from graphframes.examples import Graphs g = Graphs(spark).friends() # Get example graph -# Run PageRank until convergence to tolerance "tol". +# Run PageRank until convergence to tolerance "tol" results = g.pageRank(resetProbability=0.15, tol=0.01) + # Display resulting pageranks and final edge weights -# Note that the displayed pagerank may be truncated, e.g., missing the E notation. -# In Spark 1.5+, you can use show(truncate=False) to avoid truncation. +# Note that the displayed pagerank may be truncated, e.g., missing the E notation +# In Spark 1.5+, you can use show(truncate=False) to avoid truncation results.vertices.select("id", "pagerank").show() results.edges.select("src", "dst", "weight").show() -# Run PageRank for a fixed number of iterations. 
+# Run PageRank for a fixed number of iterations results2 = g.pageRank(resetProbability=0.15, maxIter=10) # Run PageRank personalized for vertex "a" @@ -874,15 +877,15 @@ from graphframes.examples import Graphs g = Graphs(spark).friends() # Get example graph -# Save vertices and edges as Parquet to some location. +# Save vertices and edges as Parquet to some location g.vertices.write.parquet("hdfs://myLocation/vertices") g.edges.write.parquet("hdfs://myLocation/edges") -# Load the vertices and edges back. +# Load the vertices and edges back sameV = spark.read.parquet("hdfs://myLocation/vertices") sameE = spark.read.parquet("hdfs://myLocation/edges") -# Create an identical GraphFrame. +# Create an identical GraphFrame sameG = GraphFrame(sameV, sameE) {% endhighlight %} @@ -945,7 +948,7 @@ from pyspark.sql.functions import sum as sqlsum g = Graphs(spark).friends() # Get example graph -# For each user, sum the ages of the adjacent users. +# For each user, sum the ages of the adjacent users msgToSrc = AM.dst["age"] msgToDst = AM.src["age"] agg = g.aggregateMessages( @@ -1038,3 +1041,8 @@ val g2: GraphFrame = GraphFrame.fromGraphX(gx) These conversions are only supported in Scala since GraphX does not have a Python API. + +# GraphFrames Internals + +To learn how GraphFrames works internally to combine graph and relational queries, check out the paper [GraphFrames: An Integrated API for Mixing Graph and +Relational Queries, Dave et al. 2016](https://people.eecs.berkeley.edu/~matei/papers/2016/grades_graphframes.pdf). 
diff --git a/python/.gitignore b/python/.gitignore deleted file mode 100644 index 81410ca55..000000000 --- a/python/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -*.pyc -docs/_build/ -build/ -dist/ diff --git a/python/MANIFEST.in b/python/MANIFEST.in index 73eaf8ba2..4eb0ee5af 100644 --- a/python/MANIFEST.in +++ b/python/MANIFEST.in @@ -2,3 +2,7 @@ # https://github.com/pypa/sampleproject/blob/master/MANIFEST.in # For more details about the MANIFEST file, you may read the docs at # https://docs.python.org/2/distutils/sourcedist.html#the-manifest-in-template +recursive-include python/graphframes *.py +recursive-exclude * __pycache__ +recursive-exclude * *.pyc +include graphframes/tutorials/data/.exists diff --git a/python/docs/epytext.py b/python/docs/epytext.py index b02996415..f287f00c0 100644 --- a/python/docs/epytext.py +++ b/python/docs/epytext.py @@ -1,6 +1,7 @@ import re from sphinx.application import Sphinx + RULES = ( (r"<(!BLANKLINE)[\w.]+>", r""), (r"L{([\w.()]+)}", r":class:`\1`"), @@ -10,6 +11,7 @@ ('pyspark.rdd.RDD', 'RDD'), ) + def _convert_epytext(line: str) -> str: """ >>> _convert_epytext("L{A}") @@ -20,9 +22,11 @@ def _convert_epytext(line: str) -> str: line = re.sub(p, sub, line) return line + def _process_docstring(app: "Sphinx", what: str, name: str, obj: object, options: dict, lines: list[str]) -> None: for i in range(len(lines)): lines[i] = _convert_epytext(lines[i]) + def setup(app: "Sphinx") -> None: app.connect("autodoc-process-docstring", _process_docstring) diff --git a/python/docs/underscores.py b/python/docs/underscores.py index cabad3313..d990b76c3 100644 --- a/python/docs/underscores.py +++ b/python/docs/underscores.py @@ -33,27 +33,30 @@ from typing import Any from sphinx.application import Sphinx + def setup(app: Sphinx) -> None: """ Add a html-page-context and a build-finished event handlers """ - app.connect('html-page-context', change_pathto) - app.connect('build-finished', move_private_folders) + app.connect("html-page-context", 
change_pathto) + app.connect("build-finished", move_private_folders) + -def change_pathto(app: Sphinx, - pagename: str, - templatename: str, - context: dict[str, Any], - doctree: Any | None) -> None: +def change_pathto( + app: Sphinx, pagename: str, templatename: str, context: dict[str, Any], doctree: Any | None +) -> None: """ Replace pathto helper to change paths to folders with a leading underscore. """ - pathto: Callable = context.get('pathto') + pathto: Callable = context.get("pathto") + def gh_pathto(otheruri: str, *args: Any, **kw: Any) -> Any: - if otheruri.startswith('_'): + if otheruri.startswith("_"): otheruri = otheruri[1:] return pathto(otheruri, *args, **kw) - context['pathto'] = gh_pathto + + context["pathto"] = gh_pathto + def move_private_folders(app: Sphinx, e: Exception | None) -> None: """ @@ -61,9 +64,10 @@ def move_private_folders(app: Sphinx, e: Exception | None) -> None: :todo: should only affect html built """ + def join(dir): return os.path.join(app.builder.outdir, dir) for item in os.listdir(app.builder.outdir): - if item.startswith('_') and os.path.isdir(join(item)): + if item.startswith("_") and os.path.isdir(join(item)): shutil.move(join(item), join(item[1:])) diff --git a/python/graphframes/tests.py b/python/graphframes/tests.py index 9a7ad1371..259435759 100644 --- a/python/graphframes/tests.py +++ b/python/graphframes/tests.py @@ -15,63 +15,72 @@ # limitations under the License. 
# -import sys +import os import tempfile import shutil import re -if sys.version_info[:2] <= (2, 6): - try: - import unittest2 as unittest - except ImportError: - sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier') - sys.exit(1) -else: - import unittest - -from pyspark import SparkContext -from pyspark.sql import functions as sqlfunctions, SparkSession +import pytest +from pyspark import SparkConf, SparkContext +from pyspark.sql import functions as F, SparkSession from .graphframe import GraphFrame, Pregel, _java_api, _from_java_gf from .lib import AggregateMessages as AM from .examples import Graphs, BeliefPropagation + +VERSION = open("version.sbt").read().strip() + + +@pytest.fixture(scope="class", autouse=True) +def set_spark(request, spark_session): + request.cls.spark = spark_session + + +@pytest.mark.usefixtures("set_spark") class GraphFrameTestUtils(object): @classmethod def parse_spark_version(cls, version_str): - """ take an input version string - return version items in a dictionary + """take an input version string + return version items in a dictionary """ - _sc_ver_patt = r'(\d+)\.(\d+)(\.(\d+)(-(.+))?)?' + _sc_ver_patt = r"(\d+)\.(\d+)(\.(\d+)(-(.+))?)?" 
m = re.match(_sc_ver_patt, version_str) if not m: - raise TypeError("version {} shoud be in ..".format(version_str)) + raise TypeError( + "version {} shoud be in ..".format(version_str) + ) version_info = {} try: - version_info['major'] = int(m.group(1)) + version_info["major"] = int(m.group(1)) except: raise TypeError("invalid minor version") try: - version_info['minor'] = int(m.group(2)) + version_info["minor"] = int(m.group(2)) except: raise TypeError("invalid major version") try: - version_info['maintenance'] = int(m.group(4)) + version_info["maintenance"] = int(m.group(4)) except: - version_info['maintenance'] = 0 + version_info["maintenance"] = 0 try: - version_info['special'] = m.group(6) + version_info["special"] = m.group(6) except: pass return version_info @classmethod def createSparkContext(cls): - cls.sc = sc = SparkContext('local[4]', "GraphFramesTests") + cls.conf = SparkConf().setAppName("GraphFramesTests") + cls.conf.set( + "spark.submit.pyFiles", + os.path.abspath(f"python/dist/graphframes-{VERSION}-py3-none-any.whl"), + ) + cls.sc = SparkContext(master="local[4]", appName="GraphFramesTests", conf=cls.conf) cls.checkpointDir = tempfile.mkdtemp() cls.sc.setCheckpointDir(cls.checkpointDir) - cls.spark_version = cls.parse_spark_version(sc.version) + cls.spark_version = cls.parse_spark_version(cls.sc.version) @classmethod def stopSparkContext(cls): @@ -81,10 +90,10 @@ def stopSparkContext(cls): @classmethod def spark_at_least_of_version(cls, version_str): - assert hasattr(cls, 'spark_version') + assert hasattr(cls, "spark_version") required_version = cls.parse_spark_version(version_str) spark_version = cls.spark_version - for _name in ['major', 'minor', 'maintenance']: + for _name in ["major", "minor", "maintenance"]: sc_ver = spark_version[_name] req_ver = required_version[_name] if sc_ver != req_ver: @@ -92,28 +101,31 @@ def spark_at_least_of_version(cls, version_str): # All major.minor.maintenance equal return True -def setUpModule(): - 
GraphFrameTestUtils.createSparkContext() -def tearDownModule(): +@pytest.fixture(scope="module", autouse=True) +def spark_context(): + GraphFrameTestUtils.createSparkContext() + yield GraphFrameTestUtils.stopSparkContext() -class GraphFrameTestCase(unittest.TestCase): +@pytest.fixture(scope="class") +def spark_session(): + # Create a SparkSession with a smaller number of shuffle partitions. + spark = ( + SparkSession(GraphFrameTestUtils.sc) + .builder.config("spark.sql.shuffle.partitions", 4) + .getOrCreate() + ) + yield spark + # No explicit stop; SparkContext shutdown will clean up. - @classmethod - def setUpClass(cls): - # Small tests run much faster with spark.sql.shuffle.partitions = 4 - cls.spark = SparkSession(GraphFrameTestUtils.sc).builder.config('spark.sql.shuffle.partitions', 4).getOrCreate() - - @classmethod - def tearDownClass(cls): - cls.spark = None +@pytest.mark.usefixtures("set_spark") +class GraphFrameTest: -class GraphFrameTest(GraphFrameTestCase): - def setUp(self): - super(GraphFrameTest, self).setUp() + def setup_method(self, method): + # Mimic setUp: create a simple GraphFrame instance for each test. 
localVertices = [(1, "A"), (2, "B"), (3, "C")] localEdges = [(1, 2, "love"), (2, 1, "hate"), (2, 3, "follow")] v = self.spark.createDataFrame(localVertices, ["id", "name"]) @@ -123,28 +135,38 @@ def setUp(self): def test_spark_version_check(self): gtu = GraphFrameTestUtils gtu.spark_version = gtu.parse_spark_version("2.0.2") - self.assertTrue(gtu.spark_at_least_of_version("1.7")) - self.assertTrue(gtu.spark_at_least_of_version("2.0")) - self.assertTrue(gtu.spark_at_least_of_version("2.0.1")) - self.assertTrue(gtu.spark_at_least_of_version("2.0.2")) - self.assertFalse(gtu.spark_at_least_of_version("2.0.3")) - self.assertFalse(gtu.spark_at_least_of_version("2.1")) + + assert gtu.spark_at_least_of_version("1.7") + assert gtu.spark_at_least_of_version("2.0") + assert gtu.spark_at_least_of_version("2.0.1") + assert gtu.spark_at_least_of_version("2.0.2") + assert not gtu.spark_at_least_of_version("2.0.3") + assert not gtu.spark_at_least_of_version("2.1") def test_construction(self): g = self.g - vertexIDs = map(lambda x: x[0], g.vertices.select("id").collect()) + vertexIDs = [row[0] for row in g.vertices.select("id").collect()] assert sorted(vertexIDs) == [1, 2, 3] - edgeActions = map(lambda x: x[0], g.edges.select("action").collect()) + + edgeActions = [row[0] for row in g.edges.select("action").collect()] assert sorted(edgeActions) == ["follow", "hate", "love"] - tripletsFirst = list(map(lambda x: (x[0][1], x[1][1], x[2][2]), - g.triplets.sort("src.id").select("src", "dst", "edge").take(1))) + + tripletsFirst = list( + map( + lambda x: (x[0][1], x[1][1], x[2][2]), + g.triplets.sort("src.id").select("src", "dst", "edge").take(1), + ) + ) assert tripletsFirst == [("A", "B", "love")], tripletsFirst + # Try with invalid vertices and edges DataFrames v_invalid = self.spark.createDataFrame( - [(1, "A"), (2, "B"), (3, "C")], ["invalid_colname_1", "invalid_colname_2"]) + [(1, "A"), (2, "B"), (3, "C")], ["invalid_colname_1", "invalid_colname_2"] + ) e_invalid = 
self.spark.createDataFrame( - [(1, 2), (2, 3), (3, 1)], ["invalid_colname_3", "invalid_colname_4"]) - with self.assertRaises(ValueError): + [(1, 2), (2, 3), (3, 1)], ["invalid_colname_3", "invalid_colname_4"] + ) + with pytest.raises(ValueError): GraphFrame(v_invalid, e_invalid) def test_cache(self): @@ -155,17 +177,17 @@ def test_cache(self): def test_degrees(self): g = self.g outDeg = g.outDegrees - self.assertSetEqual(set(outDeg.columns), {"id", "outDegree"}) + assert set(outDeg.columns) == {"id", "outDegree"} inDeg = g.inDegrees - self.assertSetEqual(set(inDeg.columns), {"id", "inDegree"}) + assert set(inDeg.columns) == {"id", "inDegree"} deg = g.degrees - self.assertSetEqual(set(deg.columns), {"id", "degree"}) + assert set(deg.columns) == {"id", "degree"} def test_motif_finding(self): g = self.g motifs = g.find("(a)-[e]->(b)") assert motifs.count() == 3 - self.assertSetEqual(set(motifs.columns), {"a", "e", "b"}) + assert set(motifs.columns) == {"a", "e", "b"} def test_filterVertices(self): g = self.g @@ -178,8 +200,8 @@ def test_filterVertices(self): e2 = g2.edges.select("src", "dst", "action").collect() assert len(v2) == len(expected_v) assert len(e2) == len(expected_e) - self.assertSetEqual(set(v2), set(expected_v)) - self.assertSetEqual(set(e2), set(expected_e)) + assert set(v2) == set(expected_v) + assert set(e2) == set(expected_e) def test_filterEdges(self): g = self.g @@ -192,8 +214,8 @@ def test_filterEdges(self): e2 = g2.edges.select("src", "dst", "action").collect() assert len(v2) == len(expected_v) assert len(e2) == len(expected_e) - self.assertSetEqual(set(v2), set(expected_v)) - self.assertSetEqual(set(e2), set(expected_e)) + assert set(v2) == set(expected_v) + assert set(e2) == set(expected_e) def test_dropIsolatedVertices(self): g = self.g @@ -204,74 +226,93 @@ def test_dropIsolatedVertices(self): expected_e = [(2, 3, "follow")] assert len(v2) == len(expected_v) assert len(e2) == len(expected_e) - self.assertSetEqual(set(v2), set(expected_v)) - 
self.assertSetEqual(set(e2), set(expected_e)) + assert set(v2) == set(expected_v) + assert set(e2) == set(expected_e) def test_bfs(self): g = self.g paths = g.bfs("name='A'", "name='C'") - self.assertEqual(paths.count(), 1) - self.assertEqual(paths.select("v1.name").head()[0], "B") + assert paths.count() == 1 + # Expecting that the first intermediary vertex in the BFS is "B" + assert paths.select("v1.name").head()[0] == "B" + paths2 = g.bfs("name='A'", "name='C'", edgeFilter="action!='follow'") - self.assertEqual(paths2.count(), 0) + assert paths2.count() == 0 + paths3 = g.bfs("name='A'", "name='C'", maxPathLength=1) - self.assertEqual(paths3.count(), 0) + assert paths3.count() == 0 -class PregelTest(GraphFrameTestCase): - def setUp(self): - super(PregelTest, self).setUp() +@pytest.mark.usefixtures("set_spark") +class TestPregel: def test_page_rank(self): - from pyspark.sql.functions import coalesce, col, lit, sum, when - edges = self.spark.createDataFrame([[0, 1], - [1, 2], - [2, 4], - [2, 0], - [3, 4], # 3 has no in-links - [4, 0], - [4, 2]], ["src", "dst"]) + # Create an edge DataFrame; note that vertex 3 has no in-links. + edges = self.spark.createDataFrame( + [[0, 1], [1, 2], [2, 4], [2, 0], [3, 4], [4, 0], [4, 2]], + ["src", "dst"], + ) edges.cache() + + # Create a vertex DataFrame and count vertices. vertices = self.spark.createDataFrame([[0], [1], [2], [3], [4]], ["id"]) numVertices = vertices.count() + + # Get the outDegrees DataFrame from a GraphFrame built on the original vertices and edges. vertices = GraphFrame(vertices, edges).outDegrees vertices.cache() + + # Construct a new GraphFrame with the updated vertices DataFrame. 
graph = GraphFrame(vertices, edges) alpha = 0.15 - ranks = graph.pregel \ - .setMaxIter(5) \ - .withVertexColumn("rank", lit(1.0 / numVertices), - coalesce(Pregel.msg(), - lit(0.0)) * lit(1.0 - alpha) + lit(alpha / numVertices)) \ - .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree")) \ - .aggMsgs(sum(Pregel.msg())) \ + + # Run PageRank via Pregel. + ranks = ( + graph.pregel.setMaxIter(5) + .withVertexColumn( + "rank", + F.lit(1.0 / numVertices), + F.coalesce(Pregel.msg(), F.lit(0.0)) * F.lit(1.0 - alpha) + + F.lit(alpha / numVertices), + ) + .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree")) + .aggMsgs(F.sum(Pregel.msg())) .run() + ) + + # Collect and sort results. resultRows = ranks.sort(ranks.id).collect() - result = map(lambda x: x.rank, resultRows) + result = list(map(lambda x: x.rank, resultRows)) expected = [0.245, 0.224, 0.303, 0.03, 0.197] + + # Compare each result with its expected value using a tolerance of 1e-3. for a, b in zip(result, expected): - self.assertAlmostEqual(a, b, delta = 1e-3) + assert a == pytest.approx(b, abs=1e-3) + +@pytest.mark.usefixtures("set_spark") +class TestGraphFrameLib: -class GraphFrameLibTest(GraphFrameTestCase): - def setUp(self): - super(GraphFrameLibTest, self).setUp() + def setup_method(self, method): + # Set up the Java API instance for each test. 
self.japi = _java_api(self.spark._sc) - def _hasCols(self, graph, vcols = [], ecols = []): - map(lambda c: self.assertIn(c, graph.vertices.columns), vcols) - map(lambda c: self.assertIn(c, graph.edges.columns), ecols) + def _hasCols(self, graph, vcols=[], ecols=[]): + for c in vcols: + assert c in graph.vertices.columns, f"Vertex DataFrame missing column: {c}" + for c in ecols: + assert c in graph.edges.columns, f"Edge DataFrame missing column: {c}" - def _df_hasCols(self, vertices, vcols = []): - map(lambda c: self.assertIn(c, vertices.columns), vcols) + def _df_hasCols(self, df, vcols=[]): + for c in vcols: + assert c in df.columns, f"DataFrame missing column: {c}" def _graph(self, name, *args): """ - Convenience to call one of the example graphs, passing the arguments and wrapping the result back - as a python object. - :param name: the name of the example graph - :param args: all the required arguments, without the initial spark session - :return: + Convenience to call one of the example graphs, passing the arguments and wrapping the result as a Python object. + :param name: the name of the example graph. + :param args: all the required arguments (excluding the initial SparkSession). + :return: a GraphFrame object. """ examples = self.japi.examples() jgraph = getattr(examples, name)(*args) @@ -281,83 +322,79 @@ def test_aggregate_messages(self): g = self._graph("friends") # For each user, sum the ages of the adjacent users, # plus 1 for the src's sum if the edge is "friend". - sendToSrc = ( - AM.dst['age'] + - sqlfunctions.when( - AM.edge['relationship'] == 'friend', - sqlfunctions.lit(1) - ).otherwise(0)) - sendToDst = AM.src['age'] + sendToSrc = AM.dst["age"] + F.when(AM.edge["relationship"] == "friend", F.lit(1)).otherwise( + 0 + ) + sendToDst = AM.src["age"] agg = g.aggregateMessages( - sqlfunctions.sum(AM.msg).alias('summedAges'), - sendToSrc=sendToSrc, - sendToDst=sendToDst) - # Run the aggregation again providing SQL expressions as String instead. 
+ F.sum(AM.msg).alias("summedAges"), sendToSrc=sendToSrc, sendToDst=sendToDst + ) + # Run the aggregation again using SQL expressions as Strings. agg2 = g.aggregateMessages( "sum(MSG) AS `summedAges`", sendToSrc="(dst['age'] + CASE WHEN (edge['relationship'] = 'friend') THEN 1 ELSE 0 END)", - sendToDst="src['age']") - # Convert agg and agg2 to a mapping from id to the aggregated message. - aggMap = {id_: s for id_, s in agg.select('id', 'summedAges').collect()} - agg2Map = {id_: s for id_, s in agg2.select('id', 'summedAges').collect()} - # Compute the truth via brute force. - user2age = {id_: age for id_, age in g.vertices.select('id', 'age').collect()} + sendToDst="src['age']", + ) + # Build mappings from id to the aggregated message. + aggMap = {row.id: row.summedAges for row in agg.select("id", "summedAges").collect()} + agg2Map = {row.id: row.summedAges for row in agg2.select("id", "summedAges").collect()} + # Compute the expected aggregation via brute force. + user2age = {row.id: row.age for row in g.vertices.select("id", "age").collect()} trueAgg = {} - for src, dst, rel in g.edges.select("src", "dst", "relationship").collect(): - trueAgg[src] = trueAgg.get(src, 0) + user2age[dst] + (1 if rel == 'friend' else 0) + for row in g.edges.select("src", "dst", "relationship").collect(): + src, dst, rel = row.src, row.dst, row.relationship + trueAgg[src] = trueAgg.get(src, 0) + user2age[dst] + (1 if rel == "friend" else 0) trueAgg[dst] = trueAgg.get(dst, 0) + user2age[src] - # Compare if the agg mappings match the brute force mapping - self.assertEqual(aggMap, trueAgg) - self.assertEqual(agg2Map, trueAgg) - # Check that TypeError is raises with messages of wrong type - with self.assertRaises(TypeError): + # Verify both aggregations match the expected results. 
+ assert aggMap == trueAgg, f"aggMap {aggMap} does not equal expected {trueAgg}" + assert agg2Map == trueAgg, f"agg2Map {agg2Map} does not equal expected {trueAgg}" + # Check that passing a wrong type for messages raises a TypeError. + with pytest.raises(TypeError): g.aggregateMessages( - "sum(MSG) AS `summedAges`", - sendToSrc=object(), - sendToDst="src['age']") - with self.assertRaises(TypeError): + "sum(MSG) AS `summedAges`", sendToSrc=object(), sendToDst="src['age']" + ) + with pytest.raises(TypeError): g.aggregateMessages( - "sum(MSG) AS `summedAges`", - sendToSrc=dst['age'], - sendToDst=object()) + "sum(MSG) AS `summedAges`", sendToSrc=F.col("dst")["age"], sendToDst=object() + ) def test_connected_components(self): - v = self.spark.createDataFrame([ - (0, "a", "b")], ["id", "vattr", "gender"]) + v = self.spark.createDataFrame([(0, "a", "b")], ["id", "vattr", "gender"]) e = self.spark.createDataFrame([(0, 0, 1)], ["src", "dst", "test"]).filter("src > 10") g = GraphFrame(v, e) comps = g.connectedComponents() - self._df_hasCols(comps, vcols=['id', 'component', 'vattr', 'gender']) - self.assertEqual(comps.count(), 1) + self._df_hasCols(comps, vcols=["id", "component", "vattr", "gender"]) + assert comps.count() == 1 def test_connected_components2(self): v = self.spark.createDataFrame([(0, "a0", "b0"), (1, "a1", "b1")], ["id", "A", "B"]) e = self.spark.createDataFrame([(0, 1, "a01", "b01")], ["src", "dst", "A", "B"]) g = GraphFrame(v, e) comps = g.connectedComponents() - self._df_hasCols(comps, vcols=['id', 'component', 'A', 'B']) - self.assertEqual(comps.count(), 2) + self._df_hasCols(comps, vcols=["id", "component", "A", "B"]) + assert comps.count() == 2 def test_connected_components_friends(self): g = self._graph("friends") - comps_tests = [] - comps_tests += [g.connectedComponents()] - comps_tests += [g.connectedComponents(broadcastThreshold=1)] - comps_tests += [g.connectedComponents(checkpointInterval=0)] - comps_tests += 
[g.connectedComponents(checkpointInterval=10)] - comps_tests += [g.connectedComponents(algorithm="graphx")] + comps_tests = [ + g.connectedComponents(), + g.connectedComponents(broadcastThreshold=1), + g.connectedComponents(checkpointInterval=0), + g.connectedComponents(checkpointInterval=10), + g.connectedComponents(algorithm="graphx"), + ] for c in comps_tests: - self.assertEqual(c.groupBy("component").count().count(), 2) + assert c.groupBy("component").count().count() == 2 def test_label_progagation(self): n = 5 g = self._graph("twoBlobs", n) labels = g.labelPropagation(maxIter=4 * n) labels1 = labels.filter("id < 5").select("label").collect() - all1 = set([x.label for x in labels1]) + all1 = {row.label for row in labels1} assert len(all1) == 1 labels2 = labels.filter("id >= 5").select("label").collect() - all2 = set([x.label for x in labels2]) + all2 = {row.label for row in labels2} assert len(all2) == 1 assert all1 != all2 @@ -367,7 +404,7 @@ def test_page_rank(self): resetProb = 0.15 errorTol = 1.0e-5 pr = g.pageRank(resetProb, tol=errorTol) - self._hasCols(pr, vcols=['id', 'pagerank'], ecols=['src', 'dst', 'weight']) + self._hasCols(pr, vcols=["id", "pagerank"], ecols=["src", "dst", "weight"]) def test_parallel_personalized_page_rank(self): n = 100 @@ -376,31 +413,34 @@ def test_parallel_personalized_page_rank(self): maxIter = 15 sourceIds = [1, 2, 3, 4] pr = g.parallelPersonalizedPageRank(resetProb, sourceIds=sourceIds, maxIter=maxIter) - self._hasCols(pr, vcols=['id', 'pageranks'], ecols=['src', 'dst', 'weight']) + self._hasCols(pr, vcols=["id", "pageranks"], ecols=["src", "dst", "weight"]) def test_shortest_paths(self): edges = [(1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)] + # Create bidirectional edges. 
all_edges = [z for (a, b) in edges for z in [(a, b), (b, a)]] - edges = self.spark.createDataFrame(all_edges, ["src", "dst"]) + edgesDF = self.spark.createDataFrame(all_edges, ["src", "dst"]) vertices = self.spark.createDataFrame([(i,) for i in range(1, 7)], ["id"]) - g = GraphFrame(vertices, edges) + g = GraphFrame(vertices, edgesDF) landmarks = [1, 4] v2 = g.shortestPaths(landmarks) self._df_hasCols(v2, vcols=["id", "distances"]) def test_svd_plus_plus(self): g = self._graph("ALSSyntheticData") - (v2, cost) = g.svdPlusPlus() - self._df_hasCols(v2, vcols=['id', 'column1', 'column2', 'column3', 'column4']) + v2, cost = g.svdPlusPlus() + self._df_hasCols(v2, vcols=["id", "column1", "column2", "column3", "column4"]) def test_strongly_connected_components(self): - # Simple island test + # Simple island test. vertices = self.spark.createDataFrame([(i,) for i in range(1, 6)], ["id"]) edges = self.spark.createDataFrame([(7, 8)], ["src", "dst"]) g = GraphFrame(vertices, edges) c = g.stronglyConnectedComponents(5) for row in c.collect(): - self.assertEqual(row.id, row.component) + assert ( + row.id == row.component + ), f"Vertex {row.id} not equal to its component {row.component}" def test_triangle_counts(self): edges = self.spark.createDataFrame([(0, 1), (1, 2), (2, 0)], ["src", "dst"]) @@ -408,61 +448,66 @@ def test_triangle_counts(self): g = GraphFrame(vertices, edges) c = g.triangleCount() for row in c.select("id", "count").collect(): - self.assertEqual(row.asDict()['count'], 1) - + assert row.asDict()["count"] == 1, f"Triangle count for vertex {row.id} is not 1" + def test_mutithreaded_sparksession_usage(self): - # Test that we can use the GraphFrame API from multiple threads + # Test that the GraphFrame API works correctly from multiple threads. 
localVertices = [(1, "A"), (2, "B"), (3, "C")] localEdges = [(1, 2, "love"), (2, 1, "hate"), (2, 3, "follow")] v = self.spark.createDataFrame(localVertices, ["id", "name"]) e = self.spark.createDataFrame(localEdges, ["src", "dst", "action"]) - - + exc = None + def run_graphframe() -> None: + nonlocal exc try: GraphFrame(v, e) except Exception as _e: - nonlocal exc exc = _e - + import threading + thread = threading.Thread(target=run_graphframe) thread.start() thread.join() - self.assertIsNone(exc, f"Exception was raised in thread: {exc}") + assert exc is None, f"Exception was raised in thread: {exc}" + +@pytest.mark.usefixtures("set_spark") +class TestGraphFrameExamples: -class GraphFrameExamplesTest(GraphFrameTestCase): - def setUp(self): - super(GraphFrameExamplesTest, self).setUp() + def setup_method(self, method): + # Set up the Java API instance for use in the tests. self.japi = _java_api(self.spark._sc) def test_belief_propagation(self): - # create graphical model g of size 3 x 3 + # Create a graphical model g of size 3x3. g = Graphs(self.spark).gridIsingModel(3) - # run BP for 5 iterations + # Run Belief Propagation (BP) for 5 iterations. numIter = 5 results = BeliefPropagation.runBPwithGraphFrames(g, numIter) - # check beliefs are valid - for row in results.vertices.select('belief').collect(): - belief = row['belief'] - self.assertTrue( - 0 <= belief <= 1, - msg="Expected belief to be probability in [0,1], but found {}".format(belief)) + # Check that each belief is a valid probability in [0, 1]. + for row in results.vertices.select("belief").collect(): + belief = row["belief"] + assert ( + 0 <= belief <= 1 + ), f"Expected belief to be probability in [0,1], but found {belief}" def test_graph_friends(self): - # construct graph + # Construct the graph. g = Graphs(self.spark).friends() - # check that a GraphFrame instance was returned - self.assertIsInstance(g, GraphFrame) + # Check that the result is an instance of GraphFrame. 
@click.command()
@click.argument("subdomain")
@click.option(
    "--data-dir",
    default="python/graphframes/tutorials/data",
    help="Directory to store downloaded files",
)
@click.option(
    "--extract/--no-extract", default=True, help="Whether to extract the archive after download"
)
def download_stackexchange(subdomain: str, data_dir: str, extract: bool) -> None:
    """Download Stack Exchange archive for a given SUBDOMAIN.

    Example: python/graphframes/tutorials/download.py stats.meta

    Note: This won't work for stackoverflow.com archives due to size.
    """
    # NOTE: the docstring above doubles as the click --help text, so it is kept
    # verbatim. Failures are reported on stderr and surface as click.Abort.

    # Create data directory if it doesn't exist
    os.makedirs(data_dir, exist_ok=True)

    # Construct archive URL and filename
    site = f"{subdomain}.stackexchange.com"
    archive_url = f"https://archive.org/download/stackexchange/{site}.7z"
    archive_path = os.path.join(data_dir, f"{site}.7z")

    click.echo(f"Downloading archive from {archive_url}")

    try:
        # (connect, read) timeouts: without them a stalled server blocks forever.
        # The context manager releases the streamed connection even on error.
        with requests.get(archive_url, stream=True, timeout=(10, 60)) as response:
            response.raise_for_status()  # Raise exception for bad status codes

            # content-length may be absent; 0 still gives a working progress bar
            total_size = int(response.headers.get("content-length", 0))

            with click.progressbar(length=total_size, label="Downloading") as bar:
                with open(archive_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                            bar.update(len(chunk))
    except requests.exceptions.RequestException as e:
        # Don't leave a truncated archive behind that a later run (or the
        # extraction step) could mistake for a complete download.
        if os.path.exists(archive_path):
            os.remove(archive_path)
        click.echo(f"Error downloading archive: {e}", err=True)
        raise click.Abort() from e

    click.echo(f"Download complete: {archive_path}")

    if not extract:
        return

    click.echo("Extracting archive...")
    output_dir = site
    try:
        with py7zr.SevenZipFile(archive_path, mode="r") as z:
            z.extractall(path=os.path.join(data_dir, output_dir))
    except py7zr.Bad7zFile as e:
        click.echo(f"Error extracting archive: {e}", err=True)
        raise click.Abort() from e
    click.echo(f"Extraction complete: {output_dir}")


if __name__ == "__main__":
    download_stackexchange()
import pyspark.sql.functions as F
from pyspark import SparkContext
from pyspark.sql import DataFrame, SparkSession

from graphframes import GraphFrame


def count_graphlet_types(type_df: DataFrame) -> DataFrame:
    """Count motif matches per distinct combination of node / edge types.

    Groups by every column of ``type_df``, orders by descending count and then
    replaces ``count`` with a comma-formatted string for display.
    """
    return (
        type_df.groupby(*type_df.columns)
        .count()
        .orderBy(F.col("count").desc())
        # Add a comma formatted column for display
        .withColumn("count", F.format_number(F.col("count"), 0))
    )


# Initialize a SparkSession
spark: SparkSession = (
    SparkSession.builder.appName("Stack Overflow Motif Analysis")
    # Lets the Id:(Stack Overflow int) and id:(GraphFrames UUID) coexist
    .config("spark.sql.caseSensitive", True).getOrCreate()
)
sc: SparkContext = spark.sparkContext
sc.setCheckpointDir("/tmp/graphframes-checkpoints")

# Change me if you download a different stackexchange site
STACKEXCHANGE_SITE = "stats.meta.stackexchange.com"
BASE_PATH = f"python/graphframes/tutorials/data/{STACKEXCHANGE_SITE}"

#
# Load the nodes and edges from disk, repartition, checkpoint (to truncate the long query plan) and cache.
#

# We created these in stackexchange.py from Stack Exchange data dump XML files
NODES_PATH: str = f"{BASE_PATH}/Nodes.parquet"
nodes_df: DataFrame = spark.read.parquet(NODES_PATH)

# Repartition the nodes to give our motif searches parallelism
nodes_df = nodes_df.repartition(50).checkpoint().cache()

# We created these in stackexchange.py from Stack Exchange data dump XML files
EDGES_PATH: str = f"{BASE_PATH}/Edges.parquet"
edges_df: DataFrame = spark.read.parquet(EDGES_PATH)

# Repartition the edges to give our motif searches parallelism
edges_df = edges_df.repartition(50).checkpoint().cache()

# What kinds of nodes do we have to work with?
node_counts = (
    nodes_df.select("id", F.col("Type").alias("Node Type"))
    .groupBy("Node Type")
    .count()
    .orderBy(F.col("count").desc())
    # Add a comma formatted column for display
    .withColumn("count", F.format_number(F.col("count"), 0))
)
node_counts.show()

# What kinds of edges do we have to work with?
edge_counts = (
    edges_df.select("src", "dst", F.col("relationship").alias("Edge Type"))
    .groupBy("Edge Type")
    .count()
    .orderBy(F.col("count").desc())
    # Add a comma formatted column for display
    .withColumn("count", F.format_number(F.col("count"), 0))
)
edge_counts.show()

g = GraphFrame(nodes_df, edges_df)

g.vertices.show(10)
print(f"Node columns: {g.vertices.columns}")

g.edges.sample(0.0001).show(10)

# Sanity test that all edges have valid ids
edge_count = g.edges.count()
valid_edge_count = (
    g.edges.join(g.vertices, on=g.edges.src == g.vertices.id)
    .select("src", "dst", "relationship")
    .join(g.vertices, on=g.edges.dst == g.vertices.id)
    .count()
)

# Just up and die if we have edges that point to non-existent nodes
assert (
    edge_count == valid_edge_count
), f"Edge count {edge_count} != valid edge count {valid_edge_count}"
print(f"Edge count: {edge_count:,} == Valid edge count: {valid_edge_count:,}")

# G4: Continuous Triangles
paths = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(a)")

# Show the first few paths
paths.show(3)

graphlet_type_df = paths.select(
    F.col("a.Type").alias("A_Type"),
    F.col("e1.relationship").alias("(a)-[e1]->(b)"),
    F.col("b.Type").alias("B_Type"),
    F.col("e2.relationship").alias("(b)-[e2]->(c)"),
    F.col("c.Type").alias("C_Type"),
    F.col("e3.relationship").alias("(c)-[e3]->(a)"),
)
graphlet_count_df = count_graphlet_types(graphlet_type_df)
graphlet_count_df.show()

# G5: Divergent Triangles
paths = g.find("(a)-[e1]->(b); (a)-[e2]->(c); (c)-[e3]->(b)")

graphlet_type_df = paths.select(
    F.col("a.Type").alias("A_Type"),
    F.col("e1.relationship").alias("(a)-[e1]->(b)"),
    F.col("b.Type").alias("B_Type"),
    F.col("e2.relationship").alias("(a)-[e2]->(c)"),
    F.col("c.Type").alias("C_Type"),
    F.col("e3.relationship").alias("(c)-[e3]->(b)"),
)
graphlet_count_df = count_graphlet_types(graphlet_type_df)
graphlet_count_df.show()

# G17: A directed 3-path is a surprisingly diverse graphlet
paths = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (d)-[e3]->(c)")

# Summarize the four-node path by counting instances by node / edge type
graphlet_type_df = paths.select(
    F.col("a.Type").alias("A_Type"),
    F.col("e1.relationship").alias("(a)-[e1]->(b)"),
    F.col("b.Type").alias("B_Type"),
    F.col("e2.relationship").alias("(b)-[e2]->(c)"),
    F.col("c.Type").alias("C_Type"),
    F.col("e3.relationship").alias("(d)-[e3]->(c)"),
    F.col("d.Type").alias("D_Type"),
)
graphlet_count_df = count_graphlet_types(graphlet_type_df)
graphlet_count_df.show()

# Same table again, this time sorted by the type columns instead of by count
graphlet_count_df.orderBy(graphlet_type_df.columns, ascending=False).show(104)

# Votes (a, d) cast for two questions (b, c) where question b links to question c.
linked_vote_paths = paths.filter(
    (F.col("a.Type") == "Vote")
    & (F.col("e1.relationship") == "CastFor")
    & (F.col("b.Type") == "Question")
    & (F.col("e2.relationship") == "Links")
    & (F.col("c.Type") == "Question")
    & (F.col("e3.relationship") == "CastFor")
    & (F.col("d.Type") == "Vote")
)

# Sanity check the count - it should match the table above.
# Printed explicitly: a bare expression shows nothing under spark-submit.
print(f"Linked vote paths: {linked_vote_paths.count():,}")

b_vote_counts = linked_vote_paths.select("a", "b").distinct().groupBy("b").count()
c_vote_counts = linked_vote_paths.select("c", "d").distinct().groupBy("c").count()

# VoteTypeId == 2 is an UpVote (see the VoteType mapping in stackexchange.py)
linked_vote_counts = (
    linked_vote_paths.filter((F.col("a.VoteTypeId") == 2) & (F.col("d.VoteTypeId") == 2))
    .select("b", "c")
    .join(b_vote_counts, on="b", how="inner")
    .withColumnRenamed("count", "b_count")
    .join(c_vote_counts, on="c", how="inner")
    .withColumnRenamed("count", "c_count")
)
vote_count_corr = linked_vote_counts.stat.corr("b_count", "c_count")
print(f"Correlation between vote counts of linked questions: {vote_count_corr}")
@F.udf(returnType=T.ArrayType(T.StringType()))
def split_tags(tags: str) -> List[str]:
    """Return the tag names found in a ``<tag1><tag2>`` style string.

    An empty or missing ``tags`` value yields an empty list.
    """
    # Each tag is the text between a '<' and the next '>'
    return re.findall(r"<([^>]+)>", tags) if tags else []
+# + +post_links_df = ( + spark.read.format("xml") + .options(rowTag="row") + .options(rootTag="postlinks") + .load(f"{BASE_PATH}/PostLinks.xml") +) +print(f"Total PostLinks: {post_links_df.count():,}") + +# Remove the _ prefix from field names +post_links_df = ( + remove_prefix(post_links_df) + .withColumn( + "LinkType", + F.when(F.col("LinkTypeId") == 1, "Linked") + .when(F.col("LinkTypeId") == 3, "Duplicate") + .otherwise("Unknown"), + ) + .withColumn("Type", F.lit("PostLinks")) +) + + +# +# Load the PostHistory... +# + +post_history_df = ( + spark.read.format("xml") + .options(rowTag="row") + .options(rootTag="posthistory") + .load(f"{BASE_PATH}/PostHistory.xml") +) +print(f"Total PostHistory: {post_history_df.count():,}") + +# Remove the _ prefix from field names +post_history_df = remove_prefix(post_history_df).withColumn("Type", F.lit("PostHistory")) + + +# +# Load the Comments... +# + +comments_df = ( + spark.read.format("xml") + .options(rowTag="row") + .options(rootTag="comments") + .load(f"{BASE_PATH}/Comments.xml") +) +print(f"Total Comments: {comments_df.count():,}") + +# Remove the _ prefix from field names +comments_df = remove_prefix(comments_df).withColumn("Type", F.lit("Comment")) + + +# +# Load the Users... +# + +users_df = ( + spark.read.format("xml") + .options(rowTag="row") + .options(rootTag="users") + .load(f"{BASE_PATH}/Users.xml") +) +print(f"Total Users: {users_df.count():,}") + +# Remove the _ prefix from field names +users_df = remove_prefix(users_df).withColumn("Type", F.lit("User")) + + +# +# Load the Votes... 
+# + +votes_df = ( + spark.read.format("xml") + .options(rowTag="row") + .options(rootTag="votes") + .load(f"{BASE_PATH}/Votes.xml") +) +print(f"Total Votes: {votes_df.count():,}") + +# Remove the _ prefix from field names +votes_df = remove_prefix(votes_df).withColumn("Type", F.lit("Vote")) + +# Add a VoteType column +votes_df = votes_df.withColumn( + "VoteType", + F.when(F.col("VoteTypeId") == 2, "UpVote") + .when(F.col("VoteTypeId") == 3, "DownVote") + .when(F.col("VoteTypeId") == 4, "Favorite") + .when(F.col("VoteTypeId") == 5, "Close") + .when(F.col("VoteTypeId") == 6, "Reopen") + .when(F.col("VoteTypeId") == 7, "BountyStart") + .when(F.col("VoteTypeId") == 8, "BountyClose") + .when(F.col("VoteTypeId") == 9, "Deletion") + .when(F.col("VoteTypeId") == 10, "Undeletion") + .when(F.col("VoteTypeId") == 11, "Spam") + .when(F.col("VoteTypeId") == 12, "InformModerator") + .otherwise("Unknown"), +) + + +# +# Load the Tags... +# + +tags_df = ( + spark.read.format("xml") + .options(rowTag="row") + .options(rootTag="tags") + .load(f"{BASE_PATH}/Tags.xml") +) +print(f"Total Tags: {tags_df.count():,}") + +# Remove the _ prefix from field names +tags_df = remove_prefix(tags_df).withColumn("Type", F.lit("Tag")) + + +# +# Load the Badges... 
def add_missing_columns(df: DataFrame, all_cols: List[Tuple[str, T.StructField]]) -> DataFrame:
    """Add any missing columns from any DataFrame among several we want to merge.

    Parameters
    ----------
    df : DataFrame
        The DataFrame to widen.
    all_cols : List[Tuple[str, T.StructField]]
        (column name, schema field) pairs describing the combined schema.

    Returns
    -------
    DataFrame
        ``df`` followed by one typed NULL column per name in ``all_cols`` that
        ``df`` lacks, in ``all_cols`` order. If a name occurs more than once in
        ``all_cols``, the first field decides the type.
    """
    known = set(df.columns)
    additions = []
    for col_name, schema_field in all_cols:
        if col_name in known:
            continue
        known.add(col_name)
        additions.append(F.lit(None).cast(schema_field.dataType).alias(col_name))

    if not additions:
        return df
    # One projection for all new columns: withColumn() in a loop adds a
    # projection per call and can produce very large query plans.
    return df.select("*", *additions)
all_cols).select(all_column_names) +assert ( + set(questions_df.columns) + == set(answers_df.columns) + == set(post_links_df.columns) + == set(users_df.columns) + == set(votes_df.columns) + == set(all_column_names) + == set(tags_df.columns) + == set(badges_df.columns) +) + +# Now union them together and remove duplicates +nodes_df: DataFrame = ( + questions_df.unionByName(answers_df) + .unionByName(post_links_df) + .unionByName(users_df) + .unionByName(votes_df) + .unionByName(tags_df) + .unionByName(badges_df) + .distinct() +) +print(f"Total distinct nodes: {nodes_df.count():,}") + +# Now add a unique ID field +nodes_df = nodes_df.withColumn("id", F.expr("uuid()")).select("id", *all_column_names) + +# Now create posts - combined questions and answers for things that can apply to them both +posts_df = questions_df.unionByName(answers_df).cache() + +# +# Store the nodes to disk, reload and cache +# + +NODES_PATH: str = f"{BASE_PATH}/Nodes.parquet" + +# Write to disk and load back again +nodes_df.write.mode("overwrite").parquet(NODES_PATH) +nodes_df = spark.read.parquet(NODES_PATH) + +nodes_df.select("id", "Type").groupBy("Type").count().orderBy(F.col("count").desc()).show() + +# +---------+------+ +# | Type| count| +# +---------+------+ +# | Badge|43,029| +# | Vote|42,593| +# | User|37,709| +# | Answer| 2,978| +# | Question| 2,025| +# |PostLinks| 1,274| +# | Tag| 143| +# +---------+------+ + +# Helps performance of GraphFrames' algorithms +nodes_df = nodes_df.cache() + +# Make sure we have the right columns and cached data +posts_df = nodes_df.filter(nodes_df.Type.isin("Question", "Answer")).cache() +questions_df = nodes_df.filter(nodes_df.Type == "Question").cache() +answers_df = nodes_df.filter(nodes_df.Type == "Answer").cache() +post_links_df = nodes_df.filter(nodes_df.Type == "PostLinks").cache() +users_df = nodes_df.filter(nodes_df.Type == "User").cache() +votes_df = nodes_df.filter(nodes_df.Type == "Vote").cache() +tags_df = nodes_df.filter(nodes_df.Type == 
"Tag").cache() +badges_df = nodes_df.filter(nodes_df.Type == "Badge").cache() + + +# +# Build the edges DataFrame: +# +# * [Vote]--CastFor-->[Post] +# * [User]--Asks-->[Question] +# * [User]--Posts-->[Answer] +# * [Post]--Answers-->[Question] +# * [Tag]--Tags-->[Post] +# * [User]--Earns-->[Badge] +# * [Post]--Links-->[Post] +# +# Remember: 'src', 'dst' and 'relationship' are standard edge fields in GraphFrames +# Remember: we must produce src/dst based on lowercase 'id' UUID, not 'Id' which is Stack Overflow's integer. +# + +# +# Create a [Vote]--CastFor-->[Post] edge... remember a Post is a Question or Answer +# + +src_vote_df: DataFrame = votes_df.select( + F.col("id").alias("src"), + F.col("Id").alias("VoteId"), + # Everything has all the fields - should build from base records but need UUIDs + F.col("PostId").alias("VotePostId"), +) +cast_for_edge_df: DataFrame = src_vote_df.join( + posts_df, on=src_vote_df.VotePostId == posts_df.Id, how="inner" +).select( + # 'src' comes from the votes' 'id' + "src", + # 'dst' comes from the posts' 'id' + F.col("id").alias("dst"), + # All edges have a 'relationship' field + F.lit("CastFor").alias("relationship"), +) +print(f"Total CastFor edges: {cast_for_edge_df.count():,}") +print(f"Percentage of linked votes: {cast_for_edge_df.count() / votes_df.count():.2%}\n") + +# +# Create a [User]--Asks-->[Question] edge +# + +questions_asked_df: DataFrame = questions_df.select( + F.col("OwnerUserId").alias("QuestionUserId"), + F.col("id").alias("dst"), + F.lit("Asks").alias("relationship"), +) +user_asks_edges_df: DataFrame = questions_asked_df.join( + users_df, on=questions_asked_df.QuestionUserId == users_df.Id, how="inner" +).select( + # 'src' comes from the users' 'id' + F.col("id").alias("src"), + # 'dst' comes from the posts' 'id' + "dst", + # All edges have a 'relationship' field + "relationship", +) +print(f"Total Asks edges: {user_asks_edges_df.count():,}") +print( + f"Percentage of asked questions linked to users: 
#
# Create a [Answer]--Answers-->[Question] edge
#

src_answers_df: DataFrame = answers_df.select(
    F.col("id").alias("src"),
    F.col("Id").alias("AnswerId"),
    F.col("ParentId").alias("AnswerParentId"),
)
# Join against questions_df, the DataFrame the join condition refers to: an
# answer's ParentId is the Stack Exchange 'Id' of the question it answers.
question_answers_edges_df: DataFrame = src_answers_df.join(
    questions_df, on=src_answers_df.AnswerParentId == questions_df.Id, how="inner"
).select(
    # 'src' comes from the answers' 'id'
    "src",
    # 'dst' comes from the questions' 'id'
    F.col("id").alias("dst"),
    # All edges have a 'relationship' field
    F.lit("Answers").alias("relationship"),
)
# Count once; every count() re-runs the join
answers_edge_count = question_answers_edges_df.count()
print(f"Total Posts Answers edges: {answers_edge_count:,}")
print(f"Percentage of linked answers: {answers_edge_count / answers_df.count():.2%}\n")
remember a Post is a Question or Answer +# + +src_tags_df: DataFrame = posts_df.select( + F.col("id").alias("dst"), + # First remove leading/trailing < and >, then split on "><" + F.explode("Tags").alias("Tag"), +) +tags_edge_df: DataFrame = src_tags_df.join( + tags_df, on=src_tags_df.Tag == tags_df.TagName, how="inner" +).select( + # 'src' comes from the posts' 'id' + F.col("id").alias("src"), + # 'dst' comes from the tags' 'id' + "dst", + # All edges have a 'relationship' field + F.lit("Tags").alias("relationship"), +) +print(f"Total Tags edges: {tags_edge_df.count():,}") +print(f"Percentage of linked tags: {tags_edge_df.count() / posts_df.count():.2%}\n") + +# +# Create a [User]--Earns-->[Badge] edge +# + +earns_edges_df: DataFrame = badges_df.select( + F.col("UserId").alias("BadgeUserId"), + F.col("id").alias("dst"), + F.lit("Earns").alias("relationship"), +) +earns_edges_df = earns_edges_df.join( + users_df, on=earns_edges_df.BadgeUserId == users_df.Id, how="inner" +).select( + # 'src' comes from the users' 'id' + F.col("id").alias("src"), + # 'dst' comes from the badges' 'id' + "dst", + # All edges have a 'relationship' field + "relationship", +) +print(f"Total Earns edges: {earns_edges_df.count():,}") +print(f"Percentage of earned badges: {earns_edges_df.count() / badges_df.count():.2%}\n") + +# +# Create a [Post]--Links-->[Post] edge... remember a Post is a Question or Answer +# Also a [Post]--Duplicates-->[Post] edge... 
remember a Post is a Question or Answer +# + +trim_links_df: DataFrame = post_links_df.select( + F.col("PostId").alias("SrcPostId"), + F.col("RelatedPostId").alias("DstPostId"), + "LinkType", +) +links_src_edge_df: DataFrame = trim_links_df.join( + posts_df.drop("LinkType"), on=trim_links_df.SrcPostId == posts_df.Id, how="inner" +).select( + # 'dst' comes from the posts' 'id' + F.col("id").alias("src"), + "DstPostId", + "LinkType", +) +raw_links_edge_df = links_src_edge_df.join( + posts_df.drop("LinkType"), on=links_src_edge_df.DstPostId == posts_df.Id, how="inner" +).select( + "src", + # 'src' comes from the posts' 'id' + F.col("id").alias("dst"), + # All edges have a 'relationship' field + F.lit("Links").alias("relationship"), + "LinkType", +) + +duplicates_edge_df: DataFrame = ( + raw_links_edge_df.filter(F.col("LinkType") == "Duplicate") + .withColumn("relationship", F.lit("Duplicates")) + .select("src", "dst", "relationship") +) +print(f"Total Duplicates edges: {duplicates_edge_df.count():,}") +print(f"Percentage of duplicate posts: {duplicates_edge_df.count() / post_links_df.count():.2%}\n") + +linked_edge_df = ( + raw_links_edge_df.filter(F.col("LinkType") == "Linked") + .withColumn("relationship", F.lit("Links")) + .select("src", "dst", "relationship") +) +print(f"Total Links edges: {linked_edge_df.count():,}") +print(f"Percentage of linked posts: {linked_edge_df.count() / post_links_df.count():.2%}\n") + + +# +# Combine all the edges together into one relationships DataFrame +# + +relationships_df: DataFrame = ( + cast_for_edge_df.unionByName(user_asks_edges_df) + .unionByName(user_answers_edges_df) + .unionByName(question_answers_edges_df) + .unionByName(tags_edge_df) + .unionByName(earns_edges_df) + .unionByName(duplicates_edge_df) + .unionByName(linked_edge_df) +) +relationships_df.groupBy("relationship").count().orderBy(F.col("count").desc()).withColumn( + "count", F.format_number(F.col("count"), 0) +).show() + +# +------------+------+ +# 
from typing import List

from graphframes import GraphFrame
from pyspark.sql import Column, DataFrame
from pyspark.sql import functions as F


def _count_graphlet_types(paths: DataFrame, selections: List[Column]) -> DataFrame:
    """Project ``paths`` onto ``selections`` and count rows per distinct combination."""
    type_df = paths.select(*selections)
    return (
        type_df.groupby(*type_df.columns)
        .count()
        .orderBy(F.col("count").desc())
        # Add a comma formatted column for display
        .withColumn("count", F.format_number(F.col("count"), 0))
    )


def three_edge_count(paths: DataFrame) -> DataFrame:
    """three_edge_count View the counts of the different types of 3-edge graphlets in the graph.

    Parameters
    ----------
    paths : pyspark.sql.DataFrame
        Motif matches with vertex columns a, b, c, edge columns e1, e2, e3 and,
        optionally, a fourth vertex column d.

    Returns
    -------
    DataFrame
        One row per combination of node / edge types with a comma-formatted
        'count' column, ordered by descending count. 'D_Type' is part of the
        combination only when ``paths`` has a 'd' column.
    """
    selections = [
        F.col("a.Type").alias("A_Type"),
        F.col("e1.relationship").alias("E_relationship"),
        F.col("b.Type").alias("B_Type"),
        F.col("e2.relationship").alias("E2_relationship"),
        F.col("c.Type").alias("C_Type"),
        F.col("e3.relationship").alias("E3_relationship"),
    ]
    # Triangles have no 'd'; referencing a missing column fails at analysis time.
    if "d" in paths.columns:
        selections.append(F.col("d.Type").alias("D_Type"))
    return _count_graphlet_types(paths, selections)


def four_edge_count(paths: DataFrame) -> DataFrame:
    """four_edge_count View the counts of the different types of 4-edge graphlets in the graph.

    Parameters
    ----------
    paths : DataFrame
        Motif matches with vertex columns a, b, c, d, edge columns e1..e4 and,
        optionally, a fifth vertex column e.

    Returns
    -------
    DataFrame
        One row per combination of node / edge types with a comma-formatted
        'count' column, ordered by descending count. 'E_Type' is part of the
        combination only when ``paths`` has an 'e' column.
    """
    selections = [
        F.col("a.Type").alias("A_Type"),
        F.col("e1.relationship").alias("E_relationship"),
        F.col("b.Type").alias("B_Type"),
        F.col("e2.relationship").alias("E2_relationship"),
        F.col("c.Type").alias("C_Type"),
        F.col("e3.relationship").alias("E3_relationship"),
        F.col("d.Type").alias("D_Type"),
        F.col("e4.relationship").alias("E4_relationship"),
    ]
    # Four edges over four vertices have no 'e' column to reference.
    if "e" in paths.columns:
        selections.append(F.col("e.Type").alias("E_Type"))
    return _count_graphlet_types(paths, selections)


def add_degree(g: GraphFrame) -> GraphFrame:
    """add_degree compute the degree, adding it as a property of the nodes in the GraphFrame.

    Parameters
    ----------
    g : GraphFrame
        Any valid GraphFrame

    Returns
    -------
    GraphFrame
        Same GraphFrame with a 'degree' property added; vertices without any
        edge get degree 0.
    """
    # Left join + fill: an inner join would drop every vertex that is missing
    # from g.degrees. NOTE(review): g.degrees is understood to list only
    # vertices with at least one edge - confirm against the GraphFrames docs.
    degree_vertices: DataFrame = g.vertices.join(g.degrees, on="id", how="left").fillna(
        0, subset=["degree"]
    )
    return GraphFrame(degree_vertices, g.edges)


def add_type_degree(g: GraphFrame) -> DataFrame:
    """add_type_degree add a map property to the vertices with the degree by each type of relationship.

    Parameters
    ----------
    g : GraphFrame
        Any valid GraphFrame

    Returns
    -------
    DataFrame
        The vertices of ``g`` with a map[relationship -> count] 'type_degree'
        column counting each vertex's outgoing edges (edges whose 'src' is the
        vertex) per relationship. Vertices without outgoing edges get NULL.
    """
    per_type_counts: DataFrame = (
        g.edges.select(F.col("src").alias("id"), "relationship")
        # Map keys may not be NULL, so drop edges without a relationship too
        .filter(F.col("id").isNotNull() & F.col("relationship").isNotNull())
        .groupby("id", "relationship")
        .count()
    )
    # Fold the (relationship, count) rows of each vertex into a single map
    type_degree: DataFrame = per_type_counts.groupby("id").agg(
        F.map_from_entries(F.collect_list(F.struct("relationship", "count"))).alias("type_degree")
    )
    return g.vertices.join(type_degree, on="id", how="left")
@@ -51,7 +51,7 @@ assembly_path="$DIR/../target/scala-$scala_version_major_minor" echo `ls $assembly_path/graphframes-assembly*.jar` JAR_PATH="" for assembly in $assembly_path/graphframes-assembly*.jar ; do - JAR_PATH=$assembly + JAR_PATH=$assembly done export PYSPARK_SUBMIT_ARGS="--driver-memory 2g --executor-memory 2g --jars $JAR_PATH pyspark-shell " @@ -62,17 +62,7 @@ export PYTHONPATH=$PYTHONPATH:graphframes # Run test suites - -if [[ "$python_major" == "2" ]]; then - - # Horrible hack for spark 1.x: we manually remove some log lines to stay below the 4MB log limit on Travis. - $PYSPARK_DRIVER_PYTHON `which nosetests` -v --all-modules -w $DIR 2>&1 | grep -vE "INFO (ParquetOutputFormat|SparkContext|ContextCleaner|ShuffleBlockFetcherIterator|MapOutputTrackerMaster|TaskSetManager|Executor|MemoryStore|CacheManager|BlockManager|DAGScheduler|PythonRDD|TaskSchedulerImpl|ZippedPartitionsRDD2)"; - -else - - $PYSPARK_DRIVER_PYTHON -m "nose" -v --all-modules -w $DIR 2>&1 | grep -vE "INFO (ParquetOutputFormat|SparkContext|ContextCleaner|ShuffleBlockFetcherIterator|MapOutputTrackerMaster|TaskSetManager|Executor|MemoryStore|CacheManager|BlockManager|DAGScheduler|PythonRDD|TaskSchedulerImpl|ZippedPartitionsRDD2)"; - -fi +$PYSPARK_DRIVER_PYTHON -m "pytest" -v $DIR/graphframes/tests.py 2>&1 | grep -vE "INFO (ParquetOutputFormat|SparkContext|ContextCleaner|ShuffleBlockFetcherIterator|MapOutputTrackerMaster|TaskSetManager|Executor|MemoryStore|CacheManager|BlockManager|DAGScheduler|PythonRDD|TaskSchedulerImpl|ZippedPartitionsRDD2)"; # Exit immediately if the tests fail. 
import os
import re

from setuptools import find_packages, setup  # type: ignore

# Matches the sbt version assignment, e.g.: ThisBuild / version := "0.8.5"
_SBT_VERSION_RE = re.compile(r'version\s*:=\s*"([^"]+)"')


def _read_text(path):
    """Return the UTF-8 decoded contents of ``path``, closing the file afterwards."""
    with open(path, encoding="utf-8") as f:
        return f.read()


def parse_requirements(filename):
    """Load requirements from a pip requirements file.

    Blank lines and comment lines are skipped. Lines are stripped before the comment
    check so that indented comments are not mistaken for requirements.
    """
    with open(filename, encoding="utf-8") as f:
        stripped = (line.strip() for line in f)
        return [line for line in stripped if line and not line.startswith("#")]


def _read_sbt_version(filename):
    """Extract the quoted version string from an sbt ``version := "..."`` file.

    Raises RuntimeError when no version assignment is found, rather than handing an
    arbitrary string to setuptools.
    """
    match = _SBT_VERSION_RE.search(_read_text(filename))
    if match is None:
        raise RuntimeError(f"Could not find a version assignment in {filename}")
    return match.group(1)


# Directory containing this setup.py, and the repository root one level above it.
here = os.path.abspath(os.path.dirname(__file__))
repo_root = os.path.join(here, "..")

# Use requirements.txt to get the list of dependencies.
requirements = parse_requirements(os.path.join(here, "requirements.txt"))

setup(
    name="graphframes",
    # Single source of truth for the version is the sbt build definition.
    version=_read_sbt_version(os.path.join(repo_root, "version.sbt")),
    description="GraphFrames: Graph Processing Framework for Apache Spark",
    long_description=_read_text(os.path.join(repo_root, "README.md")),
    long_description_content_type="text/markdown",
    author="GraphFrames Contributors",
    author_email="graphframes@googlegroups.com",
    url="https://pypi.org/project/graphframes-py",
    packages=find_packages(where="python"),
    package_dir={"": "python"},
    include_package_data=True,  # Include non-code files specified in MANIFEST.in
    install_requires=requirements,
    classifiers=[
        "Programming Language :: Python :: 3",
        "Operating System :: OS Independent",
    ],
)