Skip to content

Commit 892e34e

Browse files
committed
Add GatewayTimeout exception to pubsub subscription pull.
1 parent 0e56db8 commit 892e34e

File tree

3 files changed

+34
-1
lines changed

3 files changed

+34
-1
lines changed

core/google/cloud/_testing.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ def _make_grpc_failed_precondition(self):
7777
from grpc import StatusCode
7878
return self._make_grpc_error(StatusCode.FAILED_PRECONDITION)
7979

80+
def _make_grpc_deadline_exceeded(self):
81+
from grpc import StatusCode
82+
return self._make_grpc_error(StatusCode.DEADLINE_EXCEEDED)
83+
8084

8185
class _GAXPageIterator(object):
8286

pubsub/google/cloud/pubsub/_gax.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,15 @@ def subscription_pull(self, subscription_path, return_immediately=False,
413413
subscription_path, max_messages,
414414
return_immediately=return_immediately)
415415
except GaxError as exc:
416-
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
416+
code = exc_to_code(exc.cause)
417+
if code == StatusCode.NOT_FOUND:
417418
raise NotFound(subscription_path)
419+
elif code == StatusCode.DEADLINE_EXCEEDED:
420+
# NOTE: The JSON-over-HTTP API returns a 200 with an empty
421+
# response when ``return_immediately`` is ``False``, so
422+
# we "mutate" the gRPC error into a non-error to conform.
423+
if not return_immediately:
424+
return []
418425
raise
419426
return [_received_message_pb_to_mapping(rmpb)
420427
for rmpb in response_pb.received_messages]

pubsub/unit_tests/test__gax.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,24 @@ def test_subscription_pull_defaults_error(self):
764764
self.assertFalse(return_immediately)
765765
self.assertIsNone(options)
766766

767+
def test_subscription_pull_deadline_exceeded(self):
768+
client = _Client(self.PROJECT)
769+
gax_api = _GAXSubscriberAPI(_deadline_exceeded_gax_error=True)
770+
api = self._make_one(gax_api, client)
771+
772+
result = api.subscription_pull(self.SUB_PATH)
773+
self.assertEqual(result, [])
774+
775+
def test_subscription_pull_deadline_exceeded_return_immediately(self):
776+
from google.gax.errors import GaxError
777+
778+
client = _Client(self.PROJECT)
779+
gax_api = _GAXSubscriberAPI(_deadline_exceeded_gax_error=True)
780+
api = self._make_one(gax_api, client)
781+
782+
with self.assertRaises(GaxError):
783+
api.subscription_pull(self.SUB_PATH, return_immediately=True)
784+
767785
def test_subscription_acknowledge_hit(self):
768786
ACK_ID1 = 'DEADBEEF'
769787
ACK_ID2 = 'BEADCAFE'
@@ -1075,6 +1093,7 @@ class _GAXSubscriberAPI(_GAXBaseAPI):
10751093
_modify_push_config_ok = False
10761094
_acknowledge_ok = False
10771095
_modify_ack_deadline_ok = False
1096+
_deadline_exceeded_gax_error = False
10781097

10791098
def list_subscriptions(self, project, page_size, options=None):
10801099
self._list_subscriptions_called_with = (project, page_size, options)
@@ -1124,6 +1143,9 @@ def pull(self, name, max_messages, return_immediately, options=None):
11241143
name, max_messages, return_immediately, options)
11251144
if self._random_gax_error:
11261145
raise GaxError('error')
1146+
if self._deadline_exceeded_gax_error:
1147+
raise GaxError('deadline exceeded',
1148+
self._make_grpc_deadline_exceeded())
11271149
try:
11281150
return self._pull_response
11291151
except AttributeError:

0 commit comments

Comments
 (0)