Skip to content

Commit a9b3b5d

Browse files
committed
Add support for auto-acknowledging pulled messages.
Follows @tmatsuo's suggested implementation in: googleapis#798 (comment) Closes googleapis#798.
1 parent 167a911 commit a9b3b5d

4 files changed

Lines changed: 222 additions & 28 deletions

File tree

docs/pubsub-usage.rst

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,24 @@ Re-synchronize a subscription with the back-end:
158158
:start-after: [START subscription_reload]
159159
:end-before: [END subscription_reload]
160160

161+
Fetch the IAM policy for a subscription
162+
163+
.. literalinclude:: pubsub_snippets.py
164+
:start-after: [START subscription_get_iam_policy]
165+
:end-before: [END subscription_get_iam_policy]
166+
167+
Update the IAM policy for a subscription:
168+
169+
.. literalinclude:: pubsub_snippets.py
170+
:start-after: [START subscription_set_iam_policy]
171+
:end-before: [END subscription_set_iam_policy]
172+
173+
Test permissions allowed by the current IAM policy on a subscription:
174+
175+
.. literalinclude:: pubsub_snippets.py
176+
:start-after: [START subscription_check_iam_permissions]
177+
:end-before: [END subscription_check_iam_permissions]
178+
161179
Delete a subscription:
162180

163181
.. literalinclude:: pubsub_snippets.py
@@ -193,20 +211,15 @@ Update the acknowlegement deadline for pulled messages:
193211
:start-after: [START subscription_modify_ack_deadline]
194212
:end-before: [END subscription_modify_ack_deadline]
195213

196-
Fetch the IAM policy for a subscription
197-
198-
.. literalinclude:: pubsub_snippets.py
199-
:start-after: [START subscription_get_iam_policy]
200-
:end-before: [END subscription_get_iam_policy]
201-
202-
Update the IAM policy for a subscription:
214+
Fetch pending messages, acknowledging those whose processing doesn't raise an
215+
error:
203216

204217
.. literalinclude:: pubsub_snippets.py
205-
:start-after: [START subscription_set_iam_policy]
206-
:end-before: [END subscription_set_iam_policy]
218+
:start-after: [START subscription_pull_autoack]
219+
:end-before: [END subscription_pull_autoack]
207220

208-
Test permissions allowed by the current IAM policy on a subscription:
221+
.. note::
209222

210-
.. literalinclude:: pubsub_snippets.py
211-
:start-after: [START subscription_check_iam_permissions]
212-
:end-before: [END subscription_check_iam_permissions]
223+
The ``pull`` API request occurs at entry to the ``with`` block, and the
224+
``acknowlege`` API request occurs at the end, passing only the ``ack_ids``
225+
which haven't been deleted from ``ack``

docs/pubsub_snippets.py

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -178,21 +178,6 @@ def topic_publish_messages(client, to_delete):
178178
# [END topic_publish_message_with_attrs]
179179

180180

181-
@snippet
182-
def topic_batch(client, to_delete):
183-
"""Publish multiple messages in a single request."""
184-
TOPIC_NAME = 'topic_batch-%d' % (_millis(),)
185-
topic = client.topic(TOPIC_NAME)
186-
topic.create()
187-
to_delete.append(topic)
188-
189-
# [START topic_batch]
190-
with topic.batch() as batch:
191-
batch.publish(b'This is the message payload')
192-
batch.publish(b'Another message payload', extra='EXTRA')
193-
# [END topic_batch] API request on block exit
194-
195-
196181
@snippet
197182
def topic_subscription(client, to_delete):
198183
"""Create subscriptions to a topic."""
@@ -358,6 +343,52 @@ def log_exception(_):
358343
(extras,))
359344

360345

