Skip to content

Commit 0df33d0

Browse files
committed
Extra configuration for bigtable client
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 279df79 commit 0df33d0

File tree

1 file changed

+43
-3
lines changed

1 file changed

+43
-3
lines changed

spark/ingestion/src/main/scala/feast/ingestion/stores/bigtable/DefaultSource.scala

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,38 @@ import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
2020
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
2121
import com.google.cloud.bigtable.hbase.BigtableConfiguration
2222
import feast.ingestion.stores.serialization.AvroSerializer
23+
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory.{
24+
BIGTABLE_BUFFERED_MUTATOR_ENABLE_THROTTLING,
25+
BIGTABLE_BUFFERED_MUTATOR_THROTTLING_THRESHOLD_MILLIS,
26+
BIGTABLE_BULK_MAX_ROW_KEY_COUNT,
27+
BIGTABLE_DATA_CHANNEL_COUNT_KEY,
28+
BIGTABLE_EMULATOR_HOST_KEY
29+
}
30+
import org.apache.hadoop.conf.Configuration
2331

2432
class DefaultSource extends CreatableRelationProvider {
33+
import DefaultSource._
34+
2535
override def createRelation(
2636
sqlContext: SQLContext,
2737
mode: SaveMode,
2838
parameters: Map[String, String],
2939
data: DataFrame
3040
): BaseRelation = {
3141
val bigtableConf = BigtableConfiguration.configure(
32-
sqlContext.getConf("spark.bigtable.projectId"),
33-
sqlContext.getConf("spark.bigtable.instanceId")
42+
sqlContext.getConf(PROJECT_KEY),
43+
sqlContext.getConf(INSTANCE_KEY)
3444
)
3545

3646
if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) {
3747
bigtableConf.set(
38-
"google.bigtable.emulator.endpoint.host",
48+
BIGTABLE_EMULATOR_HOST_KEY,
3949
sqlContext.getConf("spark.bigtable.emulatorHost")
4050
)
4151
}
4252

53+
configureBigTableClient(bigtableConf, sqlContext)
54+
4355
val rel =
4456
new BigTableSinkRelation(
4557
sqlContext,
@@ -52,4 +64,32 @@ class DefaultSource extends CreatableRelationProvider {
5264
rel.insert(data, overwrite = false)
5365
rel
5466
}
67+
68+
private def configureBigTableClient(bigtableConf: Configuration, sqlContext: SQLContext): Unit = {
69+
val confs = sqlContext.getAllConfs
70+
71+
confs.get(CHANNEL_COUNT_KEY).foreach(bigtableConf.set(BIGTABLE_DATA_CHANNEL_COUNT_KEY, _))
72+
confs.get(MAX_ROW_COUNT_KEY).foreach(bigtableConf.set(BIGTABLE_BULK_MAX_ROW_KEY_COUNT, _))
73+
74+
confs
75+
.get(ENABLE_THROTTLING_KEY)
76+
.foreach(
77+
bigtableConf.set(BIGTABLE_BUFFERED_MUTATOR_ENABLE_THROTTLING, _)
78+
)
79+
confs
80+
.get(THROTTLING_THRESHOLD_MILLIS_KEY)
81+
.foreach(
82+
bigtableConf.set(BIGTABLE_BUFFERED_MUTATOR_THROTTLING_THRESHOLD_MILLIS, _)
83+
)
84+
}
85+
}
86+
87+
object DefaultSource {
  // Spark SQL configuration keys recognized by this relation provider.
  // Referenced from the companion class via `import DefaultSource._`.

  // Required: GCP project and Bigtable instance to write to.
  private val PROJECT_KEY  = "spark.bigtable.projectId"
  private val INSTANCE_KEY = "spark.bigtable.instanceId"

  // Optional client tuning knobs, forwarded to the Bigtable HBase client's
  // BigtableOptionsFactory properties only when explicitly set by the user.
  private val CHANNEL_COUNT_KEY               = "spark.bigtable.channelCount"
  private val ENABLE_THROTTLING_KEY           = "spark.bigtable.enableThrottling"
  private val THROTTLING_THRESHOLD_MILLIS_KEY = "spark.bigtable.throttlingThresholdMs"
  private val MAX_ROW_COUNT_KEY               = "spark.bigtable.maxRowCount"
}

0 commit comments

Comments (0)