|
| 1 | +package org.apache.spark.sql.graphframes |
| 2 | + |
| 3 | +import org.apache.spark.internal.config.ConfigEntry |
| 4 | +import org.apache.spark.sql.SparkSession |
| 5 | +import org.apache.spark.sql.internal.SQLConf |
| 6 | +import org.apache.spark.storage.StorageLevel |
| 7 | + |
| 8 | +object GraphFramesConf { |
| 9 | + private val CONNECTED_COMPONENTS_ALGORITHM = |
| 10 | + SQLConf |
| 11 | + .buildConf("spark.graphframes.connectedComponents.algorithm") |
| 12 | + .doc(""" Sets the connected components algorithm to use (default: "graphframes"). Supported algorithms |
| 13 | + | - "graphframes": Uses alternating large star and small star iterations proposed in |
| 14 | + | [[http://dx.doi.org/10.1145/2670979.2670997 Connected Components in MapReduce and Beyond]] |
| 15 | + | with skewed join optimization. |
| 16 | + | - "graphx": Converts the graph to a GraphX graph and then uses the connected components |
| 17 | + | implementation in GraphX. |
| 18 | + | @see org.graphframes.lib.ConnectedComponents.supportedAlgorithms""".stripMargin) |
| 19 | + .version("0.9.0") |
| 20 | + .stringConf |
| 21 | + .createOptional |
| 22 | + |
| 23 | + private val CONNECTED_COMPONENTS_BROADCAST_THRESHOLD = |
| 24 | + SQLConf |
| 25 | + .buildConf("spark.graphframes.connectedComponents.broadcastthreshold") |
| 26 | + .doc(""" Sets broadcast threshold in propagating component assignments (default: 1000000). If a node |
| 27 | + | degree is greater than this threshold at some iteration, its component assignment will be |
| 28 | + | collected and then broadcasted back to propagate the assignment to its neighbors. Otherwise, |
| 29 | + | the assignment propagation is done by a normal Spark join. This parameter is only used when |
| 30 | + | the algorithm is set to "graphframes".""".stripMargin) |
| 31 | + .version("0.9.0") |
| 32 | + .intConf |
| 33 | + .createOptional |
| 34 | + |
| 35 | + private val CONNECTED_COMPONENTS_CHECKPOINT_INTERVAL = |
| 36 | + SQLConf |
| 37 | + .buildConf("spark.graphframes.connectedComponents.checkpointinterval") |
| 38 | + .doc(""" Sets checkpoint interval in terms of number of iterations (default: 2). Checkpointing |
| 39 | + | regularly helps recover from failures, clean shuffle files, shorten the lineage of the |
| 40 | + | computation graph, and reduce the complexity of plan optimization. As of Spark 2.0, the |
| 41 | + | complexity of plan optimization would grow exponentially without checkpointing. Hence, |
| 42 | + | disabling or setting longer-than-default checkpoint intervals are not recommended. Checkpoint |
| 43 | + | data is saved under `org.apache.spark.SparkContext.getCheckpointDir` with prefix |
| 44 | + | "connected-components". If the checkpoint directory is not set, this throws a |
| 45 | + | `java.io.IOException`. Set a nonpositive value to disable checkpointing. This parameter is |
| 46 | + | only used when the algorithm is set to "graphframes". Its default value might change in the |
| 47 | + | future. |
| 48 | + | @see `org.apache.spark.SparkContext.setCheckpointDir` in Spark API doc""".stripMargin) |
| 49 | + .version("0.9.0") |
| 50 | + .intConf |
| 51 | + .createOptional |
| 52 | + |
| 53 | + private val CONNECTED_COMPONENTS_INTERMEDIATE_STORAGE_LEVEL = |
| 54 | + SQLConf |
| 55 | + .buildConf("spark.graphframes.connectedComponents.intermediatestoragelevel") |
| 56 | + .doc("Sets storage level for intermediate datasets that require multiple passes (default: ``MEMORY_AND_DISK``).") |
| 57 | + .version("0.9.0") |
| 58 | + .stringConf |
| 59 | + .createOptional |
| 60 | + |
| 61 | + private def get(entry: ConfigEntry[_]): Option[String] = { |
| 62 | + try { |
| 63 | + Option(SparkSession.getActiveSession.get.conf.get(entry.key)) |
| 64 | + } catch { |
| 65 | + case _: NoSuchElementException => None |
| 66 | + } |
| 67 | + } |
| 68 | + |
| 69 | + def getConnectedComponentsAlgorithm: Option[String] = { |
| 70 | + get(CONNECTED_COMPONENTS_ALGORITHM) match { |
| 71 | + case Some(threshold) => Some(threshold.toLowerCase) |
| 72 | + case _ => None |
| 73 | + } |
| 74 | + } |
| 75 | + |
| 76 | + def getConnectedComponentsBroadcastThreshold: Option[Int] = { |
| 77 | + get(CONNECTED_COMPONENTS_BROADCAST_THRESHOLD) match { |
| 78 | + case Some(threshold) => Some(threshold.toInt) |
| 79 | + case _ => None |
| 80 | + } |
| 81 | + } |
| 82 | + |
| 83 | + def getConnectedComponentsCheckpointInterval: Option[Int] = { |
| 84 | + get(CONNECTED_COMPONENTS_CHECKPOINT_INTERVAL) match { |
| 85 | + case Some(interval) => Some(interval.toInt) |
| 86 | + case _ => None |
| 87 | + } |
| 88 | + } |
| 89 | + |
| 90 | + def getConnectedComponentsStorageLevel: Option[StorageLevel] = { |
| 91 | + get(CONNECTED_COMPONENTS_INTERMEDIATE_STORAGE_LEVEL) match { |
| 92 | + case Some(level) => Some(StorageLevel.fromString(level.toUpperCase)) |
| 93 | + case _ => None |
| 94 | + } |
| 95 | + } |
| 96 | +} |
0 commit comments