Skip to content

Commit af0ec85

Browse files
authored
Configurable triggering interval for streaming ingestion (#63)
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

option default value

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 8f6b793 commit af0ec85

File tree

5 files changed

+24
-1
lines changed

5 files changed

+24
-1
lines changed

python/feast_spark/constants.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -160,6 +160,11 @@ class ConfigOptions(metaclass=ConfigMeta):
160160
#: Whitelisted Feast projects
161161
WHITELISTED_PROJECTS: Optional[str] = None
162162

163+
#: If set - streaming ingestion job will be consuming incoming rows not continuously,
164+
#: but periodically with configured interval (in seconds).
165+
#: That may help to control amount of write requests to storage
166+
SPARK_STREAMING_TRIGGERING_INTERVAL: Optional[str] = None
167+
163168
def defaults(self):
164169
return {
165170
k: getattr(self, k)

python/feast_spark/pyspark/abc.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -503,6 +503,7 @@ def __init__(
503503
checkpoint_path: Optional[str] = None,
504504
stencil_url: Optional[str] = None,
505505
drop_invalid_rows: bool = False,
506+
triggering_interval: Optional[int] = None,
506507
):
507508
super().__init__(
508509
feature_table,
@@ -523,6 +524,7 @@ def __init__(
523524
)
524525
self._extra_jars = extra_jars
525526
self._checkpoint_path = checkpoint_path
527+
self._triggering_interval = triggering_interval
526528

527529
def get_name(self) -> str:
528530
return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"
@@ -538,6 +540,8 @@ def get_arguments(self) -> List[str]:
538540
args.extend(["--mode", "online"])
539541
if self._checkpoint_path:
540542
args.extend(["--checkpoint-path", self._checkpoint_path])
543+
if self._triggering_interval:
544+
args.extend(["--triggering-interval", str(self._triggering_interval)])
541545
return args
542546

543547
def get_job_hash(self) -> str:

python/feast_spark/pyspark/launcher.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -315,6 +315,9 @@ def get_stream_to_online_ingestion_params(
315315
checkpoint_path=client.config.get(opt.CHECKPOINT_PATH),
316316
stencil_url=client.config.get(opt.STENCIL_URL),
317317
drop_invalid_rows=client.config.get(opt.INGESTION_DROP_INVALID_ROWS),
318+
triggering_interval=client.config.getint(
319+
opt.SPARK_STREAMING_TRIGGERING_INTERVAL, default=None
320+
),
318321
)
319322

320323

spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,9 @@ object IngestionJob {
101101

102102
opt[String](name = "checkpoint-path")
103103
.action((x, c) => c.copy(checkpointPath = Some(x)))
104+
105+
opt[Int](name = "triggering-interval")
106+
.action((x, c) => c.copy(streamingTriggeringSecs = x))
104107
}
105108

106109
def main(args: Array[String]): Unit = {

spark/ingestion/src/main/scala/feast/ingestion/stores/bigtable/BigTableSinkRelation.scala

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package feast.ingestion.stores.bigtable
1818

19+
import java.io.IOException
20+
1921
import com.google.cloud.bigtable.hbase.BigtableConfiguration
2022
import org.apache.hadoop.conf.Configuration
2123
import org.apache.hadoop.hbase.{
@@ -75,7 +77,13 @@ class BigTableSinkRelation(
7577
try {
7678
admin.createTable(table)
7779
} catch {
78-
case _: TableExistsException => admin.modifyTable(table)
80+
case _: TableExistsException =>
81+
try {
82+
admin.modifyTable(table)
83+
} catch {
84+
case e: IOException =>
85+
println(s"Table modification failed: ${e.getMessage}")
86+
}
7987
}
8088
} finally {
8189
btConn.close()

0 commit comments

Comments (0)