@@ -49,6 +49,8 @@
 from google.api_core.exceptions import Conflict
 from google.api_core.exceptions import Forbidden
 from google.api_core.exceptions import NotFound
+from google.api_core.exceptions import InternalServerError
+from google.api_core.exceptions import ServiceUnavailable
 from google.api_core.exceptions import TooManyRequests
 from google.cloud import bigquery
 from google.cloud.bigquery.dataset import Dataset
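
(For reference: these google.api_core.exceptions classes map to HTTP statuses as Forbidden 403, NotFound 404, Conflict 409, TooManyRequests 429, InternalServerError 500, and ServiceUnavailable 503. The two new imports cover the transient server-side errors that the retries below are about.)
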
@@ -97,6 +99,10 @@
     ),
 ]
 
+retry_storage_errors = RetryErrors(
+    (TooManyRequests, InternalServerError, ServiceUnavailable)
+)
+
 
 def _has_rows(result):
     return len(result) > 0
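
RetryErrors itself is not part of this diff; it is the test-utility helper this file already uses (see retry_in_use below), invoked as a factory: RetryErrors(exceptions)(func)(*args) calls func(*args) and retries on the listed exceptions. A minimal sketch of that shape (the class body, names, and backoff values here are illustrative assumptions, not the real helper):

    import time

    class RetryErrors:
        """Retry a callable while it raises one of `exceptions`.

        Sketch only; the real helper lives in the repo's test utilities
        and also accepts an error_predicate, as used in tearDown below.
        """

        def __init__(self, exceptions, error_predicate=lambda exc: True,
                     max_tries=4, delay=1, backoff=2):
            self.exceptions = exceptions
            self.error_predicate = error_predicate
            self.max_tries = max_tries
            self.delay = delay
            self.backoff = backoff

        def __call__(self, to_wrap):
            def wrapped(*args, **kwargs):
                delay = self.delay
                for attempt in range(self.max_tries):
                    try:
                        return to_wrap(*args, **kwargs)
                    except self.exceptions as exc:
                        # Give up on the last attempt, or on errors the
                        # predicate rejects; otherwise back off and retry.
                        if attempt == self.max_tries - 1 or not self.error_predicate(exc):
                            raise
                        time.sleep(delay)
                        delay *= self.backoff
            return wrapped

Under this shape, retry_storage_errors(bucket.create)(location="eu") performs bucket.create(location="eu") and transparently absorbs intermittent 429/500/503 responses.
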
@@ -154,10 +160,12 @@ def _still_in_use(bad_request):
             )
 
         retry_in_use = RetryErrors(BadRequest, error_predicate=_still_in_use)
-        retry_409_429 = RetryErrors((Conflict, TooManyRequests))
+        retry_storage_errors_conflict = RetryErrors(
+            (Conflict, TooManyRequests, InternalServerError, ServiceUnavailable)
+        )
         for doomed in self.to_delete:
             if isinstance(doomed, storage.Bucket):
-                retry_409_429(doomed.delete)(force=True)
+                retry_storage_errors_conflict(doomed.delete)(force=True)
             elif isinstance(doomed, (Dataset, bigquery.DatasetReference)):
                 retry_in_use(Config.CLIENT.delete_dataset)(doomed, delete_contents=True)
             elif isinstance(doomed, (Table, bigquery.TableReference)):
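
The bucket branch retries Conflict on top of the plain storage errors, presumably because delete(force=True) must first empty the bucket, and a racing write can leave it non-empty at the final delete, which surfaces as 409. The dataset and table branches keep their existing resource-in-use retry.
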
@@ -173,6 +181,14 @@ def test_get_service_account_email(self):
         self.assertIsInstance(got, six.text_type)
         self.assertIn("@", got)
 
+    def _create_bucket(self, bucket_name, location=None):
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(bucket_name)
+        retry_storage_errors(bucket.create)(location=location)
+        self.to_delete.append(bucket)
+
+        return bucket
+
     def test_create_dataset(self):
         DATASET_ID = _make_dataset_id("create_dataset")
         dataset = self.temp_dataset(DATASET_ID)
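
The new helper centralizes the create/retry/register-for-cleanup dance; scratch buckets in the tests below now come from a single call (bucket name shown is illustrative):

    bucket = self._create_bucket("bq_scratch_test" + unique_resource_id())

and tearDown later removes the bucket via retry_storage_errors_conflict.
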
@@ -683,12 +699,8 @@ def test_load_table_from_uri_then_dump_table(self):
 
     def test_load_table_from_file_w_explicit_location(self):
         # Create a temporary bucket for extract files.
-        storage_client = storage.Client()
         bucket_name = "bq_load_table_eu_extract_test" + unique_resource_id()
-        bucket = storage_client.bucket(bucket_name)
-        bucket.location = "eu"
-        self.to_delete.append(bucket)
-        bucket.create()
+        self._create_bucket(bucket_name, location="eu")
 
         # Create a temporary dataset & table in the EU.
         table_bytes = six.BytesIO(b"a,3\nb,2\nc,1\n")
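
The EU bucket matters because BigQuery jobs run in the location of the data they touch, so this test also pins the job location explicitly. A sketch of that call shape, with hypothetical gs_url and config variables, mirroring the location="US" call visible in the next hunk:

    load_job = Config.CLIENT.load_table_from_uri(
        gs_url, table_ref, location="EU", job_config=config
    )
    load_job.result()  # block until the load job finishes
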
@@ -768,65 +780,57 @@ def test_load_table_from_file_w_explicit_location(self):
             table_ref, "gs://{}/letters-us.csv".format(bucket_name), location="US"
         ).result()
 
-    def _create_storage(self, bucket_name, blob_name):
-        storage_client = storage.Client()
-
-        # In the **very** rare case the bucket name is reserved, this
-        # fails with a ConnectionError.
-        bucket = storage_client.create_bucket(bucket_name)
-        self.to_delete.append(bucket)
-
-        return bucket.blob(blob_name)
-
     def _write_csv_to_storage(self, bucket_name, blob_name, header_row, data_rows):
         from google.cloud._testing import _NamedTemporaryFile
 
-        blob = self._create_storage(bucket_name, blob_name)
+        bucket = self._create_bucket(bucket_name)
+        blob = bucket.blob(blob_name)
+
         with _NamedTemporaryFile() as temp:
             with open(temp.name, "w") as csv_write:
                 writer = csv.writer(csv_write)
                 writer.writerow(header_row)
                 writer.writerows(data_rows)
 
             with open(temp.name, "rb") as csv_read:
-                blob.upload_from_file(csv_read, content_type="text/csv")
+                retry_storage_errors(blob.upload_from_file)(
+                    csv_read, content_type="text/csv"
+                )
 
         self.to_delete.insert(0, blob)
         return "gs://{}/{}".format(bucket_name, blob_name)
 
     def _write_avro_to_storage(self, bucket_name, blob_name, avro_file):
-        blob = self._create_storage(bucket_name, blob_name)
-        blob.upload_from_file(avro_file, content_type="application/x-avro-binary")
+        bucket = self._create_bucket(bucket_name)
+        blob = bucket.blob(blob_name)
+        retry_storage_errors(blob.upload_from_file)(
+            avro_file, content_type="application/x-avro-binary"
+        )
         self.to_delete.insert(0, blob)
         return "gs://{}/{}".format(bucket_name, blob_name)
 
-    def _load_table_for_extract_table(
-        self, storage_client, rows, bucket_name, blob_name, table
-    ):
+    def _load_table_for_extract_table(self, bucket, blob_name, table, rows):
         from google.cloud._testing import _NamedTemporaryFile
 
-        gs_url = "gs://{}/{}".format(bucket_name, blob_name)
-
-        # In the **very** rare case the bucket name is reserved, this
-        # fails with a ConnectionError.
-        bucket = storage_client.create_bucket(bucket_name)
-        self.to_delete.append(bucket)
         blob = bucket.blob(blob_name)
-
         with _NamedTemporaryFile() as temp:
             with open(temp.name, "w") as csv_write:
                 writer = csv.writer(csv_write)
                 writer.writerow(HEADER_ROW)
                 writer.writerows(rows)
 
             with open(temp.name, "rb") as csv_read:
-                blob.upload_from_file(csv_read, content_type="text/csv")
+                retry_storage_errors(blob.upload_from_file)(
+                    csv_read, content_type="text/csv"
+                )
+
         self.to_delete.insert(0, blob)
 
         dataset = self.temp_dataset(table.dataset_id)
         table_ref = dataset.table(table.table_id)
         config = bigquery.LoadJobConfig()
         config.autodetect = True
+        gs_url = "gs://{}/{}".format(bucket.name, blob_name)
         job = Config.CLIENT.load_table_from_uri(gs_url, table_ref, job_config=config)
         # TODO(jba): do we need this retry now that we have job.result()?
         # Allow for 90 seconds of "warm up" before rows visible. See
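
After this refactor the load helper no longer creates a bucket or precomputes the URI; the caller passes a bucket in, and gs_url is derived from bucket.name right before the load job. The new call shape, as used by test_extract_table below:

    bucket = self._create_bucket(bucket_name)
    self._load_table_for_extract_table(bucket, source_blob_name, table_ref, ROWS)
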
@@ -836,21 +840,16 @@ def _load_table_for_extract_table(
         retry(job.reload)()
 
     def test_extract_table(self):
-        from google.cloud.storage import Client as StorageClient
-
-        storage_client = StorageClient()
         local_id = unique_resource_id()
         bucket_name = "bq_extract_test" + local_id
-        blob_name = "person_ages.csv"
+        source_blob_name = "person_ages.csv"
         dataset_id = _make_dataset_id("load_gcs_then_extract")
         table_id = "test_table"
         table_ref = Config.CLIENT.dataset(dataset_id).table(table_id)
         table = Table(table_ref)
         self.to_delete.insert(0, table)
-        self._load_table_for_extract_table(
-            storage_client, ROWS, bucket_name, blob_name, table_ref
-        )
-        bucket = storage_client.bucket(bucket_name)
+        bucket = self._create_bucket(bucket_name)
+        self._load_table_for_extract_table(bucket, source_blob_name, table_ref, ROWS)
         destination_blob_name = "person_ages_out.csv"
         destination = bucket.blob(destination_blob_name)
         destination_uri = "gs://{}/person_ages_out.csv".format(bucket_name)
@@ -859,7 +858,8 @@ def test_extract_table(self):
         job.result(timeout=100)
 
         self.to_delete.insert(0, destination)
-        got = destination.download_as_string().decode("utf-8")
+        got_bytes = retry_storage_errors(destination.download_as_string)()
+        got = got_bytes.decode("utf-8")
         self.assertIn("Bharney Rhubble", got)
 
     def test_copy_table(self):
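
Net effect of the commit: every direct GCS call in these system tests now follows the same wrap-then-invoke shape, so transient storage hiccups are retried instead of failing the BigQuery suite, e.g.:

    retry_storage_errors(blob.upload_from_file)(csv_read, content_type="text/csv")
    got_bytes = retry_storage_errors(destination.download_as_string)()
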