5151)
5252
5353
54- def _load_resource_template (job_template_path : Path ) -> Dict [str , Any ]:
54+ def _load_resource_template (job_template_path : Optional [str ]) -> Dict [str , Any ]:
55+ if not job_template_path or not Path (job_template_path ).exists ():
56+ return {}
57+
5558 with open (job_template_path , "rt" ) as f :
5659 return yaml .safe_load (f )
5760
@@ -189,7 +192,10 @@ def __init__(
189192 namespace : str ,
190193 incluster : bool ,
191194 staging_location : str ,
192- resource_template_path : Optional [Path ],
195+ generic_resource_template_path : Optional [str ],
196+ batch_ingestion_resource_template_path : Optional [str ],
197+ stream_ingestion_resource_template_path : Optional [str ],
198+ historical_retrieval_resource_template_path : Optional [str ],
193199 staging_client : AbstractStagingClient ,
194200 azure_account_name : str ,
195201 azure_account_key : str ,
@@ -200,10 +206,26 @@ def __init__(
200206 self ._staging_client = staging_client
201207 self ._azure_account_name = azure_account_name
202208 self ._azure_account_key = azure_account_key
203- if resource_template_path is not None :
204- self ._resource_template = _load_resource_template (resource_template_path )
205- else :
206- self ._resource_template = yaml .safe_load (DEFAULT_JOB_TEMPLATE )
209+
210+ generic_template = _load_resource_template (
211+ generic_resource_template_path
212+ ) or yaml .safe_load (DEFAULT_JOB_TEMPLATE )
213+
214+ self ._batch_ingestion_template = (
215+ _load_resource_template (batch_ingestion_resource_template_path )
216+ or generic_template
217+ )
218+
219+ self ._stream_ingestion_template = (
220+ _load_resource_template (stream_ingestion_resource_template_path )
221+ or generic_template
222+ )
223+
224+ self ._historical_retrieval_template = (
225+ _load_resource_template (historical_retrieval_resource_template_path )
226+ or generic_template
227+ )
228+
207229 self ._scheduled_resource_template = yaml .safe_load (
208230 DEFAULT_SCHEDULED_JOB_TEMPLATE
209231 )
@@ -281,7 +303,7 @@ def historical_feature_retrieval(
281303 job_id = _generate_job_id ()
282304
283305 resource = _prepare_job_resource (
284- job_template = self ._resource_template ,
306+ job_template = self ._historical_retrieval_template ,
285307 job_id = job_id ,
286308 job_type = HISTORICAL_RETRIEVAL_JOB_TYPE ,
287309 main_application_file = pyspark_script_path ,
@@ -341,7 +363,7 @@ def offline_to_online_ingestion(
341363 job_id = _generate_job_id ()
342364
343365 resource = _prepare_job_resource (
344- job_template = self ._resource_template ,
366+ job_template = self ._batch_ingestion_template ,
345367 job_id = job_id ,
346368 job_type = OFFLINE_TO_ONLINE_JOB_TYPE ,
347369 main_application_file = jar_s3_path ,
@@ -394,7 +416,7 @@ def schedule_offline_to_online_ingestion(
394416 scheduled_job_template = self ._scheduled_resource_template ,
395417 scheduled_job_id = schedule_job_id ,
396418 job_schedule = ingestion_job_params .get_job_schedule (),
397- job_template = self ._resource_template ,
419+ job_template = self ._batch_ingestion_template ,
398420 job_type = OFFLINE_TO_ONLINE_JOB_TYPE ,
399421 main_application_file = jar_s3_path ,
400422 main_class = ingestion_job_params .get_class_name (),
@@ -454,7 +476,7 @@ def start_stream_to_online_ingestion(
454476 job_id = _generate_job_id ()
455477
456478 resource = _prepare_job_resource (
457- job_template = self ._resource_template ,
479+ job_template = self ._stream_ingestion_template ,
458480 job_id = job_id ,
459481 job_type = STREAM_TO_ONLINE_JOB_TYPE ,
460482 main_application_file = jar_s3_path ,
0 commit comments