Skip to content

Commit 92800d9

Browse files
authored
Add spark k8s operator launcher (#1225)
* support spark k8s operator launcher Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com> * make bq package path constant Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com> * address more comments Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
1 parent d1b1aeb commit 92800d9

File tree

13 files changed

+725
-33
lines changed

13 files changed

+725
-33
lines changed

sdk/python/feast/constants.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ class ConfigOptions(metaclass=ConfigMeta):
180180
#: Amount of executor memory for the Dataproc cluster
181181
DATAPROC_EXECUTOR_MEMORY = "2g"
182182

183+
#: Kubernetes namespace in which Spark jobs are launched via the k8s spark operator
184+
SPARK_K8S_NAMESPACE = "default"
185+
186+
#: Whether the k8s spark operator is expected to run in the same cluster as Feast (use in-cluster config)
187+
SPARK_K8S_USE_INCLUSTER_CONFIG = True
188+
189+
#: Path to a SparkApplication resource template file used when submitting jobs
190+
SPARK_K8S_JOB_TEMPLATE_PATH = None
191+
183192
#: File format of historical retrieval features
184193
HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"
185194

sdk/python/feast/pyspark/abc.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import hashlib
33
import json
44
import os
5+
from base64 import b64encode
56
from datetime import datetime
67
from enum import Enum
78
from typing import Dict, List, Optional
@@ -15,6 +16,9 @@ class SparkJobFailure(Exception):
1516
pass
1617

1718

19+
BQ_SPARK_PACKAGE = "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0"
20+
21+
1822
class SparkJobStatus(Enum):
1923
STARTING = 0
2024
IN_PROGRESS = 1
@@ -243,15 +247,18 @@ def get_main_file_path(self) -> str:
243247
)
244248

245249
def get_arguments(self) -> List[str]:
250+
def json_b64_encode(obj) -> str:
251+
return b64encode(json.dumps(obj).encode("utf8")).decode("ascii")
252+
246253
return [
247254
"--feature-tables",
248-
json.dumps(self._feature_tables),
255+
json_b64_encode(self._feature_tables),
249256
"--feature-tables-sources",
250-
json.dumps(self._feature_tables_sources),
257+
json_b64_encode(self._feature_tables_sources),
251258
"--entity-source",
252-
json.dumps(self._entity_source),
259+
json_b64_encode(self._entity_source),
253260
"--destination",
254-
json.dumps(self._destination),
261+
json_b64_encode(self._destination),
255262
]
256263

257264
def get_destination_path(self) -> str:

sdk/python/feast/pyspark/historical_feature_retrieval_job.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import abc
22
import argparse
33
import json
4+
from base64 import b64decode
45
from datetime import timedelta
56
from typing import Any, Dict, List, NamedTuple, Optional
67

@@ -794,13 +795,17 @@ def _feature_table_from_dict(dct: Dict[str, Any]) -> FeatureTable:
794795
)
795796

796797

798+
def json_b64_decode(s: str) -> Any:
799+
return json.loads(b64decode(s.encode("ascii")))
800+
801+
797802
if __name__ == "__main__":
798803
spark = SparkSession.builder.getOrCreate()
799804
args = _get_args()
800-
feature_tables_conf = json.loads(args.feature_tables)
801-
feature_tables_sources_conf = json.loads(args.feature_tables_sources)
802-
entity_source_conf = json.loads(args.entity_source)
803-
destination_conf = json.loads(args.destination)
805+
feature_tables_conf = json_b64_decode(args.feature_tables)
806+
feature_tables_sources_conf = json_b64_decode(args.feature_tables_sources)
807+
entity_source_conf = json_b64_decode(args.entity_source)
808+
destination_conf = json_b64_decode(args.destination)
804809
start_job(
805810
spark,
806811
entity_source_conf,

sdk/python/feast/pyspark/launcher.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,22 @@ def _get_optional(option):
6565
)
6666

6767

68+
def _k8s_launcher(config: Config) -> JobLauncher:
69+
from feast.pyspark.launchers import k8s
70+
71+
return k8s.KubernetesJobLauncher(
72+
namespace=config.get(opt.SPARK_K8S_NAMESPACE),
73+
resource_template_path=config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH, None),
74+
staging_location=config.get(opt.SPARK_STAGING_LOCATION),
75+
incluster=config.getboolean(opt.SPARK_K8S_USE_INCLUSTER_CONFIG),
76+
)
77+
78+
6879
_launchers = {
6980
"standalone": _standalone_launcher,
7081
"dataproc": _dataproc_launcher,
7182
"emr": _emr_launcher,
83+
"k8s": _k8s_launcher,
7284
}
7385

7486

sdk/python/feast/pyspark/launchers/aws/emr_utils.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
import yaml
1010

