File tree Expand file tree Collapse file tree 5 files changed +24
-1
lines changed
spark/ingestion/src/main/scala/feast/ingestion Expand file tree Collapse file tree 5 files changed +24
-1
lines changed Original file line number Diff line number Diff line change @@ -160,6 +160,11 @@ class ConfigOptions(metaclass=ConfigMeta):
160160 #: Whitelisted Feast projects
161161 WHITELISTED_PROJECTS : Optional [str ] = None
162162
163+ #: If set, the streaming ingestion job consumes incoming rows periodically,
164+ #: at the configured interval (in seconds), instead of continuously.
165+ #: This can help limit the number of write requests sent to storage.
166+ SPARK_STREAMING_TRIGGERING_INTERVAL : Optional [str ] = None
167+
163168 def defaults (self ):
164169 return {
165170 k : getattr (self , k )
Original file line number Diff line number Diff line change @@ -503,6 +503,7 @@ def __init__(
503503 checkpoint_path : Optional [str ] = None ,
504504 stencil_url : Optional [str ] = None ,
505505 drop_invalid_rows : bool = False ,
506+ triggering_interval : Optional [int ] = None ,
506507 ):
507508 super ().__init__ (
508509 feature_table ,
@@ -523,6 +524,7 @@ def __init__(
523524 )
524525 self ._extra_jars = extra_jars
525526 self ._checkpoint_path = checkpoint_path
527+ self ._triggering_interval = triggering_interval
526528
527529 def get_name (self ) -> str :
528530 return f"{ self .get_job_type ().to_pascal_case ()} -{ self .get_feature_table_name ()} "
@@ -538,6 +540,8 @@ def get_arguments(self) -> List[str]:
538540 args .extend (["--mode" , "online" ])
539541 if self ._checkpoint_path :
540542 args .extend (["--checkpoint-path" , self ._checkpoint_path ])
543+ if self ._triggering_interval :
544+ args .extend (["--triggering-interval" , str (self ._triggering_interval )])
541545 return args
542546
543547 def get_job_hash (self ) -> str :
Original file line number Diff line number Diff line change @@ -315,6 +315,9 @@ def get_stream_to_online_ingestion_params(
315315 checkpoint_path = client .config .get (opt .CHECKPOINT_PATH ),
316316 stencil_url = client .config .get (opt .STENCIL_URL ),
317317 drop_invalid_rows = client .config .get (opt .INGESTION_DROP_INVALID_ROWS ),
318+ triggering_interval = client .config .getint (
319+ opt .SPARK_STREAMING_TRIGGERING_INTERVAL , default = None
320+ ),
318321 )
319322
320323
Original file line number Diff line number Diff line change @@ -101,6 +101,9 @@ object IngestionJob {
101101
102102 opt[String ](name = " checkpoint-path" )
103103 .action((x, c) => c.copy(checkpointPath = Some (x)))
104+
105+ opt[Int ](name = " triggering-interval" )
106+ .action((x, c) => c.copy(streamingTriggeringSecs = x))
104107 }
105108
106109 def main (args : Array [String ]): Unit = {
Original file line number Diff line number Diff line change 1616 */
1717package feast .ingestion .stores .bigtable
1818
19+ import java .io .IOException
20+
1921import com .google .cloud .bigtable .hbase .BigtableConfiguration
2022import org .apache .hadoop .conf .Configuration
2123import org .apache .hadoop .hbase .{
@@ -75,7 +77,13 @@ class BigTableSinkRelation(
7577 try {
7678 admin.createTable(table)
7779 } catch {
78- case _ : TableExistsException => admin.modifyTable(table)
80+ case _ : TableExistsException =>
81+ try {
82+ admin.modifyTable(table)
83+ } catch {
84+ case e : IOException =>
85+ println(s " Table modification failed: ${e.getMessage}" )
86+ }
7987 }
8088 } finally {
8189 btConn.close()
You can’t perform that action at this time.
0 commit comments