Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fix lint
Signed-off-by: Danny Chiao <danny@tecton.ai>
  • Loading branch information
adchia committed Sep 30, 2022
commit 9bdbab9922df0c5c49a956efb09ecf799ec6098d
46 changes: 33 additions & 13 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ def pull_latest_from_table_or_query(
config.offline_store.billing_project_id or config.offline_store.project_id
)
client = _get_bigquery_client(
project=project_id, location=config.offline_store.location,
project=project_id,
location=config.offline_store.location,
)
query = f"""
SELECT
Expand All @@ -152,7 +153,10 @@ def pull_latest_from_table_or_query(

# When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized
return BigQueryRetrievalJob(
query=query, client=client, config=config, full_feature_names=False,
query=query,
client=client,
config=config,
full_feature_names=False,
)

@staticmethod
Expand All @@ -173,7 +177,8 @@ def pull_all_from_table_or_query(
config.offline_store.billing_project_id or config.offline_store.project_id
)
client = _get_bigquery_client(
project=project_id, location=config.offline_store.location,
project=project_id,
location=config.offline_store.location,
)
field_string = ", ".join(
join_key_columns + feature_name_columns + [timestamp_field]
Expand All @@ -184,7 +189,10 @@ def pull_all_from_table_or_query(
WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
"""
return BigQueryRetrievalJob(
query=query, client=client, config=config, full_feature_names=False,
query=query,
client=client,
config=config,
full_feature_names=False,
)

@staticmethod
Expand All @@ -206,7 +214,8 @@ def get_historical_features(
config.offline_store.billing_project_id or config.offline_store.project_id
)
client = _get_bigquery_client(
project=project_id, location=config.offline_store.location,
project=project_id,
location=config.offline_store.location,
)

assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
Expand All @@ -221,20 +230,27 @@ def get_historical_features(
config.offline_store.location,
)

entity_schema = _get_entity_schema(client=client, entity_df=entity_df,)
entity_schema = _get_entity_schema(
client=client,
entity_df=entity_df,
)

entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df(
entity_schema
entity_df_event_timestamp_col = (
offline_utils.infer_event_timestamp_from_entity_df(entity_schema)
)

entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
entity_df, entity_df_event_timestamp_col, client,
entity_df,
entity_df_event_timestamp_col,
client,
)

@contextlib.contextmanager
def query_generator() -> Iterator[str]:
_upload_entity_df(
client=client, table_name=table_reference, entity_df=entity_df,
client=client,
table_name=table_reference,
entity_df=entity_df,
)

expected_join_keys = offline_utils.get_expected_join_keys(
Expand Down Expand Up @@ -301,7 +317,8 @@ def write_logged_features(
config.offline_store.billing_project_id or config.offline_store.project_id
)
client = _get_bigquery_client(
project=project_id, location=config.offline_store.location,
project=project_id,
location=config.offline_store.location,
)

job_config = bigquery.LoadJobConfig(
Expand Down Expand Up @@ -360,7 +377,8 @@ def offline_write_batch(
config.offline_store.billing_project_id or config.offline_store.project_id
)
client = _get_bigquery_client(
project=project_id, location=config.offline_store.location,
project=project_id,
location=config.offline_store.location,
)

job_config = bigquery.LoadJobConfig(
Expand Down Expand Up @@ -618,7 +636,9 @@ def _get_table_reference_for_new_entity(


def _upload_entity_df(
client: Client, table_name: str, entity_df: Union[pd.DataFrame, str],
client: Client,
table_name: str,
entity_df: Union[pd.DataFrame, str],
) -> Table:
"""Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table"""
job: Union[bigquery.job.query.QueryJob, bigquery.job.load.LoadJob]
Expand Down