11+
from feast.pyspark.abc import BQ_SPARK_PACKAGE
12+
1113
__all__ = [
1214
"FAILED_STEP_STATES",
1315
"HISTORICAL_RETRIEVAL_JOB_TYPE",
@@ -107,7 +109,7 @@ def _sync_offline_to_online_step(
107109
"--class",
108110
"feast.ingestion.IngestionJob",
109111
"--packages",
110-
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0",
112+
BQ_SPARK_PACKAGE,
111113
jar_path,
112114
]
113115
+ args,
@@ -330,11 +332,7 @@ def _stream_ingestion_step(
330332
],
331333
"Args": ["spark-submit", "--class", "feast.ingestion.IngestionJob"]
332334
+ jars_args
333-
+ [
334-
"--packages",
335-
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0",
336-
jar_path,
337-
]
335+
+ ["--packages", BQ_SPARK_PACKAGE, jar_path]
338336
+ args,
339337
"Jar": "command-runner.jar",
340338
},

sdk/python/feast/pyspark/launchers/gcloud/dataproc.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import os
32
import time
43
import uuid
@@ -265,7 +264,7 @@ def _stage_file(self, file_path: str, job_id: str) -> str:
265264
return blob_uri_str
266265

267266
def dataproc_submit(
268-
self, job_params: SparkJobParameters
267+
self, job_params: SparkJobParameters, extra_properties: Dict[str, str]
269268
) -> Tuple[Job, Callable[[], Job], Callable[[], None]]:
270269
local_job_id = str(uuid.uuid4())
271270
main_file_uri = self._stage_file(job_params.get_main_file_path(), local_job_id)
@@ -280,18 +279,22 @@ def dataproc_submit(
280279
job_config["labels"][self.JOB_HASH_LABEL_KEY] = job_params.get_job_hash()
281280

282281
if job_params.get_class_name():
282+
properties = {
283+
"spark.yarn.user.classpath.first": "true",
284+
"spark.executor.instances": self.executor_instances,
285+
"spark.executor.cores": self.executor_cores,
286+
"spark.executor.memory": self.executor_memory,
287+
}
288+
289+
properties.update(extra_properties)
290+
283291
job_config.update(
284292
{
285293
"spark_job": {
286294
"jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS,
287295
"main_class": job_params.get_class_name(),
288296
"args": job_params.get_arguments(),
289-
"properties": {
290-
"spark.yarn.user.classpath.first": "true",
291-
"spark.executor.instances": self.executor_instances,
292-
"spark.executor.cores": self.executor_cores,
293-
"spark.executor.memory": self.executor_memory,
294-
},
297+
"properties": properties,
295298
}
296299
}
297300
)
@@ -302,6 +305,7 @@ def dataproc_submit(
302305
"main_python_file_uri": main_file_uri,
303306
"jar_file_uris": self.EXTERNAL_JARS,
304307
"args": job_params.get_arguments(),
308+
"properties": extra_properties if extra_properties else {},
305309
}
306310
}
307311
)
@@ -332,21 +336,23 @@ def dataproc_cancel(self, job_id):
332336
def historical_feature_retrieval(
333337
self, job_params: RetrievalJobParameters
334338
) -> RetrievalJob:
335-
job, refresh_fn, cancel_fn = self.dataproc_submit(job_params)
339+
job, refresh_fn, cancel_fn = self.dataproc_submit(
340+
job_params, {"dev.feast.outputuri": job_params.get_destination_path()}
341+
)
336342
return DataprocRetrievalJob(
337343
job, refresh_fn, cancel_fn, job_params.get_destination_path()
338344
)
339345

340346
def offline_to_online_ingestion(
341347
self, ingestion_job_params: BatchIngestionJobParameters
342348
) -> BatchIngestionJob:
343-
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params)
349+
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {})
344350
return DataprocBatchIngestionJob(job, refresh_fn, cancel_fn)
345351

346352
def start_stream_to_online_ingestion(
347353
self, ingestion_job_params: StreamIngestionJobParameters
348354
) -> StreamIngestionJob:
349-
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params)
355+
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {})
350356
job_hash = ingestion_job_params.get_job_hash()
351357
return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash)
352358

@@ -368,7 +374,7 @@ def _dataproc_job_to_spark_job(self, job: Job) -> SparkJob:
368374
cancel_fn = partial(self.dataproc_cancel, job_id)
369375

370376
if job_type == SparkJobType.HISTORICAL_RETRIEVAL.name.lower():
371-
output_path = json.loads(job.pyspark_job.args[-1])["path"]
377+
output_path = job.pyspark_job.properties.get("dev.feast.outputuri")
372378
return DataprocRetrievalJob(job, refresh_fn, cancel_fn, output_path)
373379

374380
if job_type == SparkJobType.BATCH_INGESTION.name.lower():
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from .k8s import (
2+
KubernetesBatchIngestionJob,
3+
KubernetesJobLauncher,
4+
KubernetesRetrievalJob,
5+
KubernetesStreamIngestionJob,
6+
)
7+
8+
__all__ = [
9+
"KubernetesRetrievalJob",
10+
"KubernetesBatchIngestionJob",
11+
"KubernetesStreamIngestionJob",
12+
"KubernetesJobLauncher",
13+
]

0 commit comments

Comments
 (0)