-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Provide the user with more options for setting the to_bigquery config #1661
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
ccc59f8
30b7b32
0b5f6ed
e87dad0
e6faf27
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,6 @@ | ||
| import time | ||
| import uuid | ||
| from dataclasses import asdict, dataclass | ||
| from datetime import date, datetime, timedelta | ||
| from datetime import datetime, timedelta | ||
| from typing import List, Optional, Set, Union | ||
|
|
||
| import pandas | ||
|
|
@@ -224,19 +223,17 @@ def to_df(self): | |
| df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True) | ||
| return df | ||
|
|
||
| def to_bigquery(self, dry_run=False) -> Optional[str]: | ||
| def to_sql(self): | ||
| return self.query | ||
|
|
||
| def to_bigquery(self, job_config=None) -> Optional[str]: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you please add a type here and a docstring? |
||
| @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True) | ||
| def _block_until_done(): | ||
| return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"] | ||
|
|
||
| today = date.today().strftime("%Y%m%d") | ||
| rand_id = str(uuid.uuid4())[:7] | ||
| dataset_project = self.config.offline_store.project_id or self.client.project | ||
| path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}" | ||
| job_config = bigquery.QueryJobConfig(destination=path, dry_run=dry_run) | ||
| bq_job = self.client.query(self.query, job_config=job_config) | ||
|
|
||
| if dry_run: | ||
| if job_config.dry_run: | ||
| print( | ||
| "This query will process {} bytes.".format(bq_job.total_bytes_processed) | ||
| ) | ||
|
|
@@ -247,8 +244,8 @@ def _block_until_done(): | |
| if bq_job.exception(): | ||
| raise bq_job.exception() | ||
|
|
||
| print(f"Done writing to '{path}'.") | ||
| return path | ||
| print(f"Done writing to '{job_config.destination}'.") | ||
|
codyjlin marked this conversation as resolved.
|
||
| return str(job_config.destination) | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| import numpy as np | ||
| import pandas as pd | ||
| import pytest | ||
| from google.api_core.exceptions import Conflict | ||
| from google.cloud import bigquery | ||
| from pandas.testing import assert_frame_equal | ||
| from pytz import utc | ||
|
|
@@ -439,12 +440,16 @@ def test_historical_features_from_bigquery_sources( | |
| ) | ||
|
|
||
| # Just a dry run, should not create table | ||
| bq_dry_run = job_from_sql.to_bigquery(dry_run=True) | ||
| job_config = bigquery.QueryJobConfig(dry_run=True) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's not write tests for these for the time being. It won't be worth the work and protection when tests are about to undergo refactoring. Also, this is within a test that'll run 6 times due to its parametrizations.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I also think these kinds of small changes don't need test cases generally. |
||
| bq_dry_run = job_from_sql.to_bigquery(job_config=job_config) | ||
| assert bq_dry_run is None | ||
|
|
||
| bq_temp_table_path = job_from_sql.to_bigquery() | ||
| assert bq_temp_table_path.split(".")[0] == gcp_project | ||
| path = f"{gcp_project}.{bigquery_dataset}.historical_dest_table" | ||
| job_config = bigquery.QueryJobConfig(destination=path) | ||
| bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config) | ||
|
|
||
| # Check return output of to_bigquery() | ||
| assert bq_temp_table_path.split(".")[0] == gcp_project | ||
| if provider_type == "gcp_custom_offline_config": | ||
| assert bq_temp_table_path.split(".")[1] == "foo" | ||
| else: | ||
|
|
@@ -454,13 +459,26 @@ def test_historical_features_from_bigquery_sources( | |
| actual_bq_temp_table = bigquery.Client().get_table(bq_temp_table_path) | ||
| assert actual_bq_temp_table.table_id == bq_temp_table_path.split(".")[-1] | ||
|
|
||
| # Config to overwrite an existing table should succeed | ||
| job_config = bigquery.QueryJobConfig( | ||
| destination=path, write_disposition="WRITE_TRUNCATE" | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Note: write_disposition has the string options listed in the docs here.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same as the comment above, and we don't need to test for individual settings in QueryJobConfig. We can assume they work. |
||
| ) | ||
| bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config) | ||
|
|
||
| # Config to fail on the existing table we created should fail | ||
| with pytest.raises(Conflict): | ||
| job_config = bigquery.QueryJobConfig( | ||
| destination=path, write_disposition="WRITE_EMPTY" | ||
| ) | ||
| bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config) | ||
|
|
||
| start_time = datetime.utcnow() | ||
| actual_df_from_sql_entities = job_from_sql.to_df() | ||
| end_time = datetime.utcnow() | ||
| with capsys.disabled(): | ||
| print( | ||
| str( | ||
| f"\nTime to execute job_from_df.to_df() = '{(end_time - start_time)}'" | ||
| f"\nTime to execute job_from_sql.to_df() = '{(end_time - start_time)}'" | ||
| ) | ||
| ) | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.