Skip to content

Commit eab9cd6

Browse files
authored
Merge pull request #2050 from tseaver/consolidate-retry-logic
Consolidate retry logic
2 parents 9ed589c + 9f499ae commit eab9cd6

File tree

8 files changed

+297
-139
lines changed

8 files changed

+297
-139
lines changed

system_tests/attempt_system_tests.py

Lines changed: 12 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -26,17 +26,18 @@
2626
import subprocess
2727
import sys
2828

29+
from run_system_test import FailedSystemTestModule
2930
from run_system_test import run_module_tests
3031

3132

32-
MODULES = (
33-
'bigquery',
33+
MODULES = ( # ordered from most to least stable
3434
'datastore',
35-
'logging',
36-
'monitoring',
37-
'pubsub',
3835
'storage',
36+
'bigquery',
37+
'pubsub',
38+
'logging',
3939
'translate',
40+
'monitoring',
4041
)
4142
if sys.version_info[:2] == (2, 7):
4243
MODULES += ('bigtable', 'bigtable-happybase')
@@ -111,9 +112,14 @@ def prepare_to_run():
111112
def main():
112113
"""Run all the system tests if necessary."""
113114
prepare_to_run()
115+
failed_modules = 0
114116
for module in MODULES:
115-
run_module_tests(module)
117+
try:
118+
run_module_tests(module)
119+
except FailedSystemTestModule:
120+
failed_modules += 1
116121

122+
sys.exit(failed_modules)
117123

118124
if __name__ == '__main__':
119125
main()

system_tests/bigquery.py

Lines changed: 27 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@
1313
# limitations under the License.
1414

1515
import operator
16-
import time
1716

1817
import unittest
1918

@@ -22,7 +21,9 @@
2221
from gcloud import bigquery
2322
from gcloud.exceptions import Forbidden
2423

25-
from retry import Retry
24+
from retry import RetryErrors
25+
from retry import RetryInstanceState
26+
from retry import RetryResult
2627
from system_test_utils import unique_resource_id
2728

2829

@@ -86,7 +87,13 @@ def test_patch_dataset(self):
8687
def test_update_dataset(self):
8788
dataset = Config.CLIENT.dataset(DATASET_NAME)
8889
self.assertFalse(dataset.exists())
89-
dataset.create()
90+
91+
# We need to wait to stay within the rate limits.
92+
# The alternative outcome is a 403 Forbidden response from upstream.
93+
# See: https://cloud.google.com/bigquery/quota-policy
94+
retry = RetryErrors(Forbidden, max_tries=2, delay=30)
95+
retry(dataset.create)()
96+
9097
self.to_delete.append(dataset)
9198
self.assertTrue(dataset.exists())
9299
after = [grant for grant in dataset.access_grants
@@ -96,11 +103,8 @@ def test_update_dataset(self):
96103
# We need to wait to stay within the rate limits.
97104
# The alternative outcome is a 403 Forbidden response from upstream.
98105
# See: https://cloud.google.com/bigquery/quota-policy
99-
@Retry(Forbidden, tries=2, delay=30)
100-
def update_dataset():
101-
dataset.update()
106+
retry(dataset.update)()
102107

103-
update_dataset()
104108
self.assertEqual(len(dataset.access_grants), len(after))
105109
for found, expected in zip(dataset.access_grants, after):
106110
self.assertEqual(found.role, expected.role)
@@ -202,11 +206,9 @@ def test_update_table(self):
202206
# We need to wait to stay within the rate limits.
203207
# The alternative outcome is a 403 Forbidden response from upstream.
204208
# See: https://cloud.google.com/bigquery/quota-policy
205-
@Retry(Forbidden, tries=2, delay=30)
206-
def create_dataset():
207-
dataset.create()
209+
retry = RetryErrors(Forbidden, max_tries=2, delay=30)
210+
retry(dataset.create)()
208211

209-
create_dataset()
210212
self.to_delete.append(dataset)
211213
TABLE_NAME = 'test_table'
212214
full_name = bigquery.SchemaField('full_name', 'STRING',
@@ -261,15 +263,15 @@ def test_load_table_then_dump_table(self):
261263
self.assertEqual(len(errors), 0)
262264

263265
rows = ()
264-
counter = 9
266+
267+
def _has_rows(result):
268+
return len(result[0]) > 0
269+
265270
# Allow for 90 seconds of "warm up" before rows visible. See:
266271
# https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
267-
268-
while len(rows) == 0 and counter > 0:
269-
counter -= 1
270-
rows, _, _ = table.fetch_data()
271-
if len(rows) == 0:
272-
time.sleep(10)
272+
# 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
273+
retry = RetryResult(_has_rows, max_tries=8)
274+
rows, _, _ = retry(table.fetch_data)()
273275

274276
by_age = operator.itemgetter(1)
275277
self.assertEqual(sorted(rows, key=by_age),
@@ -329,13 +331,14 @@ def test_load_table_from_storage_then_dump_table(self):
329331

330332
job.begin()
331333

332-
counter = 9 # Allow for 90 seconds of lag.
334+
def _job_done(instance):
335+
return instance.state in ('DONE', 'done')
333336

334-
while job.state not in ('DONE', 'done') and counter > 0:
335-
counter -= 1
336-
job.reload()
337-
if job.state not in ('DONE', 'done'):
338-
time.sleep(10)
337+
# Allow for 90 seconds of lag before the load job reports done. See:
338+
# https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
339+
# 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
340+
retry = RetryInstanceState(_job_done, max_tries=8)
341+
retry(job.reload)()
339342

340343
self.assertTrue(job.state in ('DONE', 'done'))
341344

system_tests/bigtable.py

Lines changed: 13 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,6 @@
1414

1515
import datetime
1616
import operator
17-
import time
1817

1918
import unittest
2019

@@ -32,6 +31,8 @@
3231
from gcloud.bigtable.row_data import PartialRowData
3332
from gcloud.environment_vars import TESTS_PROJECT
3433

34+
from retry import RetryErrors
35+
from retry import RetryResult
3536
from system_test_utils import unique_resource_id
3637

3738

@@ -75,39 +76,28 @@ def _operation_wait(operation, max_attempts=5):
7576
:rtype: bool
7677
:returns: Boolean indicating if the operation finished.
7778
"""
78-
total_sleep = 0
79-
while not operation.finished():
80-
if total_sleep > max_attempts:
81-
return False
82-
time.sleep(1)
83-
total_sleep += 1
8479

85-
return True
80+
def _operation_finished(result):
81+
return result
8682

83+
retry = RetryResult(_operation_finished, max_tries=max_attempts)
84+
return retry(operation.finished)()
8785

88-
def _retry_backoff(meth, *args, **kw):
86+
87+
def _retry_on_unavailable(exc):
88+
"""Retry only AbortionErrors whose status code is 'UNAVAILABLE'."""
8989
from grpc.beta.interfaces import StatusCode
90-
from grpc.framework.interfaces.face.face import AbortionError
91-
backoff_intervals = [1, 2, 4, 8]
92-
while True:
93-
try:
94-
return meth(*args, **kw)
95-
except AbortionError as error:
96-
if error.code != StatusCode.UNAVAILABLE:
97-
raise
98-
if backoff_intervals:
99-
time.sleep(backoff_intervals.pop(0))
100-
else:
101-
raise
90+
return exc.code == StatusCode.UNAVAILABLE
10291

10392

10493
def setUpModule():
94+
from grpc.framework.interfaces.face.face import AbortionError
10595
_helpers.PROJECT = TESTS_PROJECT
10696
Config.CLIENT = Client(admin=True)
10797
Config.INSTANCE = Config.CLIENT.instance(INSTANCE_ID, LOCATION_ID)
10898
Config.CLIENT.start()
109-
instances, failed_locations = _retry_backoff(
110-
Config.CLIENT.list_instances)
99+
retry = RetryErrors(AbortionError, error_predicate=_retry_on_unavailable)
100+
instances, failed_locations = retry(Config.CLIENT.list_instances)()
111101

112102
if len(failed_locations) != 0:
113103
raise ValueError('List instances failed in module set up.')

system_tests/logging_.py

Lines changed: 29 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -12,14 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import time
16-
1715
import unittest
1816

1917
from gcloud import _helpers
2018
from gcloud.environment_vars import TESTS_PROJECT
2119
from gcloud import logging
2220

21+
from retry import RetryErrors
22+
from retry import RetryResult
2323
from system_test_utils import unique_resource_id
2424

2525

@@ -33,27 +33,10 @@
3333
TOPIC_NAME = 'gcloud-python-system-testing%s' % (_RESOURCE_ID,)
3434

3535

36-
def _retry_backoff(result_predicate, meth, *args, **kw):
36+
def _retry_on_unavailable(exc):
37+
"""Retry only AbortionErrors whose status code is 'UNAVAILABLE'."""
3738
from grpc.beta.interfaces import StatusCode
38-
from grpc.framework.interfaces.face.face import AbortionError
39-
backoff_intervals = [1, 2, 4, 8]
40-
while True:
41-
try:
42-
result = meth(*args, **kw)
43-
except AbortionError as error:
44-
if error.code != StatusCode.UNAVAILABLE:
45-
raise
46-
if backoff_intervals:
47-
time.sleep(backoff_intervals.pop(0))
48-
continue
49-
else:
50-
raise
51-
if result_predicate(result):
52-
return result
53-
if backoff_intervals:
54-
time.sleep(backoff_intervals.pop(0))
55-
else:
56-
raise RuntimeError('%s: %s %s' % (meth, args, kw))
39+
return exc.code == StatusCode.UNAVAILABLE
5740

5841

5942
def _has_entries(result):
@@ -81,28 +64,26 @@ def setUp(self):
8164

8265
def tearDown(self):
8366
from gcloud.exceptions import NotFound
67+
retry = RetryErrors(NotFound)
8468
for doomed in self.to_delete:
85-
backoff_intervals = [1, 2, 4, 8]
86-
while True:
87-
try:
88-
doomed.delete()
89-
break
90-
except NotFound:
91-
if backoff_intervals:
92-
time.sleep(backoff_intervals.pop(0))
93-
else:
94-
raise
69+
retry(doomed.delete)()
9570

9671
@staticmethod
9772
def _logger_name():
9873
return 'system-tests-logger' + unique_resource_id('-')
9974

75+
def _list_entries(self, logger):
76+
from grpc.framework.interfaces.face.face import AbortionError
77+
inner = RetryResult(_has_entries)(logger.list_entries)
78+
outer = RetryErrors(AbortionError, _retry_on_unavailable)(inner)
79+
return outer()
80+
10081
def test_log_text(self):
10182
TEXT_PAYLOAD = 'System test: test_log_text'
10283
logger = Config.CLIENT.logger(self._logger_name())
10384
self.to_delete.append(logger)
10485
logger.log_text(TEXT_PAYLOAD)
105-
entries, _ = _retry_backoff(_has_entries, logger.list_entries)
86+
entries, _ = self._list_entries(logger)
10687
self.assertEqual(len(entries), 1)
10788
self.assertEqual(entries[0].payload, TEXT_PAYLOAD)
10889

@@ -123,7 +104,7 @@ def test_log_text_w_metadata(self):
123104

124105
logger.log_text(TEXT_PAYLOAD, insert_id=INSERT_ID, severity=SEVERITY,
125106
http_request=REQUEST)
126-
entries, _ = _retry_backoff(_has_entries, logger.list_entries)
107+
entries, _ = self._list_entries(logger)
127108

128109
self.assertEqual(len(entries), 1)
129110

@@ -146,7 +127,7 @@ def test_log_struct(self):
146127
self.to_delete.append(logger)
147128

148129
logger.log_struct(JSON_PAYLOAD)
149-
entries, _ = _retry_backoff(_has_entries, logger.list_entries)
130+
entries, _ = self._list_entries(logger)
150131

151132
self.assertEqual(len(entries), 1)
152133
self.assertEqual(entries[0].payload, JSON_PAYLOAD)
@@ -171,7 +152,7 @@ def test_log_struct_w_metadata(self):
171152

172153
logger.log_struct(JSON_PAYLOAD, insert_id=INSERT_ID, severity=SEVERITY,
173154
http_request=REQUEST)
174-
entries, _ = _retry_backoff(_has_entries, logger.list_entries)
155+
entries, _ = self._list_entries(logger)
175156

176157
self.assertEqual(len(entries), 1)
177158
self.assertEqual(entries[0].payload, JSON_PAYLOAD)
@@ -205,10 +186,12 @@ def test_list_metrics(self):
205186
set([DEFAULT_METRIC_NAME]))
206187

207188
def test_reload_metric(self):
189+
from gcloud.exceptions import Conflict
190+
retry = RetryErrors(Conflict)
208191
metric = Config.CLIENT.metric(
209192
DEFAULT_METRIC_NAME, DEFAULT_FILTER, DEFAULT_DESCRIPTION)
210193
self.assertFalse(metric.exists())
211-
metric.create()
194+
retry(metric.create)()
212195
self.to_delete.append(metric)
213196
metric.filter_ = 'logName:other'
214197
metric.description = 'local changes'
@@ -217,12 +200,14 @@ def test_reload_metric(self):
217200
self.assertEqual(metric.description, DEFAULT_DESCRIPTION)
218201

219202
def test_update_metric(self):
203+
from gcloud.exceptions import Conflict
204+
retry = RetryErrors(Conflict)
220205
NEW_FILTER = 'logName:other'
221206
NEW_DESCRIPTION = 'updated'
222207
metric = Config.CLIENT.metric(
223208
DEFAULT_METRIC_NAME, DEFAULT_FILTER, DEFAULT_DESCRIPTION)
224209
self.assertFalse(metric.exists())
225-
metric.create()
210+
retry(metric.create)()
226211
self.to_delete.append(metric)
227212
metric.filter_ = NEW_FILTER
228213
metric.description = NEW_DESCRIPTION
@@ -324,10 +309,12 @@ def test_list_sinks(self):
324309
set([DEFAULT_SINK_NAME]))
325310

326311
def test_reload_sink(self):
312+
from gcloud.exceptions import Conflict
313+
retry = RetryErrors(Conflict)
327314
uri = self._init_bigquery_dataset()
328315
sink = Config.CLIENT.sink(DEFAULT_SINK_NAME, DEFAULT_FILTER, uri)
329316
self.assertFalse(sink.exists())
330-
sink.create()
317+
retry(sink.create)()
331318
self.to_delete.append(sink)
332319
sink.filter_ = 'BOGUS FILTER'
333320
sink.destination = 'BOGUS DESTINATION'
@@ -336,13 +323,15 @@ def test_reload_sink(self):
336323
self.assertEqual(sink.destination, uri)
337324

338325
def test_update_sink(self):
326+
from gcloud.exceptions import Conflict
327+
retry = RetryErrors(Conflict)
339328
bucket_uri = self._init_storage_bucket()
340329
dataset_uri = self._init_bigquery_dataset()
341330
UPDATED_FILTER = 'logName:syslog'
342331
sink = Config.CLIENT.sink(
343332
DEFAULT_SINK_NAME, DEFAULT_FILTER, bucket_uri)
344333
self.assertFalse(sink.exists())
345-
sink.create()
334+
retry(sink.create)()
346335
self.to_delete.append(sink)
347336
sink.filter_ = UPDATED_FILTER
348337
sink.destination = dataset_uri

0 commit comments

Comments
 (0)