Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
feat: Add support for table_create_disposition in bigquery job for …
…offline store (#3762)

* Add bigquery table create disposition to offline store

Signed-off-by: Nick Zeolla <nick.zeolla@starlingbank.com>

* linting

Signed-off-by: Nick Zeolla <nick.zeolla@starlingbank.com>

---------

Signed-off-by: Nick Zeolla <nick.zeolla@starlingbank.com>
  • Loading branch information
nickozilla authored and JohnLemmonMedely committed Sep 15, 2023
commit 83386879ba8bd09d7a201847be4ce5ee36dccdff
14 changes: 13 additions & 1 deletion sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import pandas as pd
import pyarrow
import pyarrow.parquet
from pydantic import StrictStr, validator
from pydantic import ConstrainedStr, StrictStr, validator
from pydantic.typing import Literal
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

Expand Down Expand Up @@ -72,6 +72,13 @@ def get_http_client_info():
return http_client_info.ClientInfo(user_agent=get_user_agent())


class BigQueryTableCreateDisposition(ConstrainedStr):
"""Custom constraint for table_create_disposition. To understand more, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition"""

values = {"CREATE_NEVER", "CREATE_IF_NEEDED"}


class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
"""Offline store config for GCP BigQuery"""

Expand All @@ -95,6 +102,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
gcs_staging_location: Optional[str] = None
""" (optional) GCS location used for offloading BigQuery results as parquet files."""

table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
""" (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""

@validator("billing_project_id")
def project_id_exists(cls, v, values, **kwargs):
if v and not values["project_id"]:
Expand Down Expand Up @@ -324,6 +334,7 @@ def write_logged_features(
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
schema=arrow_schema_to_bq_schema(source.get_schema(registry)),
create_disposition=config.offline_store.table_create_disposition,
time_partitioning=bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field=source.get_log_timestamp_column(),
Expand Down Expand Up @@ -384,6 +395,7 @@ def offline_write_batch(
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
schema=arrow_schema_to_bq_schema(pa_schema),
create_disposition=config.offline_store.table_create_disposition,
write_disposition="WRITE_APPEND", # Default but included for clarity
)

Expand Down