@@ -20,26 +20,38 @@ import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
2020import org .apache .spark .sql .sources .{BaseRelation , CreatableRelationProvider }
2121import com .google .cloud .bigtable .hbase .BigtableConfiguration
2222import feast .ingestion .stores .serialization .AvroSerializer
23+ import com .google .cloud .bigtable .hbase .BigtableOptionsFactory .{
24+ BIGTABLE_BUFFERED_MUTATOR_ENABLE_THROTTLING ,
25+ BIGTABLE_BUFFERED_MUTATOR_THROTTLING_THRESHOLD_MILLIS ,
26+ BIGTABLE_BULK_MAX_ROW_KEY_COUNT ,
27+ BIGTABLE_DATA_CHANNEL_COUNT_KEY ,
28+ BIGTABLE_EMULATOR_HOST_KEY
29+ }
30+ import org .apache .hadoop .conf .Configuration
2331
2432class DefaultSource extends CreatableRelationProvider {
33+ import DefaultSource ._
34+
2535 override def createRelation (
2636 sqlContext : SQLContext ,
2737 mode : SaveMode ,
2838 parameters : Map [String , String ],
2939 data : DataFrame
3040 ): BaseRelation = {
3141 val bigtableConf = BigtableConfiguration .configure(
32- sqlContext.getConf(" spark.bigtable.projectId " ),
33- sqlContext.getConf(" spark.bigtable.instanceId " )
42+ sqlContext.getConf(PROJECT_KEY ),
43+ sqlContext.getConf(INSTANCE_KEY )
3444 )
3545
3646 if (sqlContext.getConf(" spark.bigtable.emulatorHost" , " " ).nonEmpty) {
3747 bigtableConf.set(
38- " google.bigtable.emulator.endpoint.host " ,
48+ BIGTABLE_EMULATOR_HOST_KEY ,
3949 sqlContext.getConf(" spark.bigtable.emulatorHost" )
4050 )
4151 }
4252
53+ configureBigTableClient(bigtableConf, sqlContext)
54+
4355 val rel =
4456 new BigTableSinkRelation (
4557 sqlContext,
@@ -52,4 +64,32 @@ class DefaultSource extends CreatableRelationProvider {
5264 rel.insert(data, overwrite = false )
5365 rel
5466 }
67+
68+ private def configureBigTableClient (bigtableConf : Configuration , sqlContext : SQLContext ): Unit = {
69+ val confs = sqlContext.getAllConfs
70+
71+ confs.get(CHANNEL_COUNT_KEY ).foreach(bigtableConf.set(BIGTABLE_DATA_CHANNEL_COUNT_KEY , _))
72+ confs.get(MAX_ROW_COUNT_KEY ).foreach(bigtableConf.set(BIGTABLE_BULK_MAX_ROW_KEY_COUNT , _))
73+
74+ confs
75+ .get(ENABLE_THROTTLING_KEY )
76+ .foreach(
77+ bigtableConf.set(BIGTABLE_BUFFERED_MUTATOR_ENABLE_THROTTLING , _)
78+ )
79+ confs
80+ .get(THROTTLING_THRESHOLD_MILLIS_KEY )
81+ .foreach(
82+ bigtableConf.set(BIGTABLE_BUFFERED_MUTATOR_THROTTLING_THRESHOLD_MILLIS , _)
83+ )
84+ }
85+ }
86+
87+ object DefaultSource {
88+ private val PROJECT_KEY = " spark.bigtable.projectId"
89+ private val INSTANCE_KEY = " spark.bigtable.instanceId"
90+
91+ private val CHANNEL_COUNT_KEY = " spark.bigtable.channelCount"
92+ private val ENABLE_THROTTLING_KEY = " spark.bigtable.enableThrottling"
93+ private val THROTTLING_THRESHOLD_MILLIS_KEY = " spark.bigtable.throttlingThresholdMs"
94+ private val MAX_ROW_COUNT_KEY = " spark.bigtable.maxRowCount"
5595}
0 commit comments