Skip to content

Commit 4068b3e

Browse files
authored
Implement JobService API calls & connect it to SDK (#1129)
* Implement half of JobService functionality Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com> * Python lint Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com> * Correct implementation of all jobservice functions tested on standalone mode * New API calls (start_offline_to_online_ingestion, start_stream_to_online_ingestion) now return Remote Jobs instead of job ids * Implement list_jobs & get_job for standalone mode (looks like Spark is running in local mode and we can't get job statuses so we have to keep cache in memory) * Wire up list_jobs & get_job on client side with job service * Tested locally on Feast 101 notebook, everything works Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com> * Fix list_jobs when include_terminated=False Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com> * e2e tests Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com> * Format python Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com> * Add docstring Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com> * Remove spark extra configs and hardcode spark jars/conf in standalone mode Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com> * Remove extra spark params from dockerfile Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com> * Use start_stream_to_online_ingestion from launcher in job service Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com>
1 parent 1299815 commit 4068b3e

File tree

14 files changed

+245
-68
lines changed

14 files changed

+245
-68
lines changed

infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ spec:
5757
{{- end }}
5858

5959
env:
60+
- name: FEAST_CORE_URL
61+
value: "{{ .Release.Name }}-feast-core:6565"
62+
- name: FEAST_HISTORICAL_SERVING_URL
63+
value: "{{ .Release.Name }}-feast-batch-serving:6566"
64+
6065
{{- if .Values.gcpServiceAccount.enabled }}
6166
- name: GOOGLE_APPLICATION_CREDENTIALS
6267
value: /etc/secrets/google/{{ .Values.gcpServiceAccount.existingSecret.key }}

infra/charts/feast/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ feast-jupyter:
1313
# feast-jupyter.enabled -- Flag to install Feast Jupyter Notebook with SDK
1414
enabled: true
1515

16+
feast-jobservice:
17+
# feast-jobservice.enabled -- Flag to install Feast Job Service
18+
enabled: true
19+
1620
postgresql:
1721
# postgresql.enabled -- Flag to install Postgresql
1822
enabled: true

infra/docker/jobservice/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM python:3.7-slim-buster
1+
FROM jupyter/pyspark-notebook:ae5f7e104dd5
22

33
USER root
44
WORKDIR /feast

infra/docker/jupyter/Dockerfile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,5 @@ COPY examples .
2929
ENV FEAST_SPARK_LAUNCHER standalone
3030
ENV FEAST_SPARK_STANDALONE_MASTER "local[*]"
3131
ENV FEAST_SPARK_HOME $SPARK_HOME
32-
ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \
33-
--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
3432

3533
CMD ["start-notebook.sh", "--NotebookApp.token=''"]

protos/feast/core/JobService.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ message StartOfflineToOnlineIngestionJobResponse {
110110
}
111111

112112
message GetHistoricalFeaturesRequest {
113-
// List of features that are being retrieved
113+
// List of feature references that are being retrieved
114114
repeated string feature_refs = 1;
115115

116116
// Batch DataSource that can be used to obtain entity values for historical retrieval.

sdk/python/feast/client.py

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,13 @@
6565
ListProjectsResponse,
6666
)
6767
from feast.core.CoreService_pb2_grpc import CoreServiceStub
68-
from feast.core.JobService_pb2 import GetHistoricalFeaturesRequest
68+
from feast.core.JobService_pb2 import (
69+
GetHistoricalFeaturesRequest,
70+
GetJobRequest,
71+
ListJobsRequest,
72+
StartOfflineToOnlineIngestionJobRequest,
73+
StartStreamToOnlineIngestionJobRequest,
74+
)
6975
from feast.core.JobService_pb2_grpc import JobServiceStub
7076
from feast.data_format import ParquetFormat
7177
from feast.data_source import BigQuerySource, FileSource
@@ -94,7 +100,12 @@
94100
start_offline_to_online_ingestion,
95101
start_stream_to_online_ingestion,
96102
)
97-
from feast.remote_job import RemoteRetrievalJob
103+
from feast.remote_job import (
104+
RemoteBatchIngestionJob,
105+
RemoteRetrievalJob,
106+
RemoteStreamIngestionJob,
107+
get_remote_job_from_proto,
108+
)
98109
from feast.serving.ServingService_pb2 import (
99110
GetFeastServingInfoRequest,
100111
GetOnlineFeaturesRequestV2,
@@ -201,6 +212,10 @@ def _job_service(self):
201212
202213
Returns: JobServiceStub
203214
"""
215+
# Don't try to initialize job service stub if the job service is disabled
216+
if not self._use_job_service:
217+
return None
218+
204219
if not self._job_service_stub:
205220
channel = create_grpc_channel(
206221
url=self._config.get(CONFIG_JOB_SERVICE_URL_KEY),
@@ -891,8 +906,8 @@ def get_historical_features(
891906
self,
892907
feature_refs: List[str],
893908
entity_source: Union[pd.DataFrame, FileSource, BigQuerySource],
894-
project: str = None,
895-
output_location: str = None,
909+
project: Optional[str] = None,
910+
output_location: Optional[str] = None,
896911
) -> RetrievalJob:
897912
"""
898913
Launch a historical feature retrieval job.
@@ -915,6 +930,7 @@ def get_historical_features(
915930
retrieval job.
916931
project: Specifies the project that contains the feature tables
917932
which the requested features belong to.
933+
output_location: Specifies the path in a bucket to write the exported feature data files
918934
919935
Returns:
920936
Returns a retrieval job object that can be used to monitor retrieval
@@ -1062,18 +1078,57 @@ def start_offline_to_online_ingestion(
10621078
:param end: upper datetime boundary
10631079
:return: Spark Job Proxy object
10641080
"""
1065-
return start_offline_to_online_ingestion(feature_table, start, end, self)
1081+
if not self._use_job_service:
1082+
return start_offline_to_online_ingestion(feature_table, start, end, self)
1083+
else:
1084+
request = StartOfflineToOnlineIngestionJobRequest(
1085+
project=self.project, table_name=feature_table.name,
1086+
)
1087+
request.start_date.FromDatetime(start)
1088+
request.end_date.FromDatetime(end)
1089+
response = self._job_service.StartOfflineToOnlineIngestionJob(request)
1090+
return RemoteBatchIngestionJob(
1091+
self._job_service, self._extra_grpc_params, response.id,
1092+
)
10661093

10671094
def start_stream_to_online_ingestion(
10681095
self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None,
10691096
) -> SparkJob:
1070-
return start_stream_to_online_ingestion(feature_table, extra_jars or [], self)
1097+
if not self._use_job_service:
1098+
return start_stream_to_online_ingestion(
1099+
feature_table, extra_jars or [], self
1100+
)
1101+
else:
1102+
request = StartStreamToOnlineIngestionJobRequest(
1103+
project=self.project, table_name=feature_table.name,
1104+
)
1105+
response = self._job_service.StartStreamToOnlineIngestionJob(request)
1106+
return RemoteStreamIngestionJob(
1107+
self._job_service, self._extra_grpc_params, response.id,
1108+
)
10711109

10721110
def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
1073-
return list_jobs(include_terminated, self)
1111+
if not self._use_job_service:
1112+
return list_jobs(include_terminated, self)
1113+
else:
1114+
request = ListJobsRequest(include_terminated=include_terminated)
1115+
response = self._job_service.ListJobs(request)
1116+
return [
1117+
get_remote_job_from_proto(
1118+
self._job_service, self._extra_grpc_params, job
1119+
)
1120+
for job in response.jobs
1121+
]
10741122

10751123
def get_job_by_id(self, job_id: str) -> SparkJob:
1076-
return get_job_by_id(job_id, self)
1124+
if not self._use_job_service:
1125+
return get_job_by_id(job_id, self)
1126+
else:
1127+
request = GetJobRequest(job_id=job_id)
1128+
response = self._job_service.GetJob(request)
1129+
return get_remote_job_from_proto(
1130+
self._job_service, self._extra_grpc_params, response.job
1131+
)
10771132

10781133
def stage_dataframe(
10791134
self, df: pd.DataFrame, event_timestamp_column: str,

sdk/python/feast/constants.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ class AuthProvider(Enum):
9393
CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path"
9494
CONFIG_SPARK_EMR_LOG_LOCATION = "emr_log_location"
9595

96-
CONFIG_SPARK_EXTRA_OPTIONS = "spark_extra_options"
97-
9896
# Configuration option default values
9997
FEAST_DEFAULT_OPTIONS = {
10098
# Default Feast project to use
@@ -115,7 +113,11 @@ class AuthProvider(Enum):
115113
CONFIG_SERVING_ENABLE_SSL_KEY: "False",
116114
# Path to certificate(s) to secure connection to Feast Serving
117115
CONFIG_SERVING_SERVER_SSL_CERT_KEY: "",
118-
# Default connection timeout to Feast Serving and Feast Core (in seconds)
116+
# Enable or disable TLS/SSL to Feast Job Service
117+
CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False",
118+
# Path to certificate(s) to secure connection to Feast Job Service
119+
CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "",
120+
# Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds)
119121
CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "10",
120122
# Default gRPC connection timeout when sending an ApplyFeatureSet command to
121123
# Feast Core (in seconds)
@@ -128,13 +130,9 @@ class AuthProvider(Enum):
128130
CONFIG_AUTH_PROVIDER: "google",
129131
CONFIG_SPARK_LAUNCHER: "dataproc",
130132
CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/spark/ingestion/feast-ingestion-spark-develop.jar",
133+
CONFIG_SPARK_STANDALONE_MASTER: "local[*]",
131134
CONFIG_REDIS_HOST: "localhost",
132135
CONFIG_REDIS_PORT: "6379",
133136
CONFIG_REDIS_SSL: "False",
134137
CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT: "parquet",
135-
CONFIG_SPARK_EXTRA_OPTIONS: "",
136-
# Enable or disable TLS/SSL to Feast Service
137-
CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False",
138-
# Path to certificate(s) to secure connection to Feast Job Service
139-
CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "",
140138
}

sdk/python/feast/job_service.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,13 @@
1212
GetJobResponse,
1313
)
1414
from feast.core.JobService_pb2 import Job as JobProto
15-
from feast.core.JobService_pb2 import JobStatus, JobType, ListJobsResponse
15+
from feast.core.JobService_pb2 import (
16+
JobStatus,
17+
JobType,
18+
ListJobsResponse,
19+
StartOfflineToOnlineIngestionJobResponse,
20+
StartStreamToOnlineIngestionJobResponse,
21+
)
1622
from feast.data_source import DataSource
1723
from feast.pyspark.abc import (
1824
BatchIngestionJob,
@@ -21,7 +27,10 @@
2127
SparkJobStatus,
2228
StreamIngestionJob,
2329
)
24-
from feast.pyspark.launcher import start_historical_feature_retrieval_job
30+
from feast.pyspark.launcher import (
31+
start_historical_feature_retrieval_job,
32+
start_stream_to_online_ingestion,
33+
)
2534
from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc
2635
from feast.third_party.grpc.health.v1.HealthService_pb2 import (
2736
HealthCheckResponse,
@@ -62,9 +71,15 @@ def _job_to_proto(self, spark_job: SparkJob) -> JobProto:
6271

6372
def StartOfflineToOnlineIngestionJob(self, request, context):
6473
"""Start job to ingest data from offline store into online store"""
65-
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
66-
context.set_details("Method not implemented!")
67-
raise NotImplementedError("Method not implemented!")
74+
feature_table = self.client.get_feature_table(
75+
request.table_name, request.project
76+
)
77+
job = self.client.start_offline_to_online_ingestion(
78+
feature_table,
79+
request.start_date.ToDatetime(),
80+
request.end_date.ToDatetime(),
81+
)
82+
return StartOfflineToOnlineIngestionJobResponse(id=job.get_id())
6883

6984
def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
7085
"""Produce a training dataset, return a job id that will provide a file reference"""
@@ -86,9 +101,13 @@ def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
86101

87102
def StartStreamToOnlineIngestionJob(self, request, context):
88103
"""Start job to ingest data from stream into online store"""
89-
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
90-
context.set_details("Method not implemented!")
91-
raise NotImplementedError("Method not implemented!")
104+
105+
feature_table = self.client.get_feature_table(
106+
request.table_name, request.project
107+
)
108+
# TODO: add extra_jars to request
109+
job = start_stream_to_online_ingestion(feature_table, [], self.client)
110+
return StartStreamToOnlineIngestionJobResponse(id=job.get_id())
92111

93112
def ListJobs(self, request, context):
94113
"""List all types of jobs"""

sdk/python/feast/pyspark/abc.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,6 @@ def get_arguments(self) -> List[str]:
113113
"""
114114
raise NotImplementedError
115115

116-
@abc.abstractmethod
117-
def get_extra_options(self) -> str:
118-
"""
119-
Spark job dependencies (expected to resolved from maven)
120-
Returns:
121-
str: Spark job dependencies.
122-
"""
123-
raise NotImplementedError
124-
125116

126117
class RetrievalJobParameters(SparkJobParameters):
127118
def __init__(
@@ -130,7 +121,6 @@ def __init__(
130121
feature_tables_sources: List[Dict],
131122
entity_source: Dict,
132123
destination: Dict,
133-
extra_options: str = "",
134124
):
135125
"""
136126
Args:
@@ -242,7 +232,6 @@ def __init__(
242232
self._feature_tables_sources = feature_tables_sources
243233
self._entity_source = entity_source
244234
self._destination = destination
245-
self._extra_options = extra_options
246235

247236
def get_name(self) -> str:
248237
all_feature_tables_names = [ft["name"] for ft in self._feature_tables]
@@ -271,9 +260,6 @@ def get_arguments(self) -> List[str]:
271260
def get_destination_path(self) -> str:
272261
return self._destination["path"]
273262

274-
def get_extra_options(self) -> str:
275-
return self._extra_options
276-
277263

278264
class RetrievalJob(SparkJob):
279265
"""
@@ -315,7 +301,6 @@ def __init__(
315301
redis_host: str,
316302
redis_port: int,
317303
redis_ssl: bool,
318-
extra_options: str = "",
319304
):
320305
self._feature_table = feature_table
321306
self._source = source
@@ -325,7 +310,6 @@ def __init__(
325310
self._redis_host = redis_host
326311
self._redis_port = redis_port
327312
self._redis_ssl = redis_ssl
328-
self._extra_options = extra_options
329313

330314
def get_name(self) -> str:
331315
return (
@@ -364,9 +348,6 @@ def get_arguments(self) -> List[str]:
364348
json.dumps(self._get_redis_config()),
365349
]
366350

367-
def get_extra_options(self) -> str:
368-
return self._extra_options
369-
370351

371352
class StreamIngestionJobParameters(SparkJobParameters):
372353
def __init__(
@@ -378,7 +359,6 @@ def __init__(
378359
redis_host: str,
379360
redis_port: int,
380361
redis_ssl: bool,
381-
extra_options="",
382362
):
383363
self._feature_table = feature_table
384364
self._source = source
@@ -387,7 +367,6 @@ def __init__(
387367
self._redis_host = redis_host
388368
self._redis_port = redis_port
389369
self._redis_ssl = redis_ssl
390-
self._extra_options = extra_options
391370

392371
def get_name(self) -> str:
393372
return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"
@@ -422,9 +401,6 @@ def get_arguments(self) -> List[str]:
422401
json.dumps(self._get_redis_config()),
423402
]
424403

425-
def get_extra_options(self) -> str:
426-
return self._extra_options
427-
428404

429405
class BatchIngestionJob(SparkJob):
430406
"""

sdk/python/feast/pyspark/launcher.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH,
1717
CONFIG_SPARK_EMR_LOG_LOCATION,
1818
CONFIG_SPARK_EMR_REGION,
19-
CONFIG_SPARK_EXTRA_OPTIONS,
2019
CONFIG_SPARK_HOME,
2120
CONFIG_SPARK_INGESTION_JOB_JAR,
2221
CONFIG_SPARK_LAUNCHER,
@@ -192,7 +191,6 @@ def start_historical_feature_retrieval_job(
192191
for feature_table in feature_tables
193192
],
194193
destination={"format": output_format, "path": output_path},
195-
extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
196194
)
197195
)
198196

@@ -256,7 +254,6 @@ def start_offline_to_online_ingestion(
256254
redis_host=client._config.get(CONFIG_REDIS_HOST),
257255
redis_port=client._config.getint(CONFIG_REDIS_PORT),
258256
redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
259-
extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
260257
)
261258
)
262259

@@ -277,7 +274,6 @@ def start_stream_to_online_ingestion(
277274
redis_host=client._config.get(CONFIG_REDIS_HOST),
278275
redis_port=client._config.getint(CONFIG_REDIS_PORT),
279276
redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
280-
extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
281277
)
282278
)
283279

0 commit comments

Comments
 (0)