File tree Expand file tree Collapse file tree 3 files changed +8
-0
lines changed
python/feast_spark/pyspark Expand file tree Collapse file tree 3 files changed +8
-0
lines changed Original file line number Diff line number Diff line change @@ -146,6 +146,7 @@ def get_arguments(self) -> List[str]:
146146class RetrievalJobParameters (SparkJobParameters ):
147147 def __init__ (
148148 self ,
149+ project : str ,
149150 feature_tables : List [Dict ],
150151 feature_tables_sources : List [Dict ],
151152 entity_source : Dict ,
@@ -155,6 +156,7 @@ def __init__(
155156 ):
156157 """
157158 Args:
159+ project (str): Client project
158160 entity_source (Dict): Entity data source configuration.
159161 feature_tables_sources (List[Dict]): List of feature tables data sources configurations.
160162 feature_tables (List[Dict]): List of feature table specification.
@@ -261,13 +263,17 @@ def __init__(
261263 }
262264
263265 """
266+ self ._project = project
264267 self ._feature_tables = feature_tables
265268 self ._feature_tables_sources = feature_tables_sources
266269 self ._entity_source = entity_source
267270 self ._destination = destination
268271 self ._extra_packages = extra_packages if extra_packages else []
269272 self ._checkpoint_path = checkpoint_path
270273
274+ def get_project (self ) -> str :
275+ return self ._project
276+
271277 def get_name (self ) -> str :
272278 all_feature_tables_names = [ft ["name" ] for ft in self ._feature_tables ]
273279 return f"{ self .get_job_type ().to_pascal_case ()} -{ '-' .join (all_feature_tables_names )} "
Original file line number Diff line number Diff line change @@ -233,6 +233,7 @@ def start_historical_feature_retrieval_job(
233233
234234 return launcher .historical_feature_retrieval (
235235 RetrievalJobParameters (
236+ project = project ,
236237 entity_source = _source_to_argument (entity_source , client .config ),
237238 feature_tables_sources = feature_sources ,
238239 feature_tables = [
Original file line number Diff line number Diff line change @@ -314,6 +314,7 @@ def historical_feature_retrieval(
314314 azure_credentials = self ._get_azure_credentials (),
315315 arguments = job_params .get_arguments (),
316316 namespace = self ._namespace ,
317+ extra_labels = {LABEL_PROJECT : job_params .get_project ()},
317318 )
318319
319320 job_info = _submit_job (
You can’t perform that action at this time.
0 commit comments