|
6 | 6 | from typing import List, Optional, Union, cast |
7 | 7 |
|
8 | 8 | import pandas as pd |
| 9 | +from croniter import croniter |
9 | 10 |
|
10 | 11 | import feast |
11 | 12 | from feast.config import Config |
|
21 | 22 | GetHistoricalFeaturesRequest, |
22 | 23 | GetJobRequest, |
23 | 24 | ListJobsRequest, |
| 25 | + ScheduleOfflineToOnlineIngestionJobRequest, |
24 | 26 | StartOfflineToOnlineIngestionJobRequest, |
25 | 27 | StartStreamToOnlineIngestionJobRequest, |
| 28 | + UnscheduleOfflineToOnlineIngestionJobRequest, |
26 | 29 | ) |
27 | 30 | from feast_spark.api.JobService_pb2_grpc import JobServiceStub |
28 | 31 | from feast_spark.constants import ConfigOptions as opt |
29 | 32 | from feast_spark.pyspark.abc import RetrievalJob, SparkJob |
30 | 33 | from feast_spark.pyspark.launcher import ( |
31 | 34 | get_job_by_id, |
32 | 35 | list_jobs, |
| 36 | + schedule_offline_to_online_ingestion, |
33 | 37 | start_historical_feature_retrieval_job, |
34 | 38 | start_historical_feature_retrieval_spark_session, |
35 | 39 | start_offline_to_online_ingestion, |
36 | 40 | start_stream_to_online_ingestion, |
| 41 | + unschedule_offline_to_online_ingestion, |
37 | 42 | ) |
38 | 43 | from feast_spark.remote_job import ( |
39 | 44 | RemoteBatchIngestionJob, |
@@ -305,6 +310,61 @@ def start_offline_to_online_ingestion( |
305 | 310 | response.log_uri, |
306 | 311 | ) |
307 | 312 |
|
| 313 | + def schedule_offline_to_online_ingestion( |
| 314 | + self, |
| 315 | + feature_table: feast.FeatureTable, |
| 316 | + ingestion_timespan: int, |
| 317 | + cron_schedule: str, |
| 318 | + ): |
| 319 | + """ |
| 320 | + Launch Scheduled Ingestion Job from Batch Source to Online Store for given feature table |
| 321 | +
|
| 322 | + Args: |
| 323 | + feature_table: FeatureTable that will be ingested into the online store |
| 324 | + ingestion_timespan: Days of data which will be ingestion per job. The boundaries |
| 325 | + on which to filter the source are [end of day of execution date - ingestion_timespan (days) , |
| 326 | + end of day of execution date) |
| 327 | + cron_schedule: Cron schedule expression |
| 328 | +
|
| 329 | + Returns: Spark Job Proxy object |
| 330 | + """ |
| 331 | + if not croniter.is_valid(cron_schedule): |
| 332 | + raise RuntimeError(f"{cron_schedule} is not a valid cron expression") |
| 333 | + if not self._use_job_service: |
| 334 | + schedule_offline_to_online_ingestion( |
| 335 | + client=self, |
| 336 | + project=self._feast.project, |
| 337 | + feature_table=feature_table, |
| 338 | + ingestion_timespan=ingestion_timespan, |
| 339 | + cron_schedule=cron_schedule, |
| 340 | + ) |
| 341 | + else: |
| 342 | + request = ScheduleOfflineToOnlineIngestionJobRequest( |
| 343 | + project=self._feast.project, |
| 344 | + table_name=feature_table.name, |
| 345 | + ingestion_timespan=ingestion_timespan, |
| 346 | + cron_schedule=cron_schedule, |
| 347 | + ) |
| 348 | + self._job_service.ScheduleOfflineToOnlineIngestionJob(request) |
| 349 | + |
| 350 | + def unschedule_offline_to_online_ingestion( |
| 351 | + self, feature_table: feast.FeatureTable, project=None |
| 352 | + ): |
| 353 | + feature_table_project = self._feast.project if project is None else project |
| 354 | + |
| 355 | + if not self._use_job_service: |
| 356 | + unschedule_offline_to_online_ingestion( |
| 357 | + client=self, |
| 358 | + project=feature_table_project, |
| 359 | + feature_table=feature_table.name, |
| 360 | + ) |
| 361 | + else: |
| 362 | + request = UnscheduleOfflineToOnlineIngestionJobRequest( |
| 363 | + project=feature_table_project, table_name=feature_table.name, |
| 364 | + ) |
| 365 | + |
| 366 | + self._job_service.UnscheduleOfflineToOnlineIngestionJob(request) |
| 367 | + |
308 | 368 | def start_stream_to_online_ingestion( |
309 | 369 | self, |
310 | 370 | feature_table: feast.FeatureTable, |
|
0 commit comments