@@ -200,7 +200,14 @@ class DataprocClusterLauncher(JobLauncher):
200200 JOB_HASH_LABEL_KEY = "feast_job_hash"
201201
202202 def __init__ (
203- self , cluster_name : str , staging_location : str , region : str , project_id : str ,
203+ self ,
204+ cluster_name : str ,
205+ staging_location : str ,
206+ region : str ,
207+ project_id : str ,
208+ executor_instances : str ,
209+ executor_cores : str ,
210+ executor_memory : str ,
204211 ):
205212 """
206213 Initialize a dataproc job controller client, used internally for job submission and result
@@ -213,8 +220,14 @@ def __init__(
213220 GCS directory for the storage of files generated by the launcher, such as the pyspark scripts.
214221 region (str):
215222 Dataproc cluster region.
216- project_id (str:
223+ project_id (str) :
217224 GCP project id for the dataproc cluster.
225+ executor_instances (str):
226+ Number of executor instances for dataproc job.
227+ executor_cores (str):
228+ Number of cores per executor for dataproc job.
229+ executor_memory (str):
230+ Amount of memory per executor for dataproc job.
218231 """
219232
220233 self .cluster_name = cluster_name
@@ -231,6 +244,9 @@ def __init__(
231244 self .job_client = JobControllerClient (
232245 client_options = {"api_endpoint" : f"{ region } -dataproc.googleapis.com:443" }
233246 )
247+ self .executor_instances = executor_instances
248+ self .executor_cores = executor_cores
249+ self .executor_memory = executor_memory
234250
235251 def _stage_file (self , file_path : str , job_id : str ) -> str :
236252 if not os .path .isfile (file_path ):
@@ -264,7 +280,12 @@ def dataproc_submit(
264280 "jar_file_uris" : [main_file_uri ] + self .EXTERNAL_JARS ,
265281 "main_class" : job_params .get_class_name (),
266282 "args" : job_params .get_arguments (),
267- "properties" : {"spark.yarn.user.classpath.first" : "true" },
283+ "properties" : {
284+ "spark.yarn.user.classpath.first" : "true" ,
285+ "spark.executor.instances" : self .executor_instances ,
286+ "spark.executor.cores" : self .executor_cores ,
287+ "spark.executor.memory" : self .executor_memory ,
288+ },
268289 }
269290 }
270291 )
0 commit comments