4141)
4242from feast_spark .constants import ConfigOptions as opt
4343from feast_spark .lock_manager import JobOperation , JobOperationLock
44- from feast_spark .metrics import (
45- job_schedule_count ,
46- job_submission_count ,
47- job_whitelist_failure_count ,
48- )
44+ from feast_spark .metrics import job_schedule_count , job_submission_count
4945from feast_spark .pyspark .abc import (
5046 BatchIngestionJob ,
5147 RetrievalJob ,
5248 SparkJob ,
5349 SparkJobStatus ,
54- SparkJobType ,
5550 StreamIngestionJob ,
5651)
5752from feast_spark .pyspark .launcher import (
@@ -114,7 +109,6 @@ def _job_to_proto(spark_job: SparkJob) -> JobProto:
114109class JobServiceServicer (JobService_pb2_grpc .JobServiceServicer ):
115110 def __init__ (self , client : Client ):
116111 self .client = client
117- self ._whitelisted_project_feature_table_pairs_cached : List [Tuple [str , str ]] = []
118112
119113 @property
120114 def _whitelisted_projects (self ) -> Optional [List [str ]]:
@@ -123,81 +117,20 @@ def _whitelisted_projects(self) -> Optional[List[str]]:
123117 return whitelisted_projects .split ("," )
124118 return None
125119
126- @property
127- def _whitelisted_project_feature_table_pairs (
128- self ,
129- ) -> Optional [List [Tuple [str , str ]]]:
130- if self ._whitelisted_project_feature_table_pairs_cached :
131- return self ._whitelisted_project_feature_table_pairs_cached
132-
133- if self .client .config .exists (opt .WHITELISTED_FEATURE_TABLES_PATH ):
134- _whitelisted_feature_tables = self .client .config .get (
135- opt .WHITELISTED_FEATURE_TABLES_PATH
136- )
137- with open (str (_whitelisted_feature_tables ), "r" ) as whitelist :
138- whitelist .seek (0 )
139- whitelisted_feature_tables = [
140- (line .strip ().split (":" )[0 ], line .strip ().split (":" )[- 1 ])
141- for line in whitelist .readlines ()
142- ]
143- self ._whitelisted_project_feature_table_pairs_cached = (
144- whitelisted_feature_tables
145- )
146- return whitelisted_feature_tables
147- return None
148-
149- @property
150- def _whitelisted_job_types (self ) -> Optional [List [str ]]:
151- if self .client .config .exists (opt .WHITELISTED_JOB_TYPES ):
152- whitelisted_job_types = self .client .config .get (opt .WHITELISTED_JOB_TYPES )
153- return whitelisted_job_types .split ("," )
154- return None
155-
156120 def is_whitelisted (self , project : str ):
157121 # Whitelisted projects not specified, allow all projects
158122 if not self ._whitelisted_projects :
159123 return True
160124 return project in self ._whitelisted_projects
161125
162- def is_feature_table_whitelisted (self , project : str , feature_table : str ):
163- if not self ._whitelisted_project_feature_table_pairs :
164- return True
165- return (project , feature_table ) in self ._whitelisted_project_feature_table_pairs
166-
167- def is_job_type_whitelisted (self , job_type : SparkJobType ):
168- if not self ._whitelisted_job_types :
169- return True
170- return job_type .name in self ._whitelisted_job_types
171-
172126 def StartOfflineToOnlineIngestionJob (
173127 self , request : StartOfflineToOnlineIngestionJobRequest , context
174128 ):
175129 """Start job to ingest data from offline store into online store"""
176- if not self .is_job_type_whitelisted (SparkJobType .BATCH_INGESTION ):
177- raise ValueError (
178- "This job service is not configured to accept batch ingestion"
179- )
180-
181130 job_submission_count .labels (
182131 "batch_ingestion" , request .project , request .table_name
183132 ).inc ()
184133
185- if not self .is_whitelisted (request .project ):
186- job_whitelist_failure_count .labels (
187- request .project , request .table_name
188- ).inc ()
189- raise ValueError (
190- f"Project { request .project } is not whitelisted. Please contact your Feast administrator to whitelist it."
191- )
192-
193- if not self .is_feature_table_whitelisted (request .project , request .table_name ):
194- job_whitelist_failure_count .labels (
195- request .project , request .table_name
196- ).inc ()
197- raise ValueError (
198- f"Project { request .project } :{ request .table_name } is not whitelisted. Please contact your Feast administrator to whitelist it."
199- )
200-
201134 feature_table = self .client .feature_store .get_feature_table (
202135 request .table_name , request .project
203136 )
@@ -223,11 +156,6 @@ def ScheduleOfflineToOnlineIngestionJob(
223156 self , request : ScheduleOfflineToOnlineIngestionJobRequest , context
224157 ):
225158 """Schedule job to ingest data from offline store into online store periodically"""
226- if not self .is_job_type_whitelisted (SparkJobType .SCHEDULED_BATCH_INGESTION ):
227- raise ValueError (
228- "This job service is not configured to schedule batch ingestion"
229- )
230-
231159 job_schedule_count .labels (request .project , request .table_name ).inc ()
232160 feature_table = self .client .feature_store .get_feature_table (
233161 request .table_name , request .project
@@ -245,10 +173,6 @@ def ScheduleOfflineToOnlineIngestionJob(
245173 def UnscheduleOfflineToOnlineIngestionJob (
246174 self , request : UnscheduleOfflineToOnlineIngestionJobRequest , context
247175 ):
248- if not self .is_job_type_whitelisted (SparkJobType .SCHEDULED_BATCH_INGESTION ):
249- raise ValueError (
250- "This job service is not configured to unschedule ingestion job"
251- )
252176 feature_table = self .client .feature_store .get_feature_table (
253177 request .table_name , request .project
254178 )
@@ -259,18 +183,8 @@ def UnscheduleOfflineToOnlineIngestionJob(
259183
260184 def GetHistoricalFeatures (self , request : GetHistoricalFeaturesRequest , context ):
261185 """Produce a training dataset, return a job id that will provide a file reference"""
262- if not self .is_job_type_whitelisted (SparkJobType .HISTORICAL_RETRIEVAL ):
263- raise ValueError (
264- "This job service is not configured to accept historical retrieval job"
265- )
266-
267186 job_submission_count .labels ("historical_retrieval" , request .project , "" ).inc ()
268187
269- if not self .is_whitelisted (request .project ):
270- raise ValueError (
271- f"Project { request .project } is not whitelisted. Please contact your Feast administrator to whitelist it."
272- )
273-
274188 job = start_historical_feature_retrieval_job (
275189 client = self .client ,
276190 project = request .project ,
@@ -297,11 +211,6 @@ def StartStreamToOnlineIngestionJob(
297211 self , request : StartStreamToOnlineIngestionJobRequest , context
298212 ):
299213 """Start job to ingest data from stream into online store"""
300- if not self .is_job_type_whitelisted (SparkJobType .STREAM_INGESTION ):
301- raise ValueError (
302- "This job service is not configured to start streaming job"
303- )
304-
305214 job_submission_count .labels (
306215 "streaming" , request .project , request .table_name
307216 ).inc ()
@@ -356,11 +265,6 @@ def StartStreamToOnlineIngestionJob(
356265 def ListJobs (self , request , context ):
357266 """List all types of jobs"""
358267
359- if not self .is_whitelisted (request .project ):
360- raise ValueError (
361- f"Project { request .project } is not whitelisted. Please contact your Feast administrator to whitelist it."
362- )
363-
364268 jobs = list_jobs (
365269 include_terminated = request .include_terminated ,
366270 project = request .project ,
0 commit comments