346+
@snippet
347+
def subscription_pull_w_autoack(client, to_delete):
348+
"""Pull messges from a topic, auto-acknowldging them"""
349+
TOPIC_NAME = 'subscription_pull_autoack-%d' % (_millis(),)
350+
SUB_NAME = 'subscription_pull_autoack-defaults-%d' % (_millis(),)
351+
PAYLOAD1 = b'PAYLOAD1'
352+
PAYLOAD2 = b'PAYLOAD2'
353+
EXTRA = 'EXTRA'
354+
topic = client.topic(TOPIC_NAME)
355+
topic.create()
356+
to_delete.append(topic)
357+
358+
subscription = topic.subscription(SUB_NAME)
359+
subscription.create()
360+
to_delete.append(subscription)
361+
362+
# [START topic_batch]
363+
with topic.batch() as batch:
364+
batch.publish(PAYLOAD1)
365+
batch.publish(PAYLOAD2, extra=EXTRA)
366+
# [END topic_batch]
367+
368+
time.sleep(1) # eventually-consistent
369+
370+
payloads = []
371+
extras = []
372+
373+
def do_something_with(message): # pylint: disable=unused-argument
374+
payloads.append(message.data)
375+
if message.attributes:
376+
extras.append(message.attributes)
377+
378+
# [START subscription_pull_autoack]
379+
from gcloud.pubsub.subscription import AutoAck
380+
with AutoAck(subscription, max_messages=10) as ack:
381+
for ack_id, message in ack.items():
382+
try:
383+
do_something_with(message)
384+
except Exception: # pylint: disable=broad-except
385+
del ack[ack_id]
386+
# [END subscription_pull_autoack]
387+
388+
assert set(payloads) == set(PAYLOAD1, PAYLOAD1), "eventual consistency"
389+
assert extras == [{'extra': EXTRA}], "eventual consistency"
390+
391+
361392
@snippet
362393
def subscription_iam_policy(client, to_delete):
363394
"""Fetch / set a subscription's IAM policy."""

gcloud/pubsub/subscription.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,3 +420,51 @@ def check_iam_permissions(self, permissions, client=None):
420420
api = client.iam_policy_api
421421
return api.test_iam_permissions(
422422
self.full_name, list(permissions))
423+
424+
425+
class AutoAck(dict):
426+
"""Wrapper for :meth:`Subscription.pull` results.
427+
428+
Mapping, tracks messages still-to-be-acknowledged.
429+
430+
When used as a context manager, acknowledges all messages still in the
431+
mapping on `__exit__`. E.g.:
432+
433+
.. code-block: python
434+
435+
with AutoAck(subscription) as ack: # calls ``subscription.pull``
436+
for ack_id, message in ack.items():
437+
try:
438+
do_something_with(message):
439+
except:
440+
del ack[ack_id]
441+
442+
:type subscription: :class:`Subscription`
443+
:param subscription: subcription to be pulled.
444+
445+
:type return_immediately: boolean
446+
:param return_immediately: passed through to :meth:`Subscription.pull`
447+
448+
:type max_messages: int
449+
:param max_messages: passed through to :meth:`Subscription.pull`
450+
451+
:type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
452+
:param client: passed through to :meth:`Subscription.pull` and
453+
:meth:`Subscription.acknowledge`.
454+
"""
455+
def __init__(self, subscription,
456+
return_immediately=False, max_messages=1, client=None):
457+
super(AutoAck, self).__init__()
458+
self._subscription = subscription
459+
self._return_immediately = return_immediately
460+
self._max_messages = max_messages
461+
self._client = client
462+
463+
def __enter__(self):
464+
items = self._subscription.pull(
465+
self._return_immediately, self._max_messages, self._client)
466+
self.update(items)
467+
return self
468+
469+
def __exit__(self, exc_type, exc_val, exc_tb):
470+
self._subscription.acknowledge(list(self), self._client)

gcloud/pubsub/test_subscription.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,81 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
642642
return self._subscription_modify_ack_deadline_response
643643

644644

