Skip to content

Commit 51bcf91

Browse files
committed
Snippets for 'Subscription.{pull,acknowledge,modify_ack_deadline}'.
1 parent ecd0ffc commit 51bcf91

3 files changed

Lines changed: 92 additions & 49 deletions

File tree

docs/pubsub-usage.rst

Lines changed: 14 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -170,61 +170,28 @@ Pull messages from a subscription
170170

171171
Fetch pending messages for a pull subscription:
172172

173-
.. doctest::
174-
175-
>>> from gcloud import pubsub
176-
>>> client = pubsub.Client()
177-
>>> topic = client.topic('topic_name')
178-
>>> subscription = topic.subscription('subscription_name')
179-
>>> with topic.batch() as batch:
180-
... batch.publish('this is the first message_payload')
181-
... batch.publish('this is the second message_payload',
182-
... attr1='value1', attr2='value2')
183-
>>> received = subscription.pull() # API request
184-
>>> messages = [recv[1] for recv in received]
185-
>>> [message.message_id for message in messages]
186-
[<message_id1>, <message_id2>]
187-
>>> [message.data for message in messages]
188-
['this is the first message_payload', 'this is the second message_payload']
189-
>>> [message.attributes for message in messages]
190-
[{}, {'attr1': 'value1', 'attr2': 'value2'}]
173+
.. literalinclude:: pubsub_snippets.py
174+
:start-after: [START subscription_pull]
175+
:end-before: [END subscription_pull]
191176

192177
Note that received messages must be acknowledged, or else the back-end
193178
will re-send them later:
194179

195-
.. doctest::
196-
197-
>>> ack_ids = [recv[0] for recv in received]
198-
>>> subscription.acknowledge(ack_ids)
199-
200-
Fetch a limited number of pending messages for a pull subscription:
201-
202-
.. doctest::
203-
204-
>>> from gcloud import pubsub
205-
>>> client = pubsub.Client()
206-
>>> topic = client.topic('topic_name')
207-
>>> subscription = topic.subscription('subscription_name')
208-
>>> with topic.batch() as batch:
209-
... batch.publish('this is the first message_payload')
210-
... batch.publish('this is the second message_payload',
211-
... attr1='value1', attr2='value2')
212-
>>> received = subscription.pull(max_messages=1) # API request
213-
>>> messages = [recv[1] for recv in received]
214-
>>> [message.message_id for message in messages]
180+
.. literalinclude:: pubsub_snippets.py
181+
:start-after: [START subscription_acknowledge]
182+
:end-before: [END subscription_acknowledge]
215183

216184
Fetch messages for a pull subscription without blocking (none pending):
217185

218-
.. doctest::
186+
.. literalinclude:: pubsub_snippets.py
187+
:start-after: [START subscription_pull_return_immediately]
188+
:end-before: [END subscription_pull_return_immediately]
219189

220-
>>> from gcloud import pubsub
221-
>>> client = pubsub.Client()
222-
>>> topic = client.topic('topic_name')
223-
>>> subscription = topic.subscription('subscription_name')
224-
>>> received = subscription.pull(return_immediately=True) # API request
225-
>>> messages = [recv[1] for recv in received]
226-
>>> [message.message_id for message in messages]
227-
[]
190+
Update the acknowlegement deadline for pulled messages:
191+
192+
.. literalinclude:: pubsub_snippets.py
193+
:start-after: [START subscription_modify_ack_deadline]
194+
:end-before: [END subscription_modify_ack_deadline]
228195

229196
Fetch the IAM policy for a subscription
230197

docs/pubsub_snippets.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,76 @@ def subscription_lifecycle(client, to_delete):
293293
# [END subscription_delete]
294294

295295

296+
@snippet
297+
def subscription_pull(client, to_delete):
298+
"""Pull messges from a subscribed topic."""
299+
TOPIC_NAME = 'subscription_pull-%d' % (_millis(),)
300+
SUB_NAME = 'subscription_pull-defaults-%d' % (_millis(),)
301+
PAYLOAD1 = b'PAYLOAD1'
302+
PAYLOAD2 = b'PAYLOAD2'
303+
EXTRA = 'EXTRA'
304+
topic = client.topic(TOPIC_NAME)
305+
topic.create()
306+
to_delete.append(topic)
307+
308+
subscription = topic.subscription(SUB_NAME)
309+
subscription.create()
310+
to_delete.append(subscription)
311+
312+
# [START subscription_pull_none_pending]
313+
pulled = subscription.pull(max_messages=1)
314+
# [END subscription_pull_none_pending]
315+
assert len(pulled) == 0
316+
317+
# [START subscription_pull_return_immediately]
318+
pulled = subscription.pull(return_immediately=True)
319+
# [END subscription_pull_return_immediately]
320+
assert len(pulled) == 0
321+
322+
topic.publish(PAYLOAD1)
323+
topic.publish(PAYLOAD2, extra=EXTRA)
324+
325+
# [START subscription_pull]
326+
pulled = subscription.pull(max_messages=2)
327+
# [END subscription_pull]
328+
329+
assert len(pulled) == 2
330+
331+
# [START subscription_modify_ack_deadline]
332+
for ack_id, _ in pulled:
333+
subscription.modify_ack_deadline(ack_id, 90) # API request
334+
# [END subscription_modify_ack_deadline]
335+
336+
payloads = []
337+
extras = []
338+
339+
def do_something_with(message): # pylint: disable=unused-argument
340+
payloads.append(message.data)
341+
if message.attributes:
342+
extras.append(message.attributes)
343+
344+
class ApplicationException(Exception):
345+
pass
346+
347+
def log_exception(_):
348+
pass
349+
350+
# [START subscription_acknowledge]
351+
for ack_id, message in pulled:
352+
try:
353+
do_something_with(message)
354+
except ApplicationException as e:
355+
log_exception(e)
356+
else:
357+
subscription.acknowledge([ack_id])
358+
# [END subscription_acknowledge]
359+
360+
assert set(payloads) == set([PAYLOAD1, PAYLOAD2]), 'payloads: %s' % (
361+
(payloads,))
362+
assert extras == [{'extra': EXTRA}], 'extras: %s' % (
363+
(extras,))
364+
365+
296366
def _find_examples():
297367
funcs = [obj for obj in globals().values()
298368
if getattr(obj, '_snippet', False)]

gcloud/pubsub/subscription.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,8 @@ def pull(self, return_immediately=False, max_messages=1, client=None):
267267
Example:
268268
269269
.. literalinclude:: pubsub_snippets.py
270-
:start-after: [START subscription_push_pull]
271-
:end-before: [END subscription_push_pull]
270+
:start-after: [START subscription_pull]
271+
:end-before: [END subscription_pull]
272272
273273
:type return_immediately: boolean
274274
:param return_immediately: if True, the back-end returns even if no
@@ -301,6 +301,12 @@ def acknowledge(self, ack_ids, client=None):
301301
See:
302302
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/acknowledge
303303
304+
Example:
305+
306+
.. literalinclude:: pubsub_snippets.py
307+
:start-after: [START subscription_acknowledge]
308+
:end-before: [END subscription_acknowledge]
309+
304310
:type ack_ids: list of string
305311
:param ack_ids: ack IDs of messages being acknowledged
306312

0 commit comments

Comments
 (0)