Commit d0848af

BigQuery: Harden systests against transient GCS errors. (googleapis#7006)
- Centralize bucket creation in BQ systests.
- Add retry for transient errors from 'bucket.create' and blob operations.

Closes googleapis#7005.
1 parent ebe21de commit d0848af
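
The hardening pattern used throughout the diff is a small retry-decorator factory: the systests' `RetryErrors` helper (imported from the repo's `test_utils.retry`) takes an exception type or tuple plus an optional error predicate, and `retry(fn)(*args, **kwargs)` re-invokes `fn` with backoff for as long as only those exceptions are raised. Below is a minimal self-contained sketch of that pattern; the `max_tries`, `delay`, and `backoff` defaults are illustrative assumptions, not the real `test_utils` values:

```python
import functools
import time


class RetryErrors(object):
    """Retry a callable while it raises one of the given transient errors.

    Minimal sketch of the pattern the systests use; the default
    max_tries/delay/backoff values here are assumptions.
    """

    def __init__(self, exception, error_predicate=None,
                 max_tries=4, delay=1, backoff=2):
        self.exception = exception  # one exception class, or a tuple of them
        self.error_predicate = error_predicate or (lambda err: True)
        self.max_tries = max_tries
        self.delay = delay
        self.backoff = backoff

    def __call__(self, to_wrap):
        @functools.wraps(to_wrap)
        def wrapped(*args, **kwargs):
            tries = 0
            delay = self.delay
            while True:
                try:
                    return to_wrap(*args, **kwargs)
                except self.exception as caught:
                    tries += 1
                    # Give up when attempts run out, or when the predicate
                    # says this particular error is not worth retrying.
                    if tries >= self.max_tries or not self.error_predicate(caught):
                        raise
                time.sleep(delay)
                delay *= self.backoff  # exponential backoff between attempts

        return wrapped
```

The wrap happens per call site (`retry_storage_errors(bucket.create)(location=location)`), so a single `RetryErrors` instance can guard many different operations.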

File tree

1 file changed: +41 -41 lines

bigquery/tests/system.py

Lines changed: 41 additions & 41 deletions
@@ -49,6 +49,8 @@
 from google.api_core.exceptions import Conflict
 from google.api_core.exceptions import Forbidden
 from google.api_core.exceptions import NotFound
+from google.api_core.exceptions import InternalServerError
+from google.api_core.exceptions import ServiceUnavailable
 from google.api_core.exceptions import TooManyRequests
 from google.cloud import bigquery
 from google.cloud.bigquery.dataset import Dataset
@@ -97,6 +99,10 @@
     ),
 ]
 
+retry_storage_errors = RetryErrors(
+    (TooManyRequests, InternalServerError, ServiceUnavailable)
+)
+
 
 def _has_rows(result):
     return len(result) > 0
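
The module-level `retry_storage_errors` covers the three statuses GCS returns transiently (429, 500, 503) but deliberately leaves out `Conflict`, so an unexpected 409 on bucket creation still fails fast. Usage is wrap-then-call at each site; for instance (bucket name hypothetical):

```python
from google.cloud import storage

# Only TooManyRequests, InternalServerError, and ServiceUnavailable
# trigger a retry here; any other exception propagates immediately.
bucket = storage.Client().bucket("bq-systest-example")  # hypothetical name
retry_storage_errors(bucket.create)(location="US")
```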
@@ -154,10 +160,12 @@ def _still_in_use(bad_request):
             )
 
         retry_in_use = RetryErrors(BadRequest, error_predicate=_still_in_use)
-        retry_409_429 = RetryErrors((Conflict, TooManyRequests))
+        retry_storage_errors_conflict = RetryErrors(
+            (Conflict, TooManyRequests, InternalServerError, ServiceUnavailable)
+        )
         for doomed in self.to_delete:
             if isinstance(doomed, storage.Bucket):
-                retry_409_429(doomed.delete)(force=True)
+                retry_storage_errors_conflict(doomed.delete)(force=True)
             elif isinstance(doomed, (Dataset, bigquery.DatasetReference)):
                 retry_in_use(Config.CLIENT.delete_dataset)(doomed, delete_contents=True)
             elif isinstance(doomed, (Table, bigquery.TableReference)):
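
tearDown's variant adds `Conflict` to the tuple because deleting a bucket can race an in-flight operation on its objects; outside teardown a 409 is treated as a real failure. For reference, the retried `google.api_core.exceptions` classes correspond to these HTTP statuses (a sanity-check snippet, assuming the `code` class attribute that api_core defines on its HTTP error classes):

```python
from google.api_core import exceptions

assert exceptions.Conflict.code == 409             # retried only in tearDown
assert exceptions.TooManyRequests.code == 429      # rate limiting
assert exceptions.InternalServerError.code == 500  # transient server error
assert exceptions.ServiceUnavailable.code == 503   # temporary outage
```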
@@ -173,6 +181,14 @@ def test_get_service_account_email(self):
         self.assertIsInstance(got, six.text_type)
         self.assertIn("@", got)
 
+    def _create_bucket(self, bucket_name, location=None):
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(bucket_name)
+        retry_storage_errors(bucket.create)(location=location)
+        self.to_delete.append(bucket)
+
+        return bucket
+
     def test_create_dataset(self):
         DATASET_ID = _make_dataset_id("create_dataset")
         dataset = self.temp_dataset(DATASET_ID)
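
With `_create_bucket` in place, a test gets retried creation plus cleanup registration from a single call; the helper appends the bucket to `self.to_delete`, which tearDown later drains with the conflict-aware retry. Hypothetical call sites inside a test method of this class:

```python
# Default location, then an explicit EU bucket (mirroring the EU test below).
bucket = self._create_bucket("bq_systest_" + unique_resource_id())
eu_bucket = self._create_bucket(
    "bq_systest_eu_" + unique_resource_id(), location="eu"
)
```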
@@ -683,12 +699,8 @@ def test_load_table_from_uri_then_dump_table(self):
 
     def test_load_table_from_file_w_explicit_location(self):
         # Create a temporary bucket for extract files.
-        storage_client = storage.Client()
         bucket_name = "bq_load_table_eu_extract_test" + unique_resource_id()
-        bucket = storage_client.bucket(bucket_name)
-        bucket.location = "eu"
-        self.to_delete.append(bucket)
-        bucket.create()
+        self._create_bucket(bucket_name, location="eu")
 
         # Create a temporary dataset & table in the EU.
         table_bytes = six.BytesIO(b"a,3\nb,2\nc,1\n")
@@ -768,65 +780,57 @@ def test_load_table_from_file_w_explicit_location(self):
             table_ref, "gs://{}/letters-us.csv".format(bucket_name), location="US"
         ).result()
 
-    def _create_storage(self, bucket_name, blob_name):
-        storage_client = storage.Client()
-
-        # In the **very** rare case the bucket name is reserved, this
-        # fails with a ConnectionError.
-        bucket = storage_client.create_bucket(bucket_name)
-        self.to_delete.append(bucket)
-
-        return bucket.blob(blob_name)
-
     def _write_csv_to_storage(self, bucket_name, blob_name, header_row, data_rows):
         from google.cloud._testing import _NamedTemporaryFile
 
-        blob = self._create_storage(bucket_name, blob_name)
+        bucket = self._create_bucket(bucket_name)
+        blob = bucket.blob(blob_name)
+
         with _NamedTemporaryFile() as temp:
             with open(temp.name, "w") as csv_write:
                 writer = csv.writer(csv_write)
                 writer.writerow(header_row)
                 writer.writerows(data_rows)
 
             with open(temp.name, "rb") as csv_read:
-                blob.upload_from_file(csv_read, content_type="text/csv")
+                retry_storage_errors(blob.upload_from_file)(
+                    csv_read, content_type="text/csv"
+                )
 
         self.to_delete.insert(0, blob)
         return "gs://{}/{}".format(bucket_name, blob_name)
 
     def _write_avro_to_storage(self, bucket_name, blob_name, avro_file):
-        blob = self._create_storage(bucket_name, blob_name)
-        blob.upload_from_file(avro_file, content_type="application/x-avro-binary")
+        bucket = self._create_bucket(bucket_name)
+        blob = bucket.blob(blob_name)
+        retry_storage_errors(blob.upload_from_file)(
+            avro_file, content_type="application/x-avro-binary"
+        )
         self.to_delete.insert(0, blob)
         return "gs://{}/{}".format(bucket_name, blob_name)
 
-    def _load_table_for_extract_table(
-        self, storage_client, rows, bucket_name, blob_name, table
-    ):
+    def _load_table_for_extract_table(self, bucket, blob_name, table, rows):
         from google.cloud._testing import _NamedTemporaryFile
 
-        gs_url = "gs://{}/{}".format(bucket_name, blob_name)
-
-        # In the **very** rare case the bucket name is reserved, this
-        # fails with a ConnectionError.
-        bucket = storage_client.create_bucket(bucket_name)
-        self.to_delete.append(bucket)
         blob = bucket.blob(blob_name)
-
         with _NamedTemporaryFile() as temp:
             with open(temp.name, "w") as csv_write:
                 writer = csv.writer(csv_write)
                 writer.writerow(HEADER_ROW)
                 writer.writerows(rows)
 
             with open(temp.name, "rb") as csv_read:
-                blob.upload_from_file(csv_read, content_type="text/csv")
+                retry_storage_errors(blob.upload_from_file)(
+                    csv_read, content_type="text/csv"
+                )
+
         self.to_delete.insert(0, blob)
 
         dataset = self.temp_dataset(table.dataset_id)
         table_ref = dataset.table(table.table_id)
         config = bigquery.LoadJobConfig()
         config.autodetect = True
+        gs_url = "gs://{}/{}".format(bucket.name, blob_name)
         job = Config.CLIENT.load_table_from_uri(gs_url, table_ref, job_config=config)
         # TODO(jba): do we need this retry now that we have job.result()?
         # Allow for 90 seconds of "warm up" before rows visible. See
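
Beyond the retries, two shape changes land in this hunk: `_load_table_for_extract_table` now receives an already-created `storage.Bucket` instead of creating its own (bucket creation is centralized in `_create_bucket`), and `gs_url` is computed from `bucket.name` right where it is consumed. The upload retry is the same wrap-then-call shape; a standalone sketch with hypothetical names, with one stream caveat flagged in the comments:

```python
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("bq-systest-demo")  # hypothetical; assumed already created
blob = bucket.blob("person_ages.csv")

with open("person_ages.csv", "rb") as csv_read:
    # Retrying the whole call re-reads from the stream's current position;
    # if a failed attempt consumed bytes, rewind=True makes upload_from_file
    # seek back to the start before uploading.
    retry_storage_errors(blob.upload_from_file)(
        csv_read, content_type="text/csv", rewind=True
    )
```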
@@ -836,21 +840,16 @@ def _load_table_for_extract_table(
         retry(job.reload)()
 
     def test_extract_table(self):
-        from google.cloud.storage import Client as StorageClient
-
-        storage_client = StorageClient()
         local_id = unique_resource_id()
         bucket_name = "bq_extract_test" + local_id
-        blob_name = "person_ages.csv"
+        source_blob_name = "person_ages.csv"
         dataset_id = _make_dataset_id("load_gcs_then_extract")
         table_id = "test_table"
         table_ref = Config.CLIENT.dataset(dataset_id).table(table_id)
         table = Table(table_ref)
         self.to_delete.insert(0, table)
-        self._load_table_for_extract_table(
-            storage_client, ROWS, bucket_name, blob_name, table_ref
-        )
-        bucket = storage_client.bucket(bucket_name)
+        bucket = self._create_bucket(bucket_name)
+        self._load_table_for_extract_table(bucket, source_blob_name, table_ref, ROWS)
         destination_blob_name = "person_ages_out.csv"
         destination = bucket.blob(destination_blob_name)
         destination_uri = "gs://{}/person_ages_out.csv".format(bucket_name)
@@ -859,7 +858,8 @@ def test_extract_table(self):
         job.result(timeout=100)
 
         self.to_delete.insert(0, destination)
-        got = destination.download_as_string().decode("utf-8")
+        got_bytes = retry_storage_errors(destination.download_as_string)()
+        got = got_bytes.decode("utf-8")
         self.assertIn("Bharney Rhubble", got)
 
     def test_copy_table(self):
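
`download_as_string` returns bytes, so the retried call and the UTF-8 decode are now two explicit steps. As a quick check that the wrap-then-call pattern passes return values through, here is a toy run against the `RetryErrors` sketch near the top of this page, using a deliberately flaky callable:

```python
from google.api_core.exceptions import ServiceUnavailable

calls = {"n": 0}

def flaky_download():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ServiceUnavailable("transient 503")  # first two attempts fail
    return b"Bharney Rhubble,33"

retry = RetryErrors((ServiceUnavailable,), max_tries=4, delay=0)
got = retry(flaky_download)().decode("utf-8")
assert got == "Bharney Rhubble,33" and calls["n"] == 3
```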
