diff --git a/.travis.yml b/.travis.yml index bb6780117..6ce9ebcc1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,4 @@ -sudo: required - -dist: trusty +group: travis_lts cache: directories: @@ -8,29 +6,40 @@ cache: - $HOME/.sbt/launchers/ - $HOME/.cache/spark-versions +env: + global: + - SCALA_VERSION=2.11.8 + matrix: include: - jdk: openjdk7 # openJDK crashes sometimes - env: PYSPARK_PYTHON=python2 SCALA_VERSION=2.11.8 SPARK_VERSION=2.0.2 SPARK_BUILD="spark-2.0.2-bin-hadoop2.7" SPARK_BUILD_URL="http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz" + env: PYSPARK_PYTHON=python2 SPARK_VERSION=2.0.2 SPARK_BUILD="spark-${SPARK_VERSION}-bin-hadoop2.7" - jdk: oraclejdk8 - env: PYSPARK_PYTHON=python2 SCALA_VERSION=2.11.8 SPARK_VERSION=2.1.1 SPARK_BUILD="spark-2.1.1-bin-hadoop2.7" SPARK_BUILD_URL="http://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz" + env: PYSPARK_PYTHON=python2 SPARK_VERSION=2.1.2 SPARK_BUILD="spark-${SPARK_VERSION}-bin-hadoop2.7" - jdk: oraclejdk8 - env: PYSPARK_PYTHON=python2 SCALA_VERSION=2.11.8 SPARK_VERSION=2.2.0 SPARK_BUILD="spark-2.2.0-bin-hadoop2.7" SPARK_BUILD_URL="http://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz" + env: PYSPARK_PYTHON=python2 SPARK_VERSION=2.2.1 SPARK_BUILD="spark-${SPARK_VERSION}-bin-hadoop2.7" + - jdk: oraclejdk8 + env: PYSPARK_PYTHON=python2 SPARK_VERSION=2.3.1 SPARK_BUILD="spark-${SPARK_VERSION}-bin-hadoop2.7" - jdk: openjdk7 - env: PYSPARK_PYTHON=python3 SCALA_VERSION=2.11.8 SPARK_VERSION=2.0.2 SPARK_BUILD="spark-2.0.2-bin-hadoop2.7" SPARK_BUILD_URL="http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz" + env: PYSPARK_PYTHON=python3 SPARK_VERSION=2.0.2 SPARK_BUILD="spark-${SPARK_VERSION}-bin-hadoop2.7" + - jdk: oraclejdk8 + env: PYSPARK_PYTHON=python3 SPARK_VERSION=2.1.2 SPARK_BUILD="spark-${SPARK_VERSION}-bin-hadoop2.7" - jdk: oraclejdk8 - env: PYSPARK_PYTHON=python3 SCALA_VERSION=2.11.8 SPARK_VERSION=2.1.1 SPARK_BUILD="spark-2.1.1-bin-hadoop2.7" 
SPARK_BUILD_URL="http://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz" + env: PYSPARK_PYTHON=python3 SPARK_VERSION=2.2.1 SPARK_BUILD="spark-${SPARK_VERSION}-bin-hadoop2.7" - jdk: oraclejdk8 - env: PYSPARK_PYTHON=python3 SCALA_VERSION=2.11.8 SPARK_VERSION=2.2.0 SPARK_BUILD="spark-2.2.0-bin-hadoop2.7" SPARK_BUILD_URL="http://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz" + env: PYSPARK_PYTHON=python3 SPARK_VERSION=2.3.1 SPARK_BUILD="spark-${SPARK_VERSION}-bin-hadoop2.7" before_install: - ./bin/download_travis_dependencies.sh install: - - pyenv versions - - pyenv global 2.7 3.5 - - python2 -m pip install --user -r ./python/requirements.txt - - python3 -m pip install --user -r ./python/requirements.txt + - if [[ "${PYSPARK_PYTHON}" == "python2" ]]; then + pyenv install -f 2.7.13 && pyenv global 2.7.13; + python2 -m pip install --user -r ./python/requirements.txt; + else + pyenv install -f 3.5.4 && pyenv global 3.5.4; + python3 -m pip install --user -r ./python/requirements.txt; + fi script: - ./build/sbt -Dspark.version=$SPARK_VERSION -Dscala.version=$SCALA_VERSION "set test in assembly := {}" assembly diff --git a/Makefile b/Makefile index 565d9d3c4..cf691caa6 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,8 @@ -all: 2.2.0s2.10 2.0.2 2.1.1 2.2.0 +all: 2.3.0 2.3.1 clean: rm -rf target/graphframes_*.zip -2.0.2 2.1.1 2.2.0: +2.3.0 2.3.1: build/sbt -Dspark.version=$@ spDist -2.2.0s2.10: - build/sbt -Dspark.version=2.2.0 -Dscala.version=2.10.6 spDist assembly test diff --git a/build.sbt b/build.sbt index 8bb8d106c..3a6387ee3 100644 --- a/build.sbt +++ b/build.sbt @@ -1,12 +1,14 @@ // Your sbt build file. 
Guides on how to write one can be found at // http://www.scala-sbt.org/0.13/docs/index.html -val sparkVer = sys.props.getOrElse("spark.version", "2.2.0") +val sparkVer = sys.props.getOrElse("spark.version", "2.4.0") val sparkBranch = sparkVer.substring(0, 3) val defaultScalaVer = sparkBranch match { case "2.0" => "2.11.8" case "2.1" => "2.11.8" case "2.2" => "2.11.8" + case "2.3" => "2.11.8" + case "2.4" => "2.11.12" case _ => throw new IllegalArgumentException(s"Unsupported Spark version: $sparkVer.") } val scalaVer = sys.props.getOrElse("scala.version", defaultScalaVer) @@ -21,10 +23,12 @@ scalaVersion := scalaVer name := "graphframes" +organization := "graphframes" + spName := "graphframes/graphframes" // Don't forget to set the version -version := s"0.6.0-SNAPSHOT-spark$sparkBranch" +version := s"0.6.0-adbe42-spark$sparkBranch-s_${scalaVer.split("\\.").take(2).mkString(".")}" // All Spark Packages need a license licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")) @@ -40,14 +44,9 @@ sparkComponents ++= Seq("graphx", "sql", "mllib") // add any Spark Package dependencies using spDependencies. // e.g. spDependencies += "databricks/spark-avro:0.1" -libraryDependencies += "org.scalatest" %% "scalatest" % defaultScalaTestVer % "test" +libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.16" -// These versions are ancient, but they cross-compile around scala 2.10 and 2.11. 
-// Update them when dropping support for scala 2.10 -libraryDependencies ++= Seq( - "com.typesafe.scala-logging" %% "scala-logging-api" % "2.1.2", - "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2" -) +libraryDependencies += "org.scalatest" %% "scalatest" % defaultScalaTestVer % "test" parallelExecution := false @@ -78,3 +77,20 @@ concurrentRestrictions in Global := Seq( autoAPIMappings := true coverageHighlighting := false + +publishMavenStyle := true + +crossPaths := false + +credentials += Credentials( + "Sonatype Nexus Repository Manager", "bucasit-build", + "p", "p" +) + +publishTo := { + val nexus = "http://bucasit-build/nexus/content/repositories" + if (version.value.trim.endsWith("SNAPSHOT")) + Some("Sonatype Nexus Repository Manager" at s"${nexus}/snapshots") + else + Some("Sonatype Nexus Repository Manager" at s"${nexus}/releases") +} diff --git a/src/main/scala/org/graphframes/GraphFrame.scala b/src/main/scala/org/graphframes/GraphFrame.scala index d6c6c73d7..d86d812d3 100644 --- a/src/main/scala/org/graphframes/GraphFrame.scala +++ b/src/main/scala/org/graphframes/GraphFrame.scala @@ -41,7 +41,8 @@ import org.graphframes.pattern._ */ class GraphFrame private( @transient private val _vertices: DataFrame, - @transient private val _edges: DataFrame) extends Logging with Serializable { + @transient private val _edges: DataFrame, + @transient private val _storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK) extends Logging with Serializable { import GraphFrame._ @@ -480,7 +481,7 @@ class GraphFrame private( .distinct() .sortWithinPartitions(ID) .withColumn(LONG_ID, monotonically_increasing_id()) - .persist(StorageLevel.MEMORY_AND_DISK) + .persist(_storageLevel) vertices.select(col(ID), nestAsCol(vertices, ATTR)) .join(withLongIds, ID) .select(LONG_ID, ID, ATTR) @@ -504,8 +505,8 @@ class GraphFrame private( col(ATTR)) } else { val threshold = broadcastThreshold - val hubs: Set[Any] = degrees.filter(col("degree") >= threshold).select(ID) 
- .collect().map(_.get(0)).toSet + val hubs: Set[Any] = Set() // hub detection is disabled: it is expensive and our dataset has no hubs + logInfo(s"Found HUBS: $hubs") val indexedSourceEdges = GraphFrame.skewedJoin( packedEdges, indexedVertices.select(col(ID).as(SRC), col(LONG_ID).as(LONG_SRC)), @@ -557,7 +558,7 @@ object GraphFrame extends Serializable with Logging { // No skew. Do regular join. a.join(b, joinCol) } else { - logger.debug(s"$logPrefix Skewed join with ${hubs.size} high-degree keys.") + logDebug(s"$logPrefix Skewed join with ${hubs.size} high-degree keys.") val isHub = udf { id: T => hubs.contains(id) } @@ -605,7 +606,7 @@ object GraphFrame extends Serializable with Logging { * destination vertex IDs. All other columns are treated as edge attributes. * @return New [[GraphFrame]] instance */ - def apply(vertices: DataFrame, edges: DataFrame): GraphFrame = { + def apply(vertices: DataFrame, edges: DataFrame, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): GraphFrame = { require(vertices.columns.contains(ID), s"Vertex ID column '$ID' missing from vertex DataFrame, which has columns: " + vertices.columns.mkString(",")) @@ -616,7 +617,7 @@ object GraphFrame extends Serializable with Logging { s"Destination vertex ID column '$DST' missing from edge DataFrame, which has columns: " + edges.columns.mkString(",")) - new GraphFrame(vertices, edges) + new GraphFrame(vertices, edges, storageLevel) } /** @@ -631,12 +632,12 @@ object GraphFrame extends Serializable with Logging { * * @group conversions */ - def fromEdges(e: DataFrame): GraphFrame = { + def fromEdges(e: DataFrame, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): GraphFrame = { val srcs = e.select(e("src").as("id")) val dsts = e.select(e("dst").as("id")) val v = srcs.unionAll(dsts).distinct - v.persist(StorageLevel.MEMORY_AND_DISK) - apply(v, e) + v.persist(storageLevel) + apply(v, e, storageLevel) } /** diff --git a/src/main/scala/org/graphframes/Logging.scala 
b/src/main/scala/org/graphframes/Logging.scala index d2cbb9c8a..93ad0a96a 100644 --- a/src/main/scala/org/graphframes/Logging.scala +++ b/src/main/scala/org/graphframes/Logging.scala @@ -17,11 +17,26 @@ package org.graphframes -import com.typesafe.scalalogging.slf4j.LazyLogging +import org.slf4j.{Logger, LoggerFactory} // This needs to be accessible to org.apache.spark.graphx.lib.backport -private[org] trait Logging extends LazyLogging { - protected def logDebug(s: String) = logger.debug(s) - protected def logInfo(s: String) = logger.info(s) - protected def logTrace(s: String) = logger.trace(s) +private[org] trait Logging { + + @transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName) + + protected def logDebug(s: => String): Unit = { + if (logger.isDebugEnabled) logger.debug(s) + } + + protected def logWarn(s: => String): Unit = { + if (logger.isWarnEnabled) logger.warn(s) + } + + protected def logInfo(s: => String): Unit = { + if (logger.isInfoEnabled) logger.info(s) + } + + protected def logTrace(s: => String): Unit = { + if (logger.isTraceEnabled) logger.trace(s) + } } diff --git a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala index d5d250457..a58c0fa30 100644 --- a/src/main/scala/org/graphframes/lib/ConnectedComponents.scala +++ b/src/main/scala/org/graphframes/lib/ConnectedComponents.scala @@ -113,7 +113,7 @@ class ConnectedComponents private[graphframes] ( */ def setCheckpointInterval(value: Int): this.type = { if (value <= 0 || value > 2) { - logger.warn( + logWarn( s"Set checkpointInterval to $value. 
This would blow up the query plan and hang the " + "driver for large graphs.") } @@ -248,11 +248,12 @@ object ConnectedComponents extends Logging { broadcastThreshold: Int, logPrefix: String): DataFrame = { import edges.sqlContext.implicits._ - val hubs = minNbrs.filter(col(CNT) > broadcastThreshold) + val hubs = minNbrs.filter(col(CNT) > broadcastThreshold) // TODO(review): is this hub detection still needed? .select(SRC) .as[Long] .collect() .toSet + logInfo(s"Found # HUBS: ${hubs.size}") GraphFrame.skewedJoin(edges, minNbrs, SRC, hubs, logPrefix) } @@ -283,7 +284,7 @@ object ConnectedComponents extends Logging { val runId = UUID.randomUUID().toString.takeRight(8) val logPrefix = s"[CC $runId]" - logger.info(s"$logPrefix Start connected components with run ID $runId.") + logInfo(s"$logPrefix Start connected components with run ID $runId.") val sqlContext = graph.sqlContext val sc = sqlContext.sparkContext @@ -296,20 +297,20 @@ object ConnectedComponents extends Logging { throw new IOException( "Checkpoint directory is not set. 
Please set it first using sc.setCheckpointDir().") } - logger.info(s"$logPrefix Using $dir for checkpointing with interval $checkpointInterval.") + logInfo(s"$logPrefix Using $dir for checkpointing with interval $checkpointInterval.") Some(dir) } else { - logger.info( + logInfo( s"$logPrefix Checkpointing is disabled because checkpointInterval=$checkpointInterval.") None } - logger.info(s"$logPrefix Preparing the graph for connected component computation ...") + logInfo(s"$logPrefix Preparing the graph for connected component computation ...") val g = prepare(graph) val vv = g.vertices var ee = g.edges.persist(intermediateStorageLevel) // src < dst val numEdges = ee.count() - logger.info(s"$logPrefix Found $numEdges edges after preparation.") + logInfo(s"$logPrefix Found $numEdges edges after preparation.") var converged = false var iteration = 1 @@ -339,18 +340,7 @@ object ConnectedComponents extends Logging { // checkpointing if (shouldCheckpoint && (iteration % checkpointInterval == 0)) { - // TODO: remove this after DataFrame.checkpoint is implemented - val out = s"${checkpointDir.get}/$iteration" - ee.write.parquet(out) - // may hit S3 eventually consistent issue - ee = sqlContext.read.parquet(out) - - // remove previous checkpoint - if (iteration > checkpointInterval) { - val path = new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}") - path.getFileSystem(sc.hadoopConfiguration).delete(path, true) - } - + ee = ee.checkpoint() System.gc() // hint Spark to clean shuffle directories } @@ -385,9 +375,9 @@ object ConnectedComponents extends Logging { iteration += 1 } - logger.info(s"$logPrefix Connected components converged in ${iteration - 1} iterations.") + logInfo(s"$logPrefix Connected components converged in ${iteration - 1} iterations.") - logger.info(s"$logPrefix Join and return component assignments with original vertex IDs.") + logInfo(s"$logPrefix Join and return component assignments with original vertex IDs.") vv.join(ee, vv(ID) === 
ee(DST), "left_outer") .select(vv(ATTR), when(ee(SRC).isNull, vv(ID)).otherwise(ee(SRC)).as(COMPONENT)) .select(col(s"$ATTR.*"), col(COMPONENT))