diff --git a/src/main/scala/org/graphframes/GraphFrame.scala b/src/main/scala/org/graphframes/GraphFrame.scala index 523026092..c9a221a6a 100644 --- a/src/main/scala/org/graphframes/GraphFrame.scala +++ b/src/main/scala/org/graphframes/GraphFrame.scala @@ -52,8 +52,10 @@ class GraphFrame private( override def toString: String = { // We call select on the vertices and edges to ensure that ID, SRC, DST always come first // in the printed schema. - val v = vertices.select(ID, vertices.columns.filter(_ != ID) :_ *).toString - val e = edges.select(SRC, DST +: edges.columns.filter(c => c != SRC && c != DST) :_ *).toString + val vCols = (ID +: vertices.columns.filter(_ != ID).toIndexedSeq).map(col) + val eCols = (SRC +: DST +: edges.columns.filter(c => c != SRC && c != DST).toIndexedSeq).map(col) + val v = vertices.select(vCols.toSeq: _*).toString + val e = edges.select(eCols.toSeq: _*).toString "GraphFrame(v:" + v + ", e:" + e + ")" } @@ -708,7 +710,7 @@ object GraphFrame extends Serializable with Logging { def fromEdges(e: DataFrame): GraphFrame = { val srcs = e.select(e("src").as("id")) val dsts = e.select(e("dst").as("id")) - val v = srcs.unionAll(dsts).distinct + val v = srcs.unionAll(dsts).distinct() v.persist(StorageLevel.MEMORY_AND_DISK) apply(v, e) } @@ -802,7 +804,7 @@ object GraphFrame extends Serializable with Logging { private[graphframes] def colStar(df: DataFrame, col: String): Seq[String] = { df.schema(col).dataType match { case s: StructType => - s.fieldNames.map(f => col + "." + f) + s.fieldNames.map(f => col + "." + f).toIndexedSeq case other => throw new RuntimeException(s"Unknown error in GraphFrame. Expected column $col to be" + s" StructType, but found type: $other") @@ -811,7 +813,7 @@ object GraphFrame extends Serializable with Logging { /** Nest all columns within a single StructType column with the given name */ private[graphframes] def nestAsCol(df: DataFrame, name: String): Column = { - struct(df.columns.map(c => df(c)) :_*).as(name) + struct(df.columns.map(c => df(c)).toSeq: _*).as(name) } // ========== Motif finding ========== diff --git a/src/main/scala/org/graphframes/examples/BeliefPropagation.scala b/src/main/scala/org/graphframes/examples/BeliefPropagation.scala index 4c75d8369..9074bd13c 100644 --- a/src/main/scala/org/graphframes/examples/BeliefPropagation.scala +++ b/src/main/scala/org/graphframes/examples/BeliefPropagation.scala @@ -129,7 +129,7 @@ object BeliefPropagation { def runBPwithGraphX(g: GraphFrame, numIter: Int): GraphFrame = { // Choose colors for vertices for BP scheduling. val colorG = colorGraph(g) - val numColors: Int = colorG.vertices.select("color").distinct.count().toInt + val numColors: Int = colorG.vertices.select("color").distinct().count().toInt // Convert GraphFrame to GraphX, and initialize beliefs. val gx0 = colorG.toGraphX @@ -206,7 +206,7 @@ object BeliefPropagation { def runBPwithGraphFrames(g: GraphFrame, numIter: Int): GraphFrame = { // Choose colors for vertices for BP scheduling. val colorG = colorGraph(g) - val numColors: Int = colorG.vertices.select("color").distinct.count().toInt + val numColors: Int = colorG.vertices.select("color").distinct().count().toInt // TODO: Handle vertices without any edges. diff --git a/src/main/scala/org/graphframes/examples/Graphs.scala b/src/main/scala/org/graphframes/examples/Graphs.scala index c929c61ec..2ea19e525 100644 --- a/src/main/scala/org/graphframes/examples/Graphs.scala +++ b/src/main/scala/org/graphframes/examples/Graphs.scala @@ -122,12 +122,12 @@ class Graphs private[graphframes] () { */ def ALSSyntheticData(): GraphFrame = { val sc = spark.sparkContext - val data = sc.parallelize(als_data).map { line => + val data = sc.parallelize(als_data.toIndexedSeq).map { line => val fields = line.split(",") (fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble) } val edges = spark.createDataFrame(data).toDF("src", "dst", "weight") - val vs = data.flatMap(r => r._1 :: r._2 :: Nil).collect().distinct.map(x => Tuple1(x)) + val vs = data.flatMap(r => r._1 :: r._2 :: Nil).collect().distinct.map(x => Tuple1(x)).toIndexedSeq val vertices = spark.createDataFrame(vs).toDF("id") GraphFrame(vertices, edges) } diff --git a/src/main/scala/org/graphframes/lib/BFS.scala b/src/main/scala/org/graphframes/lib/BFS.scala index df859c14d..80ebe313e 100644 --- a/src/main/scala/org/graphframes/lib/BFS.scala +++ b/src/main/scala/org/graphframes/lib/BFS.scala @@ -218,7 +218,7 @@ private object BFS extends Logging with Serializable { } } val ordered = paths.columns.sortBy(rank _) - paths.select(ordered.map(col): _*) + paths.select(ordered.map(col).toSeq: _*) } else { logInfo(s"GraphFrame.bfs failed to find a path of length <= $maxPathLength.") // Return empty DataFrame diff --git a/src/main/scala/org/graphframes/lib/GraphXConversions.scala b/src/main/scala/org/graphframes/lib/GraphXConversions.scala index 88e690876..13f9beb65 100644 --- a/src/main/scala/org/graphframes/lib/GraphXConversions.scala +++ b/src/main/scala/org/graphframes/lib/GraphXConversions.scala @@ -104,10 +104,10 @@ private[graphframes] object GraphXConversions { val otherFields = df.schema.fieldNames.filter(_ != structName).map(col) if (renamedSubfields.isEmpty) { // Do not attempt to add an empty structure. - df.select(otherFields : _*) + df.select(otherFields.toSeq: _*) } else { - val renamedStruct = struct(renamedSubfields : _*).as(structName) - df.select(renamedStruct +: otherFields : _*) + val renamedStruct = struct(renamedSubfields.toSeq: _*).as(structName) + df.select((renamedStruct +: otherFields).toSeq: _*) } } diff --git a/src/main/scala/org/graphframes/lib/ShortestPaths.scala b/src/main/scala/org/graphframes/lib/ShortestPaths.scala index f110d6f1f..6b1f15c74 100644 --- a/src/main/scala/org/graphframes/lib/ShortestPaths.scala +++ b/src/main/scala/org/graphframes/lib/ShortestPaths.scala @@ -19,7 +19,7 @@ package org.graphframes.lib import java.util -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import org.apache.spark.graphx.{lib => graphxlib} import org.apache.spark.sql.{Column, DataFrame, Row} @@ -91,7 +91,7 @@ private object ShortestPaths { mapToLandmark(col(DISTANCE_ID)) } val cols = graph.vertices.columns.map(col) :+ distanceCol.as(DISTANCE_ID) - g.vertices.select(cols: _*) + g.vertices.select(cols.toSeq: _*) } private val DISTANCE_ID = "distances" diff --git a/src/main/scala/org/graphframes/lib/TriangleCount.scala b/src/main/scala/org/graphframes/lib/TriangleCount.scala index 816cb0da7..fb8c51462 100644 --- a/src/main/scala/org/graphframes/lib/TriangleCount.scala +++ b/src/main/scala/org/graphframes/lib/TriangleCount.scala @@ -68,7 +68,7 @@ private object TriangleCount { val v = graph.vertices val countsCol = when(col("count").isNull, 0L).otherwise(col("count")) val newV = v.join(triangleCounts, v(ID) === triangleCounts(ID), "left_outer") - .select(countsCol.as(COUNT_ID) +: v.columns.map(v.apply) :_ *) + .select((countsCol.as(COUNT_ID) +: v.columns.map(v.apply)).toSeq: _*) newV }