Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import time
import uuid
from dataclasses import asdict, dataclass
from datetime import date, datetime, timedelta
from datetime import datetime, timedelta
from typing import List, Optional, Set, Union

import pandas
Expand Down Expand Up @@ -224,19 +223,17 @@ def to_df(self):
df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
return df

def to_bigquery(self, dry_run=False) -> Optional[str]:
def to_sql(self):
return self.query
Comment thread
codyjlin marked this conversation as resolved.
Outdated

def to_bigquery(self, job_config=None) -> Optional[str]:
Copy link
Copy Markdown
Member

@woop woop Jun 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a type here and a docstring?

@retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
def _block_until_done():
return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]

today = date.today().strftime("%Y%m%d")
rand_id = str(uuid.uuid4())[:7]
dataset_project = self.config.offline_store.project_id or self.client.project
path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
job_config = bigquery.QueryJobConfig(destination=path, dry_run=dry_run)
bq_job = self.client.query(self.query, job_config=job_config)

if dry_run:
if job_config.dry_run:
print(
"This query will process {} bytes.".format(bq_job.total_bytes_processed)
)
Expand All @@ -247,8 +244,8 @@ def _block_until_done():
if bq_job.exception():
raise bq_job.exception()

print(f"Done writing to '{path}'.")
return path
print(f"Done writing to '{job_config.destination}'.")
Comment thread
codyjlin marked this conversation as resolved.
return str(job_config.destination)


@dataclass(frozen=True)
Expand Down
26 changes: 22 additions & 4 deletions sdk/python/tests/test_historical_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import numpy as np
import pandas as pd
import pytest
from google.api_core.exceptions import Conflict
from google.cloud import bigquery
from pandas.testing import assert_frame_equal
from pytz import utc
Expand Down Expand Up @@ -439,12 +440,16 @@ def test_historical_features_from_bigquery_sources(
)

# Just a dry run, should not create table
bq_dry_run = job_from_sql.to_bigquery(dry_run=True)
job_config = bigquery.QueryJobConfig(dry_run=True)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not write tests for these for the time being. They won't be worth the effort, or offer much protection, when the tests are about to undergo refactoring. Also, this is within a test that'll run 6 times due to its parametrizations.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think these kinds of small changes generally don't need test cases.

bq_dry_run = job_from_sql.to_bigquery(job_config=job_config)
assert bq_dry_run is None

bq_temp_table_path = job_from_sql.to_bigquery()
assert bq_temp_table_path.split(".")[0] == gcp_project
path = f"{gcp_project}.{bigquery_dataset}.historical_dest_table"
job_config = bigquery.QueryJobConfig(destination=path)
bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config)

# Check return output of to_bigquery()
assert bq_temp_table_path.split(".")[0] == gcp_project
if provider_type == "gcp_custom_offline_config":
assert bq_temp_table_path.split(".")[1] == "foo"
else:
Expand All @@ -454,13 +459,26 @@ def test_historical_features_from_bigquery_sources(
actual_bq_temp_table = bigquery.Client().get_table(bq_temp_table_path)
assert actual_bq_temp_table.table_id == bq_temp_table_path.split(".")[-1]

# Config to overwrite an existing table should succeed
job_config = bigquery.QueryJobConfig(
destination=path, write_disposition="WRITE_TRUNCATE"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: write_disposition has the string options listed in the docs here.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the comment above, and we don't need to test individual settings in QueryJobConfig. We can assume they work.

)
bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config)

# Config to fail on the existing table we created should fail
with pytest.raises(Conflict):
job_config = bigquery.QueryJobConfig(
destination=path, write_disposition="WRITE_EMPTY"
)
bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config)

start_time = datetime.utcnow()
actual_df_from_sql_entities = job_from_sql.to_df()
end_time = datetime.utcnow()
with capsys.disabled():
print(
str(
f"\nTime to execute job_from_df.to_df() = '{(end_time - start_time)}'"
f"\nTime to execute job_from_sql.to_df() = '{(end_time - start_time)}'"
)
)

Expand Down