1 change: 1 addition & 0 deletions .gitignore
@@ -17,6 +17,7 @@ src_managed/
  project/boot/
  project/plugins/project/
  .bsp
+ .metals

  # intellij
  .idea/
15 changes: 15 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,15 @@
+ # The context of this file is copied from the Apache Spark Project
+
+ align = none
+ align.openParenDefnSite = false
+ align.openParenCallSite = false
+ align.tokens = []
+ importSelectors = "singleLine"
+ optIn = {
+ configStyleArguments = false
+ }
+ danglingParentheses.preset = false
+ docstrings.style = Asterisk
+ maxColumn = 98
+ runner.dialect = scala213
+ version = 3.8.5
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,4 +1,4 @@
- import ReleaseTransformations._
+ import ReleaseTransformations.*

  lazy val sparkVer = sys.props.getOrElse("spark.version", "3.5.4")
  lazy val sparkBranch = sparkVer.substring(0, 3)
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -8,3 +8,4 @@ ThisBuild / libraryDependencySchemes ++= Seq(
  addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.10")
  addSbtPlugin("com.github.sbt" % "sbt-release" % "1.4.0")
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.1")
+ addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.4")
551 changes: 310 additions & 241 deletions src/main/scala/org/graphframes/GraphFrame.scala

Large diffs are not rendered by default.

132 changes: 69 additions & 63 deletions src/main/scala/org/graphframes/examples/BeliefPropagation.scala
@@ -26,45 +26,43 @@ import org.graphframes.GraphFrame
  import org.graphframes.examples.Graphs.gridIsingModel
  import org.graphframes.lib.AggregateMessages

-
  /**
  * Example code for Belief Propagation (BP)
  *
- * This provides a template for building customized BP algorithms for different types of
- * graphical models.
+ * This provides a template for building customized BP algorithms for different types of graphical
+ * models.
  *
  * This example:
- * - Ising model on a grid
- * - Parallel Belief Propagation using colored fields
+ * - Ising model on a grid
+ * - Parallel Belief Propagation using colored fields
  *
- * Ising models are probabilistic graphical models over binary variables x,,i,,.
- * Each binary variable x,,i,, corresponds to one vertex, and it may take values -1 or +1.
- * The probability distribution P(X) (over all x,,i,,) is parameterized by vertex factors a,,i,,
- * and edge factors b,,ij,,:
+ * Ising models are probabilistic graphical models over binary variables x,,i,,. Each binary
+ * variable x,,i,, corresponds to one vertex, and it may take values -1 or +1. The probability
+ * distribution P(X) (over all x,,i,,) is parameterized by vertex factors a,,i,, and edge factors
+ * b,,ij,,:
  * {{{
  * P(X) = (1/Z) * exp[ \sum_i a_i x_i + \sum_{ij} b_{ij} x_i x_j ]
  * }}}
- * where Z is the normalization constant (partition function).
- * See [[https://en.wikipedia.org/wiki/Ising_model Wikipedia]] for more information on Ising models.
+ * where Z is the normalization constant (partition function). See
+ * [[https://en.wikipedia.org/wiki/Ising_model Wikipedia]] for more information on Ising models.
  *
  * Belief Propagation (BP) provides marginal probabilities of the values of the variables x,,i,,,
- * i.e., P(x,,i,,) for each i. This allows a user to understand likely values of variables.
- * See [[https://en.wikipedia.org/wiki/Belief_propagation Wikipedia]] for more information on BP.
+ * i.e., P(x,,i,,) for each i. This allows a user to understand likely values of variables. See
+ * [[https://en.wikipedia.org/wiki/Belief_propagation Wikipedia]] for more information on BP.
  *
  * We use a batch synchronous BP algorithm, where batches of vertices are updated synchronously.
  * We follow the mean field update algorithm in Slide 13 of the
- * [[http://www.eecs.berkeley.edu/~wainwrig/Talks/A_GraphModel_Tutorial talk slides]] from:
- * Wainwright. "Graphical models, message-passing algorithms, and convex optimization."
+ * [[http://www.eecs.berkeley.edu/~wainwrig/Talks/A_GraphModel_Tutorial talk slides]] from:
+ * Wainwright. "Graphical models, message-passing algorithms, and convex optimization."
  *
- * The batches are chosen according to a coloring. For background on graph colorings for inference,
- * see for example:
- * Gonzalez et al. "Parallel Gibbs Sampling: From Colored Fields to Thin Junction Trees."
- * AISTATS, 2011.
+ * The batches are chosen according to a coloring. For background on graph colorings for
+ * inference, see for example: Gonzalez et al. "Parallel Gibbs Sampling: From Colored Fields to
+ * Thin Junction Trees." AISTATS, 2011.
  *
  * The BP algorithm works by:
- * - Coloring the graph by assigning a color to each vertex such that no neighboring vertices
- * share the same color.
- * - In each step of BP, update all vertices of a single color. Alternate colors.
+ * - Coloring the graph by assigning a color to each vertex such that no neighboring vertices
+ * share the same color.
+ * - In each step of BP, update all vertices of a single color. Alternate colors.
  */
  object BeliefPropagation {

@@ -94,14 +92,16 @@ object BeliefPropagation {
  }

  /**
- * Given a GraphFrame, choose colors for each vertex. No neighboring vertices will share the
- * same color. The number of colors is minimized.
+ * Given a GraphFrame, choose colors for each vertex. No neighboring vertices will share the
+ * same color. The number of colors is minimized.
  *
  * This is written specifically for grid graphs. For non-grid graphs, it should be generalized,
  * such as by using a greedy coloring scheme.
  *
- * @param g Grid graph generated by [[org.graphframes.examples.Graphs.gridIsingModel()]]
- * @return Same graph, but with a new vertex column "color" of type Int (0 or 1)
+ * @param g
+ * Grid graph generated by [[org.graphframes.examples.Graphs.gridIsingModel()]]
+ * @return
+ * Same graph, but with a new vertex column "color" of type Int (0 or 1)
  */
  private def colorGraph(g: GraphFrame): GraphFrame = {
  val colorUDF = udf { (i: Int, j: Int) => (i + j) % 2 }
@@ -112,19 +112,22 @@ object BeliefPropagation {
  /**
  * Run Belief Propagation.
  *
- * This implementation of BP shows how to use GraphX's aggregateMessages method.
- * It is simple to convert to and from GraphX format. This method does the following:
- * - Color GraphFrame vertices for BP scheduling.
- * - Convert GraphFrame to GraphX format.
- * - Run BP using GraphX's aggregateMessages API.
- * - Augment the original GraphFrame with the BP results (vertex beliefs).
+ * This implementation of BP shows how to use GraphX's aggregateMessages method. It is simple to
+ * convert to and from GraphX format. This method does the following:
+ * - Color GraphFrame vertices for BP scheduling.
+ * - Convert GraphFrame to GraphX format.
+ * - Run BP using GraphX's aggregateMessages API.
+ * - Augment the original GraphFrame with the BP results (vertex beliefs).
  *
- * @param g Graphical model created by `org.graphframes.examples.Graphs.gridIsingModel()`
- * @param numIter Number of iterations of BP to run. One iteration includes updating each
- * vertex's belief once.
- * @return Same graphical model, but with [[GraphFrame.vertices]] augmented with a new column
- * "belief" containing P(x,,i,, = +1), the marginal probability of vertex i taking
- * value +1 instead of -1.
+ * @param g
+ * Graphical model created by `org.graphframes.examples.Graphs.gridIsingModel()`
+ * @param numIter
+ * Number of iterations of BP to run. One iteration includes updating each vertex's belief
+ * once.
+ * @return
+ * Same graphical model, but with [[GraphFrame.vertices]] augmented with a new column "belief"
+ * containing P(x,,i,, = +1), the marginal probability of vertex i taking value +1 instead of
+ * -1.
  */
  def runBPwithGraphX(g: GraphFrame, numIter: Int): GraphFrame = {
  // Choose colors for vertices for BP scheduling.
@@ -166,15 +169,14 @@ object BeliefPropagation {
  },
  _ + _)
  // Receive messages, and update beliefs for vertices of the current color.
- gx = gx.outerJoinVertices(msgs) {
- case (vID, vAttr, optMsg) =>
- if (vAttr.color == color) {
- val x = vAttr.a + optMsg.getOrElse(0.0)
- val newBelief = math.exp(-log1pExp(-x))
- VertexAttr(vAttr.a, newBelief, color)
- } else {
- vAttr
- }
+ gx = gx.outerJoinVertices(msgs) { case (vID, vAttr, optMsg) =>
+ if (vAttr.color == color) {
+ val x = vAttr.a + optMsg.getOrElse(0.0)
+ val newBelief = math.exp(-log1pExp(-x))
+ VertexAttr(vAttr.a, newBelief, color)
+ } else {
+ vAttr
+ }
  }
  }
  }
@@ -192,16 +194,19 @@ object BeliefPropagation {
  * Run Belief Propagation.
  *
  * This implementation of BP shows how to use GraphFrame's aggregateMessages method.
- * - Color GraphFrame vertices for BP scheduling.
- * - Run BP using GraphFrame's aggregateMessages API.
- * - Augment the original GraphFrame with the BP results (vertex beliefs).
+ * - Color GraphFrame vertices for BP scheduling.
+ * - Run BP using GraphFrame's aggregateMessages API.
+ * - Augment the original GraphFrame with the BP results (vertex beliefs).
  *
- * @param g Graphical model created by `org.graphframes.examples.Graphs.gridIsingModel()`
- * @param numIter Number of iterations of BP to run. One iteration includes updating each
- * vertex's belief once.
- * @return Same graphical model, but with [[GraphFrame.vertices]] augmented with a new column
- * "belief" containing P(x,,i,, = +1), the marginal probability of vertex i taking
- * value +1 instead of -1.
+ * @param g
+ * Graphical model created by `org.graphframes.examples.Graphs.gridIsingModel()`
+ * @param numIter
+ * Number of iterations of BP to run. One iteration includes updating each vertex's belief
+ * once.
+ * @return
+ * Same graphical model, but with [[GraphFrame.vertices]] augmented with a new column "belief"
+ * containing P(x,,i,, = +1), the marginal probability of vertex i taking value +1 instead of
+ * -1.
  */
  def runBPwithGraphFrames(g: GraphFrame, numIter: Int): GraphFrame = {
  // Choose colors for vertices for BP scheduling.
@@ -231,15 +236,16 @@ object BeliefPropagation {
  .agg(sum(AM.msg).as("aggMess"))
  val v = gx.vertices
  // Receive messages, and update beliefs for vertices of the current color.
- val newBeliefCol = when(v("color") === color && aggregates("aggMess").isNotNull,
+ val newBeliefCol = when(
+ v("color") === color && aggregates("aggMess").isNotNull,
  logistic(aggregates("aggMess") + v("a")))
- .otherwise(v("belief")) // keep old beliefs for other colors
+ .otherwise(v("belief")) // keep old beliefs for other colors
  val newVertices = v
- .join(aggregates, v("id") === aggregates("id"), "left_outer") // join messages, vertices
- .drop(aggregates("id")) // drop duplicate ID column (from outer join)
- .withColumn("newBelief", newBeliefCol) // compute new beliefs
- .drop("aggMess") // drop messages
- .drop("belief") // drop old beliefs
+ .join(aggregates, v("id") === aggregates("id"), "left_outer") // join messages, vertices
+ .drop(aggregates("id")) // drop duplicate ID column (from outer join)
+ .withColumn("newBelief", newBeliefCol) // compute new beliefs
+ .drop("aggMess") // drop messages
+ .drop("belief") // drop old beliefs
  .withColumnRenamed("newBelief", "belief")
  // Cache new vertices using workaround for SPARK-13346
  val cachedNewVertices = AM.getCachedDataFrame(newVertices)
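
For readers skimming the reformatted Scaladoc, the colored update it describes can be sketched in plain Scala without Spark. This is an illustrative sketch, not the code in this PR. The object name `ColoredMeanFieldSketch`, the uniform edge factor `b`, the initial belief of 0.5, and the message form `b * (2 * belief - 1)` are all assumptions, since the message-sending hunk is not shown in the diff. The parts taken from the diff are the coloring rule `(i + j) % 2` and the update `exp(-log1pExp(-x))` applied to `a` plus the aggregated messages. The body of `log1pExp` is a hypothetical stand-in for the helper of the same name.

```scala
object ColoredMeanFieldSketch {
  // Numerically stable log(1 + exp(x)). Hypothetical stand-in for the
  // example's log1pExp helper, whose body is not shown in the diff.
  def log1pExp(x: Double): Double =
    if (x > 0) x + math.log1p(math.exp(-x)) else math.log1p(math.exp(x))

  // Logistic function 1 / (1 + exp(-x)), in the same form the diff uses.
  def logistic(x: Double): Double = math.exp(-log1pExp(-x))

  // Checkerboard coloring, the same rule as colorUDF in the diff.
  def color(i: Int, j: Int): Int = (i + j) % 2

  /**
   * Runs numIter sweeps over a square grid with vertex factors a(i)(j)
   * and one shared edge factor b (assumption), returning P(x = +1).
   */
  def run(a: Array[Array[Double]], b: Double, numIter: Int): Array[Array[Double]] = {
    val n = a.length
    val belief = Array.fill(n, n)(0.5) // assumed starting belief
    val offsets = Seq((-1, 0), (1, 0), (0, -1), (0, 1))
    for (_ <- 0 until numIter; c <- 0 to 1) {
      // Compute every update for color c from the current beliefs first.
      val updates = for {
        i <- 0 until n
        j <- 0 until n
        if color(i, j) == c
      } yield {
        // Assumed message: edge factor times the neighbor's mean value.
        val aggMsg = offsets.collect {
          case (di, dj) if i + di >= 0 && i + di < n && j + dj >= 0 && j + dj < n =>
            b * (2.0 * belief(i + di)(j + dj) - 1.0)
        }.sum
        (i, j, logistic(a(i)(j) + aggMsg))
      }
      // Then apply them together, so one color is updated as a batch.
      updates.foreach { case (i, j, p) => belief(i)(j) = p }
    }
    belief
  }
}
```

Because every neighbor of a vertex on the grid has the other color, the vertices of one color never read each other's beliefs, which is why each color can be updated as a single batch.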