Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,31 +224,51 @@ def to_df(self):
df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
return df

def to_sql(self) -> str:
    """
    Returns the SQL query that will be executed in BigQuery to build the
    historical feature table.

    No query is executed; this simply exposes the already-compiled query
    string so callers can inspect or run it themselves.
    """
    return self.query

def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
    """
    Triggers the execution of a historical feature retrieval query and exports
    the results to a BigQuery table.

    Args:
        job_config: An optional bigquery.QueryJobConfig to specify options like
            the destination table, dry run, etc. If omitted, a config targeting
            a fresh, uniquely named table in the offline store's dataset is
            generated.

    Returns:
        The destination table name, or None if job_config.dry_run is True.
    """

    @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
    def _block_until_done():
        # NOTE(review): tenacity's default retry predicate retries only when
        # the wrapped function raises, not when it returns a truthy value, so
        # this likely returns immediately even while the job is still
        # PENDING/RUNNING — confirm whether retry_if_result was intended.
        return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]

    if not job_config:
        # No config supplied: write to a unique, date-stamped table in the
        # configured offline-store dataset (project falls back to the
        # client's default project).
        today = date.today().strftime("%Y%m%d")
        rand_id = str(uuid.uuid4())[:7]
        dataset_project = (
            self.config.offline_store.project_id or self.client.project
        )
        path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
        job_config = bigquery.QueryJobConfig(destination=path)

    bq_job = self.client.query(self.query, job_config=job_config)

    _block_until_done()

    # Surface any job-level failure to the caller instead of silently
    # returning a table that was never written.
    if bq_job.exception():
        raise bq_job.exception()

    if job_config.dry_run:
        print(
            "This query will process {} bytes.".format(bq_job.total_bytes_processed)
        )
        return None

    print(f"Done writing to '{job_config.destination}'.")
    return str(job_config.destination)


@dataclass(frozen=True)
Expand Down
18 changes: 1 addition & 17 deletions sdk/python/tests/test_historical_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,29 +438,13 @@ def test_historical_features_from_bigquery_sources(
],
)

# Just a dry run, should not create table
bq_dry_run = job_from_sql.to_bigquery(dry_run=True)
assert bq_dry_run is None

bq_temp_table_path = job_from_sql.to_bigquery()
assert bq_temp_table_path.split(".")[0] == gcp_project

if provider_type == "gcp_custom_offline_config":
assert bq_temp_table_path.split(".")[1] == "foo"
else:
assert bq_temp_table_path.split(".")[1] == bigquery_dataset

# Check that this table actually exists
actual_bq_temp_table = bigquery.Client().get_table(bq_temp_table_path)
assert actual_bq_temp_table.table_id == bq_temp_table_path.split(".")[-1]

start_time = datetime.utcnow()
actual_df_from_sql_entities = job_from_sql.to_df()
end_time = datetime.utcnow()
with capsys.disabled():
print(
str(
f"\nTime to execute job_from_df.to_df() = '{(end_time - start_time)}'"
f"\nTime to execute job_from_sql.to_df() = '{(end_time - start_time)}'"
)
)

Expand Down