645+
class TestAutoAck(unittest2.TestCase):
646+
647+
def _getTargetClass(self):
648+
from gcloud.pubsub.subscription import AutoAck
649+
return AutoAck
650+
651+
def _makeOne(self, *args, **kw):
652+
return self._getTargetClass()(*args, **kw)
653+
654+
def test_ctor_defaults(self):
655+
subscription = _FauxSubscription(())
656+
auto_ack = self._makeOne(subscription)
657+
self.assertEqual(auto_ack._return_immediately, False)
658+
self.assertEqual(auto_ack._max_messages, 1)
659+
self.assertTrue(auto_ack._client is None)
660+
661+
def test_ctor_explicit(self):
662+
CLIENT = object()
663+
subscription = _FauxSubscription(())
664+
auto_ack = self._makeOne(
665+
subscription, return_immediately=True, max_messages=10,
666+
client=CLIENT)
667+
self.assertTrue(auto_ack._subscription is subscription)
668+
self.assertEqual(auto_ack._return_immediately, True)
669+
self.assertEqual(auto_ack._max_messages, 10)
670+
self.assertTrue(auto_ack._client is CLIENT)
671+
672+
def test___enter___w_defaults(self):
673+
subscription = _FauxSubscription(())
674+
auto_ack = self._makeOne(subscription)
675+
676+
with auto_ack as returned:
677+
pass
678+
679+
self.assertTrue(returned is auto_ack)
680+
self.assertEqual(subscription._return_immediately, False)
681+
self.assertEqual(subscription._max_messages, 1)
682+
self.assertTrue(subscription._client is None)
683+
684+
def test___enter___w_explicit(self):
685+
CLIENT = object()
686+
subscription = _FauxSubscription(())
687+
auto_ack = self._makeOne(
688+
subscription, return_immediately=True, max_messages=10,
689+
client=CLIENT)
690+
691+
with auto_ack as returned:
692+
pass
693+
694+
self.assertTrue(returned is auto_ack)
695+
self.assertEqual(subscription._return_immediately, True)
696+
self.assertEqual(subscription._max_messages, 10)
697+
self.assertTrue(subscription._client is CLIENT)
698+
699+
def test___exit___(self):
700+
CLIENT = object()
701+
ACK_ID1, MESSAGE1 = 'ACK_ID1', _FallibleMessage()
702+
ACK_ID2, MESSAGE2 = 'ACK_ID2', _FallibleMessage()
703+
ACK_ID3, MESSAGE3 = 'ACK_ID3', _FallibleMessage(True)
704+
ITEMS = [
705+
(ACK_ID1, MESSAGE1),
706+
(ACK_ID2, MESSAGE2),
707+
(ACK_ID3, MESSAGE3),
708+
]
709+
subscription = _FauxSubscription(ITEMS)
710+
auto_ack = self._makeOne(subscription, client=CLIENT)
711+
with auto_ack:
712+
for ack_id, message in list(auto_ack.items()):
713+
if message.fail:
714+
del auto_ack[ack_id]
715+
self.assertEqual(sorted(subscription._acknowledged),
716+
[ACK_ID1, ACK_ID2])
717+
self.assertTrue(subscription._ack_client is CLIENT)
718+
719+
645720
class _FauxIAMPolicy(object):
646721

647722
def get_iam_policy(self, target_path):
@@ -677,3 +752,30 @@ def __init__(self, project):
677752
def topic(self, name, timestamp_messages=False):
678753
from gcloud.pubsub.topic import Topic
679754
return Topic(name, client=self, timestamp_messages=timestamp_messages)
755+
756+
757+
class _FallibleMessage(object):
758+
759+
def __init__(self, fail=False):
760+
self.fail = fail
761+
762+
763+
class _FauxSubscription(object):
764+
765+
def __init__(self, items):
766+
self._items = items
767+
self._mapping = dict(items)
768+
self._acknowledged = set()
769+
770+
def pull(self, return_immediately=False, max_messages=1, client=None):
771+
self._return_immediately = return_immediately
772+
self._max_messages = max_messages
773+
self._client = client
774+
return self._items
775+
776+
def acknowledge(self, ack_ids, client=None):
777+
self._ack_client = client
778+
for ack_id in ack_ids:
779+
message = self._mapping[ack_id]
780+
assert not message.fail
781+
self._acknowledged.add(ack_id)

0 commit comments

Comments
 (0)