Skip to content

Commit 00f8a47

Browse files
committed
Add specialized job templates for Spark Kubernetes jobs
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent e55a145 commit 00f8a47

File tree

6 files changed

+67
-11
lines changed

6 files changed

+67
-11
lines changed

infra/charts/feast-spark/charts/feast-jobservice/templates/configmap.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,10 @@ metadata:
1313
data:
1414
jobTemplate.yaml: |
1515
{{- toYaml .Values.sparkOperator.jobTemplate | nindent 4 }}
16+
batchJobTemplate.yaml: |
17+
{{- toYaml .Values.sparkOperator.batchJobTemplate | nindent 4 }}
18+
streamJobTemplate.yaml: |
19+
{{- toYaml .Values.sparkOperator.streamJobTemplate | nindent 4 }}
20+
historicalJobTemplate.yaml: |
21+
{{- toYaml .Values.sparkOperator.historicalJobTemplate | nindent 4 }}
1622
{{- end }}

infra/charts/feast-spark/charts/feast-jobservice/templates/deployment.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ spec:
7676
{{- if .Values.sparkOperator.enabled }}
7777
- name: FEAST_SPARK_K8S_JOB_TEMPLATE_PATH
7878
value: /etc/configs/jobTemplate.yaml
79+
- name: SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH
80+
value: /etc/configs/batchJobTemplate.yaml
81+
- name: SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH
82+
value: /etc/configs/streamJobTemplate.yaml
83+
- name: SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH
84+
value: /etc/configs/historicalJobTemplate.yaml
7985
{{- end }}
8086
{{- range $key, $value := .Values.envOverrides }}
8187
- name: {{ printf "%s" $key | replace "." "_" | upper | quote }}

infra/charts/feast-spark/charts/feast-jobservice/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ sparkOperator:
2626
enabled: false
2727
# sparkOperator.jobTemplate -- Content of the job template, in yaml format
2828
jobTemplate: {}
29+
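# Specialized job templates per job type (batch ingestion, stream ingestion, historical retrieval), in yaml format; a template left empty falls back to the generic jobTemplate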
30+
batchJobTemplate: {}
31+
streamJobTemplate: {}
32+
historicalJobTemplate: {}
2933

3034
prometheus:
3135
# prometheus.enabled -- Flag to enable scraping of metrics

python/feast_spark/constants.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,15 @@ class ConfigOptions(metaclass=ConfigMeta):
9393
# SparkApplication resource template
9494
SPARK_K8S_JOB_TEMPLATE_PATH = None
9595

96+
# Path to the SparkApplication resource template for Batch Ingestion Jobs (the generic SPARK_K8S_JOB_TEMPLATE_PATH template is used when this is unset)
97+
SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH: Optional[str] = ""
98+
99+
# Path to the SparkApplication resource template for Stream Ingestion Jobs (the generic SPARK_K8S_JOB_TEMPLATE_PATH template is used when this is unset)
100+
SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH: Optional[str] = ""
101+
102+
# Path to the SparkApplication resource template for Historical Retrieval Jobs (the generic SPARK_K8S_JOB_TEMPLATE_PATH template is used when this is unset)
103+
SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH: Optional[str] = ""
104+
96105
#: File format of historical retrieval features
97106
HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"
98107

python/feast_spark/pyspark/launcher.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,16 @@ def _k8s_launcher(config: Config) -> JobLauncher:
7373

7474
return k8s.KubernetesJobLauncher(
7575
namespace=config.get(opt.SPARK_K8S_NAMESPACE),
76-
resource_template_path=config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH, None),
76+
generic_resource_template_path=config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH),
77+
batch_ingestion_resource_template_path=config.get(
78+
opt.SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH, None
79+
),
80+
stream_ingestion_resource_template_path=config.get(
81+
opt.SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH, None
82+
),
83+
historical_retrieval_resource_template_path=config.get(
84+
opt.SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH, None
85+
),
7786
staging_location=staging_location,
7887
incluster=config.getboolean(opt.SPARK_K8S_USE_INCLUSTER_CONFIG),
7988
staging_client=get_staging_client(staging_uri.scheme, config),

python/feast_spark/pyspark/launchers/k8s/k8s.py

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@
5151
)
5252

5353

54-
def _load_resource_template(job_template_path: Path) -> Dict[str, Any]:
54+
def _load_resource_template(job_template_path: Optional[str]) -> Dict[str, Any]:
55+
if not job_template_path or not Path(job_template_path).exists():
56+
return {}
57+
5558
with open(job_template_path, "rt") as f:
5659
return yaml.safe_load(f)
5760

@@ -189,7 +192,10 @@ def __init__(
189192
namespace: str,
190193
incluster: bool,
191194
staging_location: str,
192-
resource_template_path: Optional[Path],
195+
generic_resource_template_path: Optional[str],
196+
batch_ingestion_resource_template_path: Optional[str],
197+
stream_ingestion_resource_template_path: Optional[str],
198+
historical_retrieval_resource_template_path: Optional[str],
193199
staging_client: AbstractStagingClient,
194200
azure_account_name: str,
195201
azure_account_key: str,
@@ -200,10 +206,26 @@ def __init__(
200206
self._staging_client = staging_client
201207
self._azure_account_name = azure_account_name
202208
self._azure_account_key = azure_account_key
203-
if resource_template_path is not None:
204-
self._resource_template = _load_resource_template(resource_template_path)
205-
else:
206-
self._resource_template = yaml.safe_load(DEFAULT_JOB_TEMPLATE)
209+
210+
generic_template = _load_resource_template(
211+
generic_resource_template_path
212+
) or yaml.safe_load(DEFAULT_JOB_TEMPLATE)
213+
214+
self._batch_ingestion_template = (
215+
_load_resource_template(batch_ingestion_resource_template_path)
216+
or generic_template
217+
)
218+
219+
self._stream_ingestion_template = (
220+
_load_resource_template(stream_ingestion_resource_template_path)
221+
or generic_template
222+
)
223+
224+
self._historical_retrieval_template = (
225+
_load_resource_template(historical_retrieval_resource_template_path)
226+
or generic_template
227+
)
228+
207229
self._scheduled_resource_template = yaml.safe_load(
208230
DEFAULT_SCHEDULED_JOB_TEMPLATE
209231
)
@@ -281,7 +303,7 @@ def historical_feature_retrieval(
281303
job_id = _generate_job_id()
282304

283305
resource = _prepare_job_resource(
284-
job_template=self._resource_template,
306+
job_template=self._historical_retrieval_template,
285307
job_id=job_id,
286308
job_type=HISTORICAL_RETRIEVAL_JOB_TYPE,
287309
main_application_file=pyspark_script_path,
@@ -341,7 +363,7 @@ def offline_to_online_ingestion(
341363
job_id = _generate_job_id()
342364

343365
resource = _prepare_job_resource(
344-
job_template=self._resource_template,
366+
job_template=self._batch_ingestion_template,
345367
job_id=job_id,
346368
job_type=OFFLINE_TO_ONLINE_JOB_TYPE,
347369
main_application_file=jar_s3_path,
@@ -394,7 +416,7 @@ def schedule_offline_to_online_ingestion(
394416
scheduled_job_template=self._scheduled_resource_template,
395417
scheduled_job_id=schedule_job_id,
396418
job_schedule=ingestion_job_params.get_job_schedule(),
397-
job_template=self._resource_template,
419+
job_template=self._batch_ingestion_template,
398420
job_type=OFFLINE_TO_ONLINE_JOB_TYPE,
399421
main_application_file=jar_s3_path,
400422
main_class=ingestion_job_params.get_class_name(),
@@ -454,7 +476,7 @@ def start_stream_to_online_ingestion(
454476
job_id = _generate_job_id()
455477

456478
resource = _prepare_job_resource(
457-
job_template=self._resource_template,
479+
job_template=self._stream_ingestion_template,
458480
job_id=job_id,
459481
job_type=STREAM_TO_ONLINE_JOB_TYPE,
460482
main_application_file=jar_s3_path,

0 commit comments

Comments
 (0)