Skip to content

Commit 94b7446

Browse files
authored
Support for Schedule Spark Application (#59)
* Support for Schedule Spark Application Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Remove the need of returning job as response Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Add e2e test Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Fix flag typo in spark ingestion job Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Fix linting Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Fix kubernetes api calls Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Add verification for e2e test Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Fix image tag for jobservice Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Fix schedule sparkapplication namespace Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Fix unscheduling Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Remove unused argument Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Remove schedule job id from label, add truncation Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * formatting Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
1 parent bb0fd44 commit 94b7446

File tree

19 files changed

+523
-6
lines changed

19 files changed

+523
-6
lines changed

infra/scripts/helm/k8s-jobservice.tpl.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
feast-jobservice:
2+
image:
3+
tag: ${IMAGE_TAG}
24
envOverrides:
35
FEAST_CORE_URL: feast-release-feast-core:6565
46
FEAST_SPARK_LAUNCHER: k8s

infra/scripts/test-end-to-end-aws.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@ PYTHONPATH=sdk/python pytest tests/e2e/ \
1818
--redis-url $NODE_IP:32379 \
1919
--emr-region us-west-2 \
2020
--kafka-brokers $NODE_IP:30092 \
21-
-m "not bq"
21+
-m "not bq and not k8s"

infra/scripts/test-end-to-end-gcp.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ CMD=$(printf '%s' \
6464
"--core-url feast-release-feast-core:6565 " \
6565
"--serving-url feast-release-feast-online-serving:6566 " \
6666
"--job-service-url js-feast-jobservice:6568 " \
67-
"--kafka-brokers 10.128.0.103:9094 --bq-project kf-feast --feast-version dev")
67+
"--kafka-brokers 10.128.0.103:9094 --bq-project kf-feast --feast-version dev -m \"not k8s\"")
6868

6969
# Delete old test running pod if it exists
7070
kubectl delete pod -n "$NAMESPACE" ci-test-runner 2>/dev/null || true

infra/scripts/test-end-to-end-local.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,4 @@ PYTHONPATH=sdk/python pytest tests/e2e/ \
6666
--staging-path s3a://feast-staging \
6767
--redis-url sparkop-redis-master.sparkop.svc.cluster.local:6379 \
6868
--kafka-brokers sparkop-kafka.sparkop.svc.cluster.local:9092 \
69-
-m "not bq"
69+
-m "not bq and not k8s"

infra/scripts/test-end-to-end-sparkop.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ CMD=$(printf '%s' \
5454
"--core-url feast-release-feast-core:6565 " \
5555
"--serving-url feast-release-feast-online-serving:6566 " \
5656
"--job-service-url js-feast-jobservice:6568 " \
57+
"--k8s-namespace sparkop-e2e " \
5758
"--kafka-brokers feast-release-kafka-headless:9092 --bq-project kf-feast --feast-version dev")
5859

5960
# Delete old test running pod if it exists

