Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit ad758aa

Browse files
feat(bigquery): Addressed comments and add unit test
1 parent fbbf79d commit ad758aa

File tree

2 files changed

+70
-11
lines changed

2 files changed

+70
-11
lines changed

google/cloud/bigquery/client.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
from google.cloud.bigquery._helpers import _record_field_to_json
5858
from google.cloud.bigquery._helpers import _str_or_none
5959
from google.cloud.bigquery._helpers import _verify_job_config_type
60+
from google.cloud.bigquery._helpers import _del_sub_prop
6061
from google.cloud.bigquery._http import Connection
6162
from google.cloud.bigquery import _pandas_helpers
6263
from google.cloud.bigquery.dataset import Dataset
@@ -1338,9 +1339,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
13381339
load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
13391340
job_config
13401341
)
1341-
destination = TableReference.from_api_repr(
1342-
job_config["load"]["destinationTable"]
1343-
)
1342+
destination = _get_sub_prop(job_config, ["load", "destinationTable"])
13441343
source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
13451344
return self.load_table_from_uri(
13461345
source_uris, destination, job_config=load_job_config, retry=retry
@@ -1349,14 +1348,12 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
13491348
copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
13501349
job_config
13511350
)
1352-
copy_resource = job_config["copy"]
1353-
destination = TableReference.from_api_repr(
1354-
copy_resource["destinationTable"]
1355-
)
1351+
destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
13561352
sources = []
1357-
source_configs = copy_resource.get("sourceTables")
1353+
source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"])
1354+
13581355
if source_configs is None:
1359-
source_configs = [copy_resource["sourceTable"]]
1356+
source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])]
13601357
for source_config in source_configs:
13611358
table_ref = TableReference.from_api_repr(source_config)
13621359
sources.append(table_ref)
@@ -1367,13 +1364,13 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
13671364
extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
13681365
job_config
13691366
)
1370-
source = TableReference.from_api_repr(job_config["extract"]["sourceTable"])
1367+
source = _get_sub_prop(job_config, ["extract", "sourceTable"])
13711368
destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
13721369
return self.extract_table(
13731370
source, destination_uris, job_config=extract_job_config, retry=retry
13741371
)
13751372
elif "query" in job_config:
1376-
del job_config["query"]["destinationTable"]
1373+
_del_sub_prop(job_config, ["query", "destinationTable"])
13771374
query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
13781375
job_config
13791376
)

tests/unit/test_client.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2893,6 +2893,68 @@ def test_create_job_query_config(self):
28932893
configuration, "google.cloud.bigquery.client.Client.query",
28942894
)
28952895

2896+
def test_create_job_query_config_w_rateLimitExceeded_error(self):
2897+
from google.cloud.exceptions import Forbidden
2898+
from google.cloud.bigquery.retry import DEFAULT_RETRY
2899+
2900+
query = "select count(*) from persons"
2901+
configuration = {
2902+
"query": {
2903+
"query": query,
2904+
"useLegacySql": False,
2905+
"destinationTable": {"tableId": "table_id"},
2906+
}
2907+
}
2908+
resource = {
2909+
"jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
2910+
"configuration": {
2911+
"query": {
2912+
"query": query,
2913+
"useLegacySql": False,
2914+
"destinationTable": {
2915+
"projectId": self.PROJECT,
2916+
"datasetId": self.DS_ID,
2917+
"tableId": "query_destination_table",
2918+
},
2919+
}
2920+
},
2921+
}
2922+
data_without_destination = {
2923+
"jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
2924+
"configuration": {"query": {"query": query, "useLegacySql": False}},
2925+
}
2926+
2927+
creds = _make_credentials()
2928+
http = object()
2929+
retry = DEFAULT_RETRY.with_deadline(1).with_predicate(
2930+
lambda exc: isinstance(exc, Forbidden)
2931+
)
2932+
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
2933+
2934+
api_request_patcher = mock.patch.object(
2935+
client._connection,
2936+
"api_request",
2937+
side_effect=[
2938+
Forbidden("", errors=[{"reason": "rateLimitExceeded"}]),
2939+
resource,
2940+
],
2941+
)
2942+
2943+
with api_request_patcher as fake_api_request:
2944+
job = client.create_job(job_config=configuration, retry=retry)
2945+
2946+
self.assertEqual(job.destination.table_id, "query_destination_table")
2947+
self.assertEqual(len(fake_api_request.call_args_list), 2) # was retried once
2948+
self.assertEqual(
2949+
fake_api_request.call_args_list[1],
2950+
mock.call(
2951+
method="POST",
2952+
path="/projects/PROJECT/jobs",
2953+
data=data_without_destination,
2954+
timeout=None,
2955+
),
2956+
)
2957+
28962958
def test_create_job_w_invalid_job_config(self):
28972959
configuration = {"unknown": {}}
28982960
creds = _make_credentials()

0 commit comments

Comments
 (0)