|
9 | 9 | import yaml |
10 | 10 | from kubernetes.client.api import CustomObjectsApi |
11 | 11 |
|
| 12 | +from feast.config import Config |
| 13 | +from feast.constants import ConfigOptions as opt |
12 | 14 | from feast.pyspark.abc import ( |
13 | 15 | BQ_SPARK_PACKAGE, |
14 | 16 | BatchIngestionJob, |
@@ -140,16 +142,13 @@ class KubernetesJobLauncher(JobLauncher): |
140 | 142 | Submits spark jobs to a spark cluster. Currently supports only historical feature retrieval jobs. |
141 | 143 | """ |
142 | 144 |
|
143 | | - def __init__( |
144 | | - self, |
145 | | - namespace: str, |
146 | | - incluster: bool, |
147 | | - staging_location: str, |
148 | | - resource_template_path: Optional[Path], |
149 | | - ): |
150 | | - self._namespace = namespace |
| 145 | + def __init__(self, config: Config): |
| 146 | + self._config = config |
| 147 | + self._namespace = config.get(opt.SPARK_K8S_NAMESPACE) |
| 148 | + incluster = config.getboolean(opt.SPARK_K8S_USE_INCLUSTER_CONFIG) |
151 | 149 | self._api = _get_api(incluster=incluster) |
152 | | - self._staging_location = staging_location |
| 150 | + self._staging_location = config.get(opt.SPARK_STAGING_LOCATION) |
| 151 | + resource_template_path = config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH, None) |
153 | 152 | if resource_template_path is not None: |
154 | 153 | self._resource_template = _load_resource_template(resource_template_path) |
155 | 154 | else: |
@@ -183,7 +182,7 @@ def _job_from_job_info(self, job_info: JobInfo) -> SparkJob: |
183 | 182 |
|
184 | 183 | def _get_staging_client(self): |
185 | 184 | uri = urlparse(self._staging_location) |
186 | | - return get_staging_client(uri.scheme) |
| 185 | + return get_staging_client(uri.scheme, self._config) |
187 | 186 |
|
188 | 187 | def historical_feature_retrieval( |
189 | 188 | self, job_params: RetrievalJobParameters |
|
0 commit comments