Skip to content

Commit 67567fd

Browse files
authored
Prevent crashing when job was deleted externally (#147) (#149)
Signed-off-by: shuheng <khor.heng@gojek.com> Co-authored-by: shuheng <khor.heng@gojek.com> Co-authored-by: shuheng <khor.heng@gojek.com>
1 parent 7324f17 commit 67567fd

File tree

1 file changed

+42
-7
lines changed

1 file changed

+42
-7
lines changed

python/feast_spark/client.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,21 @@
11
import configparser
22
import os
33
import uuid
4-
from datetime import datetime
4+
from datetime import datetime, timedelta
55
from itertools import groupby
66
from typing import Dict, List, Optional, Union, cast
77

88
import pandas as pd
99
import redis
1010
from croniter import croniter
11+
from google.cloud import bigquery
1112

1213
import feast
1314
from feast.config import Config
1415
from feast.constants import ConfigOptions as feast_opt
1516
from feast.data_source import BigQuerySource, FileSource
1617
from feast.grpc.grpc import create_grpc_channel
17-
from feast.staging.entities import (
18-
stage_entities_to_bq,
19-
stage_entities_to_fs,
20-
table_reference_from_string,
21-
)
18+
from feast.staging.entities import stage_entities_to_fs, table_reference_from_string
2219
from feast_spark.api.JobService_pb2 import (
2320
GetHealthMetricsRequest,
2421
GetHistoricalFeaturesRequest,
@@ -51,6 +48,44 @@
5148
)
5249

5350

51+
def stage_entities_to_bq_with_partition(
    entity_source: pd.DataFrame, project: str, dataset: str
) -> BigQuerySource:
    """
    Stores the given (entity) dataframe as a new day-partitioned table in BigQuery.

    The table name is generated from the current local time and the table is
    set to expire 1 day after creation, so staged entity tables are cleaned up
    automatically.

    Args:
        entity_source: Entity dataframe; must contain an "event_timestamp"
            column of pandas datetime dtype.
        project: BigQuery project to create the staging table in.
        dataset: BigQuery dataset to create the staging table in.

    Returns:
        BigQuerySource with a reference to the created table.
    """
    bq_client: bigquery.Client = bigquery.Client()
    # NOTE: use %S (seconds, 00-59). The previous %s is a non-portable glibc
    # strftime extension (epoch seconds) that is undocumented in Python and
    # breaks on platforms whose C library lacks it (e.g. Windows).
    destination = bigquery.TableReference(
        bigquery.DatasetReference(project, dataset),
        f"_entities_{datetime.now():%Y%m%d%H%M%S}",
    )

    # prevent casting ns -> ms exception inside pyarrow
    entity_source["event_timestamp"] = entity_source["event_timestamp"].dt.floor("ms")

    # Partition by day on event_timestamp so historical retrieval can prune
    # partitions instead of scanning the whole staging table.
    load_job_config = bigquery.LoadJobConfig(
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY, field="event_timestamp",
        )
    )
    load_job: bigquery.LoadJob = bq_client.load_table_from_dataframe(
        entity_source, destination, job_config=load_job_config,
    )
    load_job.result()  # wait until complete

    # Expire the staging table after 1 day so it does not accumulate cost.
    dest_table: bigquery.Table = bq_client.get_table(destination)
    dest_table.expires = datetime.now() + timedelta(days=1)
    bq_client.update_table(dest_table, fields=["expires"])

    return BigQuerySource(
        event_timestamp_column="event_timestamp",
        table_ref=f"{destination.project}:{destination.dataset_id}.{destination.table_id}",
    )
87+
88+
5489
class Client:
5590
_feast: feast.Client
5691

@@ -197,7 +232,7 @@ def get_historical_features(
197232
staging_bq_project = source_ref.project
198233
staging_bq_dataset = source_ref.dataset_id
199234

200-
entity_source = stage_entities_to_bq(
235+
entity_source = stage_entities_to_bq_with_partition(
201236
entity_source, staging_bq_project, staging_bq_dataset
202237
)
203238
else:

0 commit comments

Comments
 (0)