protos/feast_spark/api/JobService.proto

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ service JobService {
2929
// Start job to ingest data from offline store into online store
3030
rpc StartOfflineToOnlineIngestionJob (StartOfflineToOnlineIngestionJobRequest) returns (StartOfflineToOnlineIngestionJobResponse);
3131

32+
// Start scheduled job to ingest data from offline store into online store
33+
rpc ScheduleOfflineToOnlineIngestionJob (ScheduleOfflineToOnlineIngestionJobRequest) returns (ScheduleOfflineToOnlineIngestionJobResponse);
34+
35+
// Unschedule job to ingest data from offline store into online store
36+
rpc UnscheduleOfflineToOnlineIngestionJob(UnscheduleOfflineToOnlineIngestionJobRequest) returns (UnscheduleOfflineToOnlineIngestionJobResponse);
37+
3238
// Produce a training dataset, return a job id that will provide a file reference
3339
rpc GetHistoricalFeatures (GetHistoricalFeaturesRequest) returns (GetHistoricalFeaturesResponse);
3440

@@ -127,6 +133,29 @@ message StartOfflineToOnlineIngestionJobResponse {
127133
string log_uri = 4;
128134
}
129135

136+
message ScheduleOfflineToOnlineIngestionJobRequest {
137+
// Feature table to ingest
138+
string project = 1;
139+
string table_name = 2;
140+
141+
142+
// Timespan of the ingested data per job, in days. The data from end of the day - timespan till end of the day will be ingested. Eg. if the job execution date is 10/4/2021, and ingestion timespan is 2, then data from 9/4/2021 00:00 to 10/4/2021 23:59 (inclusive) will be ingested.
143+
int32 ingestion_timespan = 3;
144+
145+
// Crontab string. Eg. 0 13 * * *
146+
string cron_schedule = 4;
147+
148+
}
149+
150+
message ScheduleOfflineToOnlineIngestionJobResponse {}
151+
152+
message UnscheduleOfflineToOnlineIngestionJobRequest {
153+
string project = 1;
154+
string table_name = 2;
155+
}
156+
157+
message UnscheduleOfflineToOnlineIngestionJobResponse {}
158+
130159
message GetHistoricalFeaturesRequest {
131160
// List of feature references that are being retrieved
132161
repeated string feature_refs = 1;

python/feast_spark/client.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import List, Optional, Union, cast
77

88
import pandas as pd
9+
from croniter import croniter
910

1011
import feast
1112
from feast.config import Config
@@ -21,19 +22,23 @@
2122
GetHistoricalFeaturesRequest,
2223
GetJobRequest,
2324
ListJobsRequest,
25+
ScheduleOfflineToOnlineIngestionJobRequest,
2426
StartOfflineToOnlineIngestionJobRequest,
2527
StartStreamToOnlineIngestionJobRequest,
28+
UnscheduleOfflineToOnlineIngestionJobRequest,
2629
)
2730
from feast_spark.api.JobService_pb2_grpc import JobServiceStub
2831
from feast_spark.constants import ConfigOptions as opt
2932
from feast_spark.pyspark.abc import RetrievalJob, SparkJob
3033
from feast_spark.pyspark.launcher import (
3134
get_job_by_id,
3235
list_jobs,
36+
schedule_offline_to_online_ingestion,
3337
start_historical_feature_retrieval_job,
3438
start_historical_feature_retrieval_spark_session,
3539
start_offline_to_online_ingestion,
3640
start_stream_to_online_ingestion,
41+
unschedule_offline_to_online_ingestion,
3742
)
3843
from feast_spark.remote_job import (
3944
RemoteBatchIngestionJob,
@@ -305,6 +310,61 @@ def start_offline_to_online_ingestion(
305310
response.log_uri,
306311
)
307312

313+
def schedule_offline_to_online_ingestion(
314+
self,
315+
feature_table: feast.FeatureTable,
316+
ingestion_timespan: int,
317+
cron_schedule: str,
318+
):
319+
"""
320+
Launch Scheduled Ingestion Job from Batch Source to Online Store for given feature table
321+
322+
Args:
323+
feature_table: FeatureTable that will be ingested into the online store
324+
ingestion_timespan: Days of data which will be ingested per job. The boundaries
325+
on which to filter the source are [end of day of execution date - ingestion_timespan (days),
326+
end of day of execution date)
327+
cron_schedule: Cron schedule expression
328+
329+
Returns: Spark Job Proxy object
330+
"""
331+
if not croniter.is_valid(cron_schedule):
332+
raise RuntimeError(f"{cron_schedule} is not a valid cron expression")
333+
if not self._use_job_service:
334+
schedule_offline_to_online_ingestion(
335+
client=self,
336+
project=self._feast.project,
337+
feature_table=feature_table,
338+
ingestion_timespan=ingestion_timespan,
339+
cron_schedule=cron_schedule,
340+
)
341+
else:
342+
request = ScheduleOfflineToOnlineIngestionJobRequest(
343+
project=self._feast.project,
344+
table_name=feature_table.name,
345+
ingestion_timespan=ingestion_timespan,
346+
cron_schedule=cron_schedule,
347+
)
348+
self._job_service.ScheduleOfflineToOnlineIngestionJob(request)
349+
350+
def unschedule_offline_to_online_ingestion(
351+
self, feature_table: feast.FeatureTable, project=None
352+
):
353+
feature_table_project = self._feast.project if project is None else project
354+
355+
if not self._use_job_service:
356+
unschedule_offline_to_online_ingestion(
357+
client=self,
358+
project=feature_table_project,
359+
feature_table=feature_table.name,
360+
)
361+
else:
362+
request = UnscheduleOfflineToOnlineIngestionJobRequest(
363+
project=feature_table_project, table_name=feature_table.name,
364+
)
365+
366+
self._job_service.UnscheduleOfflineToOnlineIngestionJob(request)
367+
308368
def start_stream_to_online_ingestion(
309369
self,
310370
feature_table: feast.FeatureTable,

python/feast_spark/job_service.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,14 @@
2828
JobStatus,
2929
JobType,
3030
ListJobsResponse,
31+
ScheduleOfflineToOnlineIngestionJobRequest,
32+
ScheduleOfflineToOnlineIngestionJobResponse,
3133
StartOfflineToOnlineIngestionJobRequest,
3234
StartOfflineToOnlineIngestionJobResponse,
3335
StartStreamToOnlineIngestionJobRequest,
3436
StartStreamToOnlineIngestionJobResponse,
37+
UnscheduleOfflineToOnlineIngestionJobRequest,
38+
UnscheduleOfflineToOnlineIngestionJobResponse,
3539
)
3640
from feast_spark.constants import ConfigOptions as opt
3741
from feast_spark.pyspark.abc import (
@@ -45,9 +49,11 @@
4549
get_job_by_id,
4650
get_stream_to_online_ingestion_params,
4751
list_jobs,
52+
schedule_offline_to_online_ingestion,
4853
start_historical_feature_retrieval_job,
4954
start_offline_to_online_ingestion,
5055
start_stream_to_online_ingestion,
56+
unschedule_offline_to_online_ingestion,
5157
)
5258
from feast_spark.third_party.grpc.health.v1.HealthService_pb2 import (
5359
HealthCheckResponse,
@@ -142,6 +148,34 @@ def StartOfflineToOnlineIngestionJob(
142148
log_uri=job.get_log_uri(), # type: ignore
143149
)
144150

151+
def ScheduleOfflineToOnlineIngestionJob(
152+
self, request: ScheduleOfflineToOnlineIngestionJobRequest, context
153+
):
154+
"""Schedule job to ingest data from offline store into online store periodically"""
155+
feature_table = self.client.feature_store.get_feature_table(
156+
request.table_name, request.project
157+
)
158+
schedule_offline_to_online_ingestion(
159+
client=self.client,
160+
project=request.project,
161+
feature_table=feature_table,
162+
ingestion_timespan=request.ingestion_timespan,
163+
cron_schedule=request.cron_schedule,
164+
)
165+
166+
return ScheduleOfflineToOnlineIngestionJobResponse()
167+
168+
def UnscheduleOfflineToOnlineIngestionJob(
169+
self, request: UnscheduleOfflineToOnlineIngestionJobRequest, context
170+
):
171+
feature_table = self.client.feature_store.get_feature_table(
172+
request.table_name, request.project
173+
)
174+
unschedule_offline_to_online_ingestion(
175+
client=self.client, project=request.project, feature_table=feature_table,
176+
)
177+
return UnscheduleOfflineToOnlineIngestionJobResponse()
178+
145179
def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
146180
"""Produce a training dataset, return a job id that will provide a file reference"""
147181

python/feast_spark/pyspark/abc.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class SparkJobType(Enum):
3030
HISTORICAL_RETRIEVAL = 0
3131
BATCH_INGESTION = 1
3232
STREAM_INGESTION = 2
33+
SCHEDULED_BATCH_INGESTION = 3
3334

3435
def to_pascal_case(self):
3536
return self.name.title().replace("_", "")
@@ -483,6 +484,60 @@ def get_arguments(self) -> List[str]:
483484
]
484485

485486

487+
class ScheduledBatchIngestionJobParameters(IngestionJobParameters):
488+
def __init__(
489+
self,
490+
feature_table: Dict,
491+
source: Dict,
492+
ingestion_timespan: int,
493+
cron_schedule: str,
494+
jar: str,
495+
redis_host: Optional[str],
496+
redis_port: Optional[int],
497+
redis_ssl: Optional[bool],
498+
bigtable_project: Optional[str],
499+
bigtable_instance: Optional[str],
500+
cassandra_host: Optional[str] = None,
501+
cassandra_port: Optional[int] = None,
502+
statsd_host: Optional[str] = None,
503+
statsd_port: Optional[int] = None,
504+
deadletter_path: Optional[str] = None,
505+
stencil_url: Optional[str] = None,
506+
):
507+
super().__init__(
508+
feature_table,
509+
source,
510+
jar,
511+
redis_host,
512+
redis_port,
513+
redis_ssl,
514+
bigtable_project,
515+
bigtable_instance,
516+
cassandra_host,
517+
cassandra_port,
518+
statsd_host,
519+
statsd_port,
520+
deadletter_path,
521+
stencil_url,
522+
)
523+
self._ingestion_timespan = ingestion_timespan
524+
self._cron_schedule = cron_schedule
525+
526+
def get_name(self) -> str:
527+
return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"
528+
529+
def get_job_type(self) -> SparkJobType:
530+
return SparkJobType.SCHEDULED_BATCH_INGESTION
531+
532+
def get_arguments(self) -> List[str]:
533+
return super().get_arguments() + [
534+
"--mode",
535+
"offline",
536+
"--ingestion-timespan",
537+
str(self._ingestion_timespan),
538+
]
539+
540+
486541
class StreamIngestionJobParameters(IngestionJobParameters):
487542
def __init__(
488543
self,
@@ -636,6 +691,29 @@ def offline_to_online_ingestion(
636691
"""
637692
raise NotImplementedError
638693

694+
@abc.abstractmethod
695+
def schedule_offline_to_online_ingestion(
696+
self, ingestion_job_params: ScheduledBatchIngestionJobParameters
697+
):
698+
"""
699+
Submits a scheduled batch ingestion job to a Spark cluster.
700+
701+
Raises:
702+
SparkJobFailure: The spark job submission failed, encountered error
703+
during execution, or timeout.
704+
705+
Returns:
706+
ScheduledBatchIngestionJob: wrapper around remote job that can be used to check when job completed.
707+
"""
708+
raise NotImplementedError
709+
710+
@abc.abstractmethod
711+
def unschedule_offline_to_online_ingestion(self, project: str, feature_table: str):
712+
"""
713+
Unschedule a scheduled batch ingestion job.
714+
"""
715+
raise NotImplementedError
716+
639717
@abc.abstractmethod
640718
def start_stream_to_online_ingestion(
641719
self, ingestion_job_params: StreamIngestionJobParameters

python/feast_spark/pyspark/launcher.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
JobLauncher,
1919
RetrievalJob,
2020
RetrievalJobParameters,
21+
ScheduledBatchIngestionJobParameters,
2122
SparkJob,
2223
StreamIngestionJob,
2324
StreamIngestionJobParameters,
@@ -293,6 +294,54 @@ def start_offline_to_online_ingestion(
293294
)
294295

295296

297+
def schedule_offline_to_online_ingestion(
298+
client: "Client",
299+
project: str,
300+
feature_table: FeatureTable,
301+
ingestion_timespan: int,
302+
cron_schedule: str,
303+
):
304+
305+
launcher = resolve_launcher(client.config)
306+
307+
launcher.schedule_offline_to_online_ingestion(
308+
ScheduledBatchIngestionJobParameters(
309+
jar=client.config.get(opt.SPARK_INGESTION_JAR),
310+
source=_source_to_argument(feature_table.batch_source, client.config),
311+
feature_table=_feature_table_to_argument(client, project, feature_table),
312+
ingestion_timespan=ingestion_timespan,
313+
cron_schedule=cron_schedule,
314+
redis_host=client.config.get(opt.REDIS_HOST),
315+
redis_port=bool(client.config.get(opt.REDIS_HOST))
316+
and client.config.getint(opt.REDIS_PORT),
317+
redis_ssl=client.config.getboolean(opt.REDIS_SSL),
318+
bigtable_project=client.config.get(opt.BIGTABLE_PROJECT),
319+
bigtable_instance=client.config.get(opt.BIGTABLE_INSTANCE),
320+
cassandra_host=client.config.get(opt.CASSANDRA_HOST),
321+
cassandra_port=bool(client.config.get(opt.CASSANDRA_HOST))
322+
and client.config.getint(opt.CASSANDRA_PORT),
323+
statsd_host=(
324+
client.config.getboolean(opt.STATSD_ENABLED)
325+
and client.config.get(opt.STATSD_HOST)
326+
),
327+
statsd_port=(
328+
client.config.getboolean(opt.STATSD_ENABLED)
329+
and client.config.getint(opt.STATSD_PORT)
330+
),
331+
deadletter_path=client.config.get(opt.DEADLETTER_PATH),
332+
stencil_url=client.config.get(opt.STENCIL_URL),
333+
)
334+
)
335+
336+
337+
def unschedule_offline_to_online_ingestion(
338+
client: "Client", project: str, feature_table: FeatureTable,
339+
):
340+
341+
launcher = resolve_launcher(client.config)
342+
launcher.unschedule_offline_to_online_ingestion(project, feature_table.name)
343+
344+
296345
def get_stream_to_online_ingestion_params(
297346
client: "Client", project: str, feature_table: FeatureTable, extra_jars: List[str]
298347
) -> StreamIngestionJobParameters:

0 commit comments

Comments
 (0)