diff --git a/python/feast_spark/job_service.py b/python/feast_spark/job_service.py index b181a909..3ba6ba4c 100644 --- a/python/feast_spark/job_service.py +++ b/python/feast_spark/job_service.py @@ -60,6 +60,7 @@ start_stream_to_online_ingestion, unschedule_offline_to_online_ingestion, ) +from feast_spark.pyspark.launchers.k8s.k8s import JobNotFoundException from feast_spark.third_party.grpc.health.v1.HealthService_pb2 import ( HealthCheckResponse, ServingStatus, @@ -437,9 +438,16 @@ def ensure_stream_ingestion_jobs(client: Client, all_projects: bool): opt.JOB_SERVICE_RETRY_FAILED_JOBS ) ): + status = None + try: + status = job.get_status() + except JobNotFoundException: + logger.warning(f"{job.get_id()} was already removed") + if ( isinstance(job, StreamIngestionJob) - and job.get_status() != SparkJobStatus.COMPLETED + and status is not None + and status != SparkJobStatus.COMPLETED ): jobs_by_hash[job.get_hash()] = job diff --git a/python/feast_spark/pyspark/launchers/k8s/k8s.py b/python/feast_spark/pyspark/launchers/k8s/k8s.py index dbbd6638..2488992f 100644 --- a/python/feast_spark/pyspark/launchers/k8s/k8s.py +++ b/python/feast_spark/pyspark/launchers/k8s/k8s.py @@ -74,6 +74,10 @@ def _truncate_label(label: str) -> str: return label[:63] +class JobNotFoundException(Exception): + pass + + class KubernetesJobMixin: def __init__(self, api: CustomObjectsApi, namespace: str, job_id: str): self._api = api @@ -85,17 +89,20 @@ def get_id(self) -> str: def get_error_message(self) -> str: job = _get_job_by_id(self._api, self._namespace, self._job_id) - assert job is not None + if job is None: + raise JobNotFoundException() return job.job_error_message def get_status(self) -> SparkJobStatus: job = _get_job_by_id(self._api, self._namespace, self._job_id) - assert job is not None + if job is None: + raise JobNotFoundException return job.state def get_start_time(self) -> datetime: job = _get_job_by_id(self._api, self._namespace, self._job_id) - assert job is not None + if job is None: + raise JobNotFoundException return job.start_time def cancel(self):