Skip to content

Commit d138648

Browse files
codyjlin authored and woop committed
Provide the user with more options for setting the to_bigquery config (feast-dev#1661)
* Provide more options for to_bigquery config Signed-off-by: Cody Lin <codyl@twitter.com> * Fix default job_config when none; remove excessive testing Signed-off-by: Cody Lin <codyl@twitter.com> * Add param type and docstring Signed-off-by: Cody Lin <codyl@twitter.com> * add docstrings and typing Signed-off-by: Cody Lin <codyl@twitter.com> * Apply docstring suggestions from code review Co-authored-by: Willem Pienaar <6728866+woop@users.noreply.github.com> Signed-off-by: Cody Lin <codyjlin@yahoomail.com> Co-authored-by: Willem Pienaar <6728866+woop@users.noreply.github.com> Signed-off-by: Mwad22 <51929507+Mwad22@users.noreply.github.com>
1 parent c14023f commit d138648

2 files changed

Lines changed: 35 additions & 31 deletions

File tree

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -241,31 +241,51 @@ def to_df(self):
241241
df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
242242
return df
243243

244-
def to_bigquery(self, dry_run=False) -> Optional[str]:
244+
def to_sql(self) -> str:
245+
"""
246+
Returns the SQL query that will be executed in BigQuery to build the historical feature table.
247+
"""
248+
return self.query
249+
250+
def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
251+
"""
252+
Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table.
253+
254+
Args:
255+
job_config: An optional bigquery.QueryJobConfig to specify options like destination table, dry run, etc.
256+
257+
Returns:
258+
Returns the destination table name or returns None if job_config.dry_run is True.
259+
"""
260+
245261
@retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
246262
def _block_until_done():
247263
return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]
248264

249-
today = date.today().strftime("%Y%m%d")
250-
rand_id = str(uuid.uuid4())[:7]
251-
dataset_project = self.config.offline_store.project_id or self.client.project
252-
path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
253-
job_config = bigquery.QueryJobConfig(destination=path, dry_run=dry_run)
254-
bq_job = self.client.query(self.query, job_config=job_config)
255-
256-
if dry_run:
257-
print(
258-
"This query will process {} bytes.".format(bq_job.total_bytes_processed)
265+
if not job_config:
266+
today = date.today().strftime("%Y%m%d")
267+
rand_id = str(uuid.uuid4())[:7]
268+
dataset_project = (
269+
self.config.offline_store.project_id or self.client.project
259270
)
260-
return None
271+
path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
272+
job_config = bigquery.QueryJobConfig(destination=path)
273+
274+
bq_job = self.client.query(self.query, job_config=job_config)
261275

262276
_block_until_done()
263277

264278
if bq_job.exception():
265279
raise bq_job.exception()
266280

267-
print(f"Done writing to '{path}'.")
268-
return path
281+
if job_config.dry_run:
282+
print(
283+
"This query will process {} bytes.".format(bq_job.total_bytes_processed)
284+
)
285+
return None
286+
287+
print(f"Done writing to '{job_config.destination}'.")
288+
return str(job_config.destination)
269289

270290

271291
@dataclass(frozen=True)

sdk/python/tests/test_historical_retrieval.py

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -485,29 +485,13 @@ def test_historical_features_from_bigquery_sources(
485485
full_feature_names=full_feature_names,
486486
)
487487

488-
# Just a dry run, should not create table
489-
bq_dry_run = job_from_sql.to_bigquery(dry_run=True)
490-
assert bq_dry_run is None
491-
492-
bq_temp_table_path = job_from_sql.to_bigquery()
493-
assert bq_temp_table_path.split(".")[0] == gcp_project
494-
495-
if provider_type == "gcp_custom_offline_config":
496-
assert bq_temp_table_path.split(".")[1] == "foo"
497-
else:
498-
assert bq_temp_table_path.split(".")[1] == bigquery_dataset
499-
500-
# Check that this table actually exists
501-
actual_bq_temp_table = bigquery.Client().get_table(bq_temp_table_path)
502-
assert actual_bq_temp_table.table_id == bq_temp_table_path.split(".")[-1]
503-
504488
start_time = datetime.utcnow()
505489
actual_df_from_sql_entities = job_from_sql.to_df()
506490
end_time = datetime.utcnow()
507491
with capsys.disabled():
508492
print(
509493
str(
510-
f"\nTime to execute job_from_df.to_df() = '{(end_time - start_time)}'"
494+
f"\nTime to execute job_from_sql.to_df() = '{(end_time - start_time)}'"
511495
)
512496
)
513497

0 commit comments

Comments (0)