From ae5715a72c1218b3d27dcb00a093f6df16edbaf7 Mon Sep 17 00:00:00 2001 From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Date: Mon, 23 May 2022 16:00:41 +0800 Subject: [PATCH] Prevent crashing when job was deleted externally (#147) Signed-off-by: shuheng Co-authored-by: shuheng --- python/feast_spark/client.py | 49 ++++++++++++++++--- python/feast_spark/job_service.py | 10 +++- .../feast_spark/pyspark/launchers/k8s/k8s.py | 13 +++-- 3 files changed, 61 insertions(+), 11 deletions(-) diff --git a/python/feast_spark/client.py b/python/feast_spark/client.py index 8491cc6f..115734cf 100644 --- a/python/feast_spark/client.py +++ b/python/feast_spark/client.py @@ -1,24 +1,21 @@ import configparser import os import uuid -from datetime import datetime +from datetime import datetime, timedelta from itertools import groupby from typing import Dict, List, Optional, Union, cast import pandas as pd import redis from croniter import croniter +from google.cloud import bigquery import feast from feast.config import Config from feast.constants import ConfigOptions as feast_opt from feast.data_source import BigQuerySource, FileSource from feast.grpc.grpc import create_grpc_channel -from feast.staging.entities import ( - stage_entities_to_bq, - stage_entities_to_fs, - table_reference_from_string, -) +from feast.staging.entities import stage_entities_to_fs, table_reference_from_string from feast_spark.api.JobService_pb2 import ( GetHealthMetricsRequest, GetHistoricalFeaturesRequest, @@ -51,6 +48,44 @@ ) +def stage_entities_to_bq_with_partition( + entity_source: pd.DataFrame, project: str, dataset: str +) -> BigQuerySource: + """ + Stores given (entity) dataframe as new table in BQ. Name of the table generated based on current time. + Table will expire in 1 day. + Returns BigQuerySource with reference to created table. 
+ """ + + bq_client: bigquery.Client = bigquery.Client() + destination = bigquery.TableReference( + bigquery.DatasetReference(project, dataset), + f"_entities_{datetime.now():%Y%m%d%H%M%s}", + ) + + # prevent casting ns -> ms exception inside pyarrow + entity_source["event_timestamp"] = entity_source["event_timestamp"].dt.floor("ms") + + load_job_config = bigquery.LoadJobConfig( + time_partitioning=bigquery.TimePartitioning( + type_=bigquery.TimePartitioningType.DAY, field="event_timestamp", + ) + ) + load_job: bigquery.LoadJob = bq_client.load_table_from_dataframe( + entity_source, destination, job_config=load_job_config, + ) + load_job.result() # wait until complete + + dest_table: bigquery.Table = bq_client.get_table(destination) + dest_table.expires = datetime.now() + timedelta(days=1) + bq_client.update_table(dest_table, fields=["expires"]) + + return BigQuerySource( + event_timestamp_column="event_timestamp", + table_ref=f"{destination.project}:{destination.dataset_id}.{destination.table_id}", + ) + + class Client: _feast: feast.Client @@ -197,7 +232,7 @@ def get_historical_features( staging_bq_project = source_ref.project staging_bq_dataset = source_ref.dataset_id - entity_source = stage_entities_to_bq( + entity_source = stage_entities_to_bq_with_partition( entity_source, staging_bq_project, staging_bq_dataset ) else: diff --git a/python/feast_spark/job_service.py b/python/feast_spark/job_service.py index b181a909..3ba6ba4c 100644 --- a/python/feast_spark/job_service.py +++ b/python/feast_spark/job_service.py @@ -60,6 +60,7 @@ start_stream_to_online_ingestion, unschedule_offline_to_online_ingestion, ) +from feast_spark.pyspark.launchers.k8s.k8s import JobNotFoundException from feast_spark.third_party.grpc.health.v1.HealthService_pb2 import ( HealthCheckResponse, ServingStatus, @@ -437,9 +438,16 @@ def ensure_stream_ingestion_jobs(client: Client, all_projects: bool): opt.JOB_SERVICE_RETRY_FAILED_JOBS ) ): + status = None + try: + status = job.get_status() + 
except JobNotFoundException: + logger.warning(f"{job.get_id()} was already removed") + if ( isinstance(job, StreamIngestionJob) - and job.get_status() != SparkJobStatus.COMPLETED + and status is not None + and status != SparkJobStatus.COMPLETED ): jobs_by_hash[job.get_hash()] = job diff --git a/python/feast_spark/pyspark/launchers/k8s/k8s.py b/python/feast_spark/pyspark/launchers/k8s/k8s.py index dbbd6638..2488992f 100644 --- a/python/feast_spark/pyspark/launchers/k8s/k8s.py +++ b/python/feast_spark/pyspark/launchers/k8s/k8s.py @@ -74,6 +74,10 @@ def _truncate_label(label: str) -> str: return label[:63] +class JobNotFoundException(Exception): + pass + + class KubernetesJobMixin: def __init__(self, api: CustomObjectsApi, namespace: str, job_id: str): self._api = api @@ -85,17 +89,20 @@ def get_id(self) -> str: def get_error_message(self) -> str: job = _get_job_by_id(self._api, self._namespace, self._job_id) - assert job is not None + if job is None: + raise JobNotFoundException() return job.job_error_message def get_status(self) -> SparkJobStatus: job = _get_job_by_id(self._api, self._namespace, self._job_id) - assert job is not None + if job is None: + raise JobNotFoundException() return job.state def get_start_time(self) -> datetime: job = _get_job_by_id(self._api, self._namespace, self._job_id) - assert job is not None + if job is None: + raise JobNotFoundException() return job.start_time def cancel(self):