Skip to content

Commit 4d67fbb

Browse files
authored
Add dataproc executor resource config (#1160)
* Add dataproc executor resource config Signed-off-by: Terence <terencelimxp@gmail.com> * Add default spark job executor values Signed-off-by: Terence <terencelimxp@gmail.com> * Fix e2e tests Signed-off-by: Terence <terencelimxp@gmail.com> * Shift spark configurations Signed-off-by: Terence <terencelimxp@gmail.com> * Update constants and docstrings Signed-off-by: Terence <terencelimxp@gmail.com>
1 parent 78cbd13 commit 4d67fbb

File tree

7 files changed

+61
-7
lines changed

7 files changed

+61
-7
lines changed

sdk/python/feast/constants.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,15 @@ class ConfigOptions(metaclass=ConfigMeta):
160160
#: Region of Dataproc cluster
161161
DATAPROC_REGION: Optional[str] = None
162162

163+
#: No. of executor instances for Dataproc cluster
164+
DATAPROC_EXECUTOR_INSTANCES = "2"
165+
166+
#: No. of executor cores for Dataproc cluster
167+
DATAPROC_EXECUTOR_CORES = "2"
168+
169+
#: Amount of executor memory for Dataproc cluster
170+
DATAPROC_EXECUTOR_MEMORY = "2g"
171+
163172
#: File format of historical retrieval features
164173
HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"
165174

sdk/python/feast/pyspark/launcher.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,13 @@ def _dataproc_launcher(config: Config) -> JobLauncher:
3434
from feast.pyspark.launchers import gcloud
3535

3636
return gcloud.DataprocClusterLauncher(
37-
config.get(opt.DATAPROC_CLUSTER_NAME),
38-
config.get(opt.SPARK_STAGING_LOCATION),
39-
config.get(opt.DATAPROC_REGION),
40-
config.get(opt.DATAPROC_PROJECT),
37+
cluster_name=config.get(opt.DATAPROC_CLUSTER_NAME),
38+
staging_location=config.get(opt.SPARK_STAGING_LOCATION),
39+
region=config.get(opt.DATAPROC_REGION),
40+
project_id=config.get(opt.DATAPROC_PROJECT),
41+
executor_instances=config.get(opt.DATAPROC_EXECUTOR_INSTANCES),
42+
executor_cores=config.get(opt.DATAPROC_EXECUTOR_CORES),
43+
executor_memory=config.get(opt.DATAPROC_EXECUTOR_MEMORY),
4144
)
4245

4346

sdk/python/feast/pyspark/launchers/gcloud/dataproc.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,14 @@ class DataprocClusterLauncher(JobLauncher):
200200
JOB_HASH_LABEL_KEY = "feast_job_hash"
201201

202202
def __init__(
203-
self, cluster_name: str, staging_location: str, region: str, project_id: str,
203+
self,
204+
cluster_name: str,
205+
staging_location: str,
206+
region: str,
207+
project_id: str,
208+
executor_instances: str,
209+
executor_cores: str,
210+
executor_memory: str,
204211
):
205212
"""
206213
Initialize a dataproc job controller client, used internally for job submission and result
@@ -213,8 +220,14 @@ def __init__(
213220
GCS directory for the storage of files generated by the launcher, such as the pyspark scripts.
214221
region (str):
215222
Dataproc cluster region.
216-
project_id (str:
223+
project_id (str):
217224
GCP project id for the dataproc cluster.
225+
executor_instances (str):
226+
Number of executor instances for dataproc job.
227+
executor_cores (str):
228+
Number of cores for dataproc job.
229+
executor_memory (str):
230+
Amount of memory for dataproc job.
218231
"""
219232

220233
self.cluster_name = cluster_name
@@ -231,6 +244,9 @@ def __init__(
231244
self.job_client = JobControllerClient(
232245
client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
233246
)
247+
self.executor_instances = executor_instances
248+
self.executor_cores = executor_cores
249+
self.executor_memory = executor_memory
234250

235251
def _stage_file(self, file_path: str, job_id: str) -> str:
236252
if not os.path.isfile(file_path):
@@ -264,7 +280,12 @@ def dataproc_submit(
264280
"jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS,
265281
"main_class": job_params.get_class_name(),
266282
"args": job_params.get_arguments(),
267-
"properties": {"spark.yarn.user.classpath.first": "true"},
283+
"properties": {
284+
"spark.yarn.user.classpath.first": "true",
285+
"spark.executor.instances": self.executor_instances,
286+
"spark.executor.cores": self.executor_cores,
287+
"spark.executor.memory": self.executor_memory,
288+
},
268289
}
269290
}
270291
)

tests/e2e/conftest.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ def pytest_addoption(parser):
1717
parser.addoption("--emr-cluster-id", action="store")
1818
parser.addoption("--emr-region", action="store")
1919
parser.addoption("--dataproc-project", action="store")
20+
parser.addoption("--dataproc-executor-instances", action="store", default="2")
21+
parser.addoption("--dataproc-executor-cores", action="store", default="2")
22+
parser.addoption("--dataproc-executor-memory", action="store", default="2g")
2023
parser.addoption("--ingestion-jar", action="store")
2124
parser.addoption("--redis-url", action="store", default="localhost:6379")
2225
parser.addoption("--redis-cluster", action="store_true")

tests/e2e/fixtures/feast_services.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,15 @@ def feast_jobservice(
187187
)
188188
env["FEAST_DATAPROC_PROJECT"] = pytestconfig.getoption("dataproc_project")
189189
env["FEAST_DATAPROC_REGION"] = pytestconfig.getoption("dataproc_region")
190+
env["FEAST_DATAPROC_EXECUTOR_INSTANCES"] = pytestconfig.getoption(
191+
"dataproc_executor_instances"
192+
)
193+
env["FEAST_DATAPROC_EXECUTOR_CORES"] = pytestconfig.getoption(
194+
"dataproc_executor_cores"
195+
)
196+
env["FEAST_DATAPROC_EXECUTOR_MEMORY"] = pytestconfig.getoption(
197+
"dataproc_executor_memory"
198+
)
190199
env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join(
191200
global_staging_path, "dataproc"
192201
)

tests/integration/conftest.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,8 @@ def pytest_addoption(parser):
33
parser.addoption("--dataproc-region", action="store")
44
parser.addoption("--dataproc-project", action="store")
55
parser.addoption("--dataproc-staging-location", action="store")
6+
parser.addoption("--dataproc-executor-instances", action="store", default="2")
7+
parser.addoption("--dataproc-executor-cores", action="store", default="2")
8+
parser.addoption("--dataproc-executor-memory", action="store", default="2g")
69
parser.addoption("--redis-url", action="store")
710
parser.addoption("--redis-cluster", action="store_true")

tests/integration/fixtures/launchers.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,15 @@ def dataproc_launcher(pytestconfig) -> DataprocClusterLauncher:
99
region = pytestconfig.getoption("--dataproc-region")
1010
project_id = pytestconfig.getoption("--dataproc-project")
1111
staging_location = pytestconfig.getoption("--dataproc-staging-location")
12+
executor_instances = pytestconfig.getoption("dataproc_executor_instances")
13+
executor_cores = pytestconfig.getoption("dataproc_executor_cores")
14+
executor_memory = pytestconfig.getoption("dataproc_executor_memory")
1215
return DataprocClusterLauncher(
1316
cluster_name=cluster_name,
1417
staging_location=staging_location,
1518
region=region,
1619
project_id=project_id,
20+
executor_instances=executor_instances,
21+
executor_cores=executor_cores,
22+
executor_memory=executor_memory,
1723
)

0 commit comments

Comments (0)