Skip to content

Commit 2a078e0

Browse files
committed
Add GatewayTimeout exception to pubsub subscription pull.
1 parent 53c189a commit 2a078e0

3 files changed

Lines changed: 19 additions & 0 deletions

File tree

core/google/cloud/_testing.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ def _make_grpc_failed_precondition(self):
7777
from grpc import StatusCode
7878
return self._make_grpc_error(StatusCode.FAILED_PRECONDITION)
7979

80+
def _make_grpc_deadline_exceeded(self):
81+
from grpc import StatusCode
82+
return self._make_grpc_error(StatusCode.DEADLINE_EXCEEDED)
83+
8084

8185
class _GAXPageIterator(object):
8286

pubsub/google/cloud/pubsub/_gax.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from google.cloud._helpers import _to_bytes
3232
from google.cloud._helpers import _pb_timestamp_to_rfc3339
3333
from google.cloud.exceptions import Conflict
34+
from google.cloud.exceptions import GatewayTimeout
3435
from google.cloud.exceptions import NotFound
3536
from google.cloud.iterator import GAXIterator
3637
from google.cloud.pubsub._helpers import subscription_name_from_path
@@ -413,6 +414,8 @@ def subscription_pull(self, subscription_path, return_immediately=False,
413414
except GaxError as exc:
414415
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
415416
raise NotFound(subscription_path)
417+
if exc_to_code(exc.cause) == StatusCode.DEADLINE_EXCEEDED:
418+
raise GatewayTimeout(subscription_path)
416419
raise
417420
return [_received_message_pb_to_mapping(rmpb)
418421
for rmpb in response_pb.received_messages]

pubsub/unit_tests/test__gax.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,14 @@ def test_subscription_pull_defaults_error(self):
762762
self.assertFalse(return_immediately)
763763
self.assertIsNone(options)
764764

765+
def test_subscription_pull_deadline_exceeded(self):
766+
from google.cloud.exceptions import GatewayTimeout
767+
gax_api = _GAXSubscriberAPI(_deadline_exceeded_gax_error=True)
768+
api = self._makeOne(gax_api)
769+
770+
with self.assertRaises(GatewayTimeout):
771+
api.subscription_pull(self.SUB_PATH)
772+
765773
def test_subscription_acknowledge_hit(self):
766774
ACK_ID1 = 'DEADBEEF'
767775
ACK_ID2 = 'BEADCAFE'
@@ -1035,6 +1043,7 @@ class _GAXSubscriberAPI(_GAXBaseAPI):
10351043
_modify_push_config_ok = False
10361044
_acknowledge_ok = False
10371045
_modify_ack_deadline_ok = False
1046+
_deadline_exceeded_gax_error = False
10381047

10391048
def list_subscriptions(self, project, page_size, options=None):
10401049
self._list_subscriptions_called_with = (project, page_size, options)
@@ -1084,6 +1093,9 @@ def pull(self, name, max_messages, return_immediately, options=None):
10841093
name, max_messages, return_immediately, options)
10851094
if self._random_gax_error:
10861095
raise GaxError('error')
1096+
if self._deadline_exceeded_gax_error:
1097+
raise GaxError('deadline exceeded',
1098+
self._make_grpc_deadline_exceeded())
10871099
try:
10881100
return self._pull_response
10891101
except AttributeError:

0 commit comments

Comments
 (0)