Skip to content
Open
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8a1a310
Add Synapse Launcher
xiaoyongzhu Apr 25, 2021
8c06c45
Remove unnecessary printout
xiaoyongzhu Apr 26, 2021
b3fd983
Add eventhub support
xiaoyongzhu May 8, 2021
24f0986
Merge branch 'master' of https://github.com/xiaoyongzhu/feast-spark
xiaoyongzhu May 19, 2021
89f48a3
Add EventHub support and Redis Auth support
xiaoyongzhu May 19, 2021
f2cd8be
Adding EventHub support in Spark jobs
xiaoyongzhu May 19, 2021
53d7e20
add ScheduledBatchIngestionJobParameters
xiaoyongzhu Aug 31, 2021
79866e9
Merge pull request #1 from feast-dev/master
xiaoyongzhu Aug 31, 2021
c469ee7
Add Azure specific dependencies
xiaoyongzhu Sep 2, 2021
91c6822
Change azure storage dependencies
xiaoyongzhu Sep 2, 2021
0a7a56c
Commen for removing/adding spaces between brackets
xiaoyongzhu Sep 9, 2021
f4c0d5a
Delete feature_store_debug.py
xiaoyongzhumsft Sep 9, 2021
6bc9260
Update StreamingPipeline.scala
xiaoyongzhumsft Sep 14, 2021
dd53a53
Merge branch 'feast-dev:master' into master
xiaoyongzhumsft Oct 1, 2021
9fde235
Update synapse.py
xiaoyongzhumsft Oct 1, 2021
bb3d6be
Update synapse.py
xiaoyongzhumsft Oct 1, 2021
6885f31
Merge branch 'feast-dev:master' into master
xiaoyongzhumsft Oct 4, 2021
08da84f
Fix Redis auth issue
xiaoyongzhu Oct 12, 2021
0ddbcef
Update Ingestion jobs and add supporting files
xiaoyongzhu Oct 12, 2021
48a1c44
Fix build issues
xiaoyongzhu Oct 12, 2021
762386e
Add support for Kafka ingestion
xiaoyongzhu Oct 12, 2021
41fc406
Add build and push instructions
xiaoyongzhu Oct 12, 2021
0f7d433
Adding License
xiaoyongzhu Oct 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add ScheduledBatchIngestionJobParameters
  • Loading branch information
xiaoyongzhu committed Aug 31, 2021
commit 53d7e20e3176cb2202d9f7ceb50d90237166f9c5
56 changes: 56 additions & 0 deletions python/feast_spark/pyspark/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,62 @@ def get_arguments(self) -> List[str]:
self._end.strftime("%Y-%m-%dT%H:%M:%S"),
]

class ScheduledBatchIngestionJobParameters(IngestionJobParameters):
def __init__(
self,
feature_table: Dict,
source: Dict,
ingestion_timespan: int,
cron_schedule: str,
jar: str,
redis_host: Optional[str],
redis_port: Optional[int],
redis_ssl: Optional[bool],
bigtable_project: Optional[str],
bigtable_instance: Optional[str],
cassandra_host: Optional[str] = None,
cassandra_port: Optional[int] = None,
statsd_host: Optional[str] = None,
statsd_port: Optional[int] = None,
deadletter_path: Optional[str] = None,
stencil_url: Optional[str] = None,
):
super().__init__(
feature_table,
source,
jar,
redis_host,
redis_port,
redis_ssl,
bigtable_project,
bigtable_instance,
cassandra_host,
cassandra_port,
statsd_host,
statsd_port,
deadletter_path,
stencil_url,
)
self._ingestion_timespan = ingestion_timespan
self._cron_schedule = cron_schedule

def get_name(self) -> str:
return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"

def get_job_type(self) -> SparkJobType:
return SparkJobType.SCHEDULED_BATCH_INGESTION

def get_job_schedule(self) -> str:
return self._cron_schedule

def get_arguments(self) -> List[str]:
return super().get_arguments() + [
"--mode",
"offline",
"--ingestion-timespan",
str(self._ingestion_timespan),
]


class StreamIngestionJobParameters(IngestionJobParameters):
def __init__(
Expand Down