Skip to content

Commit 8742eb1

Browse files
committed
Converting Pub/Sub topic->list_subscriptions to iterator.
1 parent 5f97bc7 commit 8742eb1

8 files changed

Lines changed: 249 additions & 119 deletions

File tree

docs/pubsub_snippets.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -206,14 +206,8 @@ def do_something_with(sub):
206206
sub_names.add(sub.full_name)
207207

208208
# [START topic_list_subscriptions]
209-
subscriptions, token = topic.list_subscriptions() # API request
210-
while True:
211-
for subscription in subscriptions:
212-
do_something_with(subscription)
213-
if token is None:
214-
break
215-
subscriptions, token = topic.list_subscriptions(
216-
page_token=token) # API request
209+
for subscription in topic.list_subscriptions(): # API request(s)
210+
do_something_with(subscription)
217211
# [END topic_list_subscriptions]
218212

219213
assert sub_names.issuperset(expected_names)

pubsub/google/cloud/pubsub/_gax.py

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
from google.cloud.exceptions import Conflict
3131
from google.cloud.exceptions import NotFound
3232
from google.cloud.iterator import GAXIterator
33+
from google.cloud.pubsub._helpers import subscription_name_from_path
34+
from google.cloud.pubsub.subscription import Subscription
3335
from google.cloud.pubsub.topic import Topic
3436

3537

@@ -175,16 +177,14 @@ def topic_publish(self, topic_path, messages):
175177
raise
176178
return result.message_ids
177179

178-
def topic_list_subscriptions(self, topic_path, page_size=0,
179-
page_token=None):
180+
def topic_list_subscriptions(self, topic, page_size=0, page_token=None):
180181
"""API call: list subscriptions bound to a topic
181182
182183
See:
183184
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics.subscriptions/list
184185
185-
:type topic_path: str
186-
:param topic_path: fully-qualified path of the topic, in format
187-
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
186+
:type topic: :class:`~google.cloud.pubsub.topic.Topic`
187+
:param topic: The topic that owns the subscriptions.
188188
189189
:type page_size: int
190190
:param page_size: maximum number of subscriptions to return, If not
@@ -195,25 +195,32 @@ def topic_list_subscriptions(self, topic_path, page_size=0,
195195
If not passed, the API will return the first page
196196
of subscriptions.
197197
198-
:rtype: list of strings
199-
:returns: fully-qualified names of subscriptions for the supplied
200-
topic.
201-
:raises: :exc:`google.cloud.exceptions.NotFound` if the topic does not
202-
exist
198+
:rtype: :class:`~google.cloud.iterator.Iterator`
199+
:returns: Iterator of
200+
:class:`~google.cloud.pubsub.subscription.Subscription`
201+
accessible to the current API.
202+
:raises: :exc:`~google.cloud.exceptions.NotFound` if the topic does
203+
not exist.
203204
"""
204205
if page_token is None:
205206
page_token = INITIAL_PAGE
206207
options = CallOptions(page_token=page_token)
208+
topic_path = topic.full_name
207209
try:
208210
page_iter = self._gax_api.list_topic_subscriptions(
209211
topic_path, page_size=page_size, options=options)
210212
except GaxError as exc:
211213
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
212214
raise NotFound(topic_path)
213215
raise
214-
subs = page_iter.next()
215-
token = page_iter.page_token or None
216-
return subs, token
216+
217+
iter_kwargs = {}
218+
if page_size: # page_size can be 0 or explicit None.
219+
iter_kwargs['max_results'] = page_size
220+
iterator = GAXIterator(self._client, page_iter,
221+
_item_to_subscription, **iter_kwargs)
222+
iterator.topic = topic
223+
return iterator
217224

218225

219226
class _SubscriberAPI(object):
@@ -554,7 +561,7 @@ def make_gax_subscriber_api(connection):
554561

555562

556563
def _item_to_topic(iterator, resource):
557-
"""Convert a JSON job to the native object.
564+
"""Convert a protobuf topic to the native object.
558565
559566
:type iterator: :class:`~google.cloud.iterator.Iterator`
560567
:param iterator: The iterator that is currently in use.
@@ -567,3 +574,20 @@ def _item_to_topic(iterator, resource):
567574
"""
568575
return Topic.from_api_repr(
569576
{'name': resource.name}, iterator.client)
577+
578+
579+
def _item_to_subscription(iterator, subscription_path):
580+
"""Convert a subscription name to the native object.
581+
582+
:type iterator: :class:`~google.cloud.iterator.Iterator`
583+
:param iterator: The iterator that is currently in use.
584+
585+
:type subscription_path: str
586+
:param subscription_path: Subscription path returned from the API.
587+
588+
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
589+
:returns: The next subscription in the page.
590+
"""
591+
subscription_name = subscription_name_from_path(
592+
subscription_path, iterator.client.project)
593+
return Subscription(subscription_name, iterator.topic)

pubsub/google/cloud/pubsub/connection.py

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
from google.cloud import connection as base_connection
2121
from google.cloud.environment_vars import PUBSUB_EMULATOR
2222
from google.cloud.iterator import HTTPIterator
23+
from google.cloud.pubsub._helpers import subscription_name_from_path
24+
from google.cloud.pubsub.subscription import Subscription
2325
from google.cloud.pubsub.topic import Topic
2426

2527

@@ -207,16 +209,14 @@ def topic_publish(self, topic_path, messages):
207209
method='POST', path='/%s:publish' % (topic_path,), data=data)
208210
return response['messageIds']
209211

210-
def topic_list_subscriptions(self, topic_path, page_size=None,
211-
page_token=None):
212+
def topic_list_subscriptions(self, topic, page_size=None, page_token=None):
212213
"""API call: list subscriptions bound to a topic
213214
214215
See:
215216
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics.subscriptions/list
216217
217-
:type topic_path: str
218-
:param topic_path: the fully-qualified path of the topic, in format
219-
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
218+
:type topic: :class:`~google.cloud.pubsub.topic.Topic`
219+
:param topic: The topic that owns the subscriptions.
220220
221221
:type page_size: int
222222
:param page_size: maximum number of subscriptions to return, If not
@@ -231,18 +231,17 @@ def topic_list_subscriptions(self, topic_path, page_size=None,
231231
:returns: fully-qualified names of subscriptions for the supplied
232232
topic.
233233
"""
234-
conn = self._connection
235-
params = {}
236-
234+
extra_params = {}
237235
if page_size is not None:
238-
params['pageSize'] = page_size
239-
240-
if page_token is not None:
241-
params['pageToken'] = page_token
236+
extra_params['pageSize'] = page_size
237+
path = '/%s/subscriptions' % (topic.full_name,)
242238

243-
path = '/%s/subscriptions' % (topic_path,)
244-
resp = conn.api_request(method='GET', path=path, query_params=params)
245-
return resp.get('subscriptions', ()), resp.get('nextPageToken')
239+
iterator = HTTPIterator(
240+
client=self._client, path=path,
241+
item_to_value=_item_to_subscription, items_key='subscriptions',
242+
page_token=page_token, extra_params=extra_params)
243+
iterator.topic = topic
244+
return iterator
246245

247246

248247
class _SubscriberAPI(object):
@@ -577,7 +576,7 @@ def _transform_messages_base64(messages, transform, key=None):
577576

578577

579578
def _item_to_topic(iterator, resource):
580-
"""Convert a JSON job to the native object.
579+
"""Convert a JSON topic to the native object.
581580
582581
:type iterator: :class:`~google.cloud.iterator.Iterator`
583582
:param iterator: The iterator that is currently in use.
@@ -589,3 +588,20 @@ def _item_to_topic(iterator, resource):
589588
:returns: The next topic in the page.
590589
"""
591590
return Topic.from_api_repr(resource, iterator.client)
591+
592+
593+
def _item_to_subscription(iterator, subscription_path):
594+
"""Convert a subscription name to the native object.
595+
596+
:type iterator: :class:`~google.cloud.iterator.Iterator`
597+
:param iterator: The iterator that is currently in use.
598+
599+
:type subscription_path: str
600+
:param subscription_path: Subscription path returned from the API.
601+
602+
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
603+
:returns: The next subscription in the page.
604+
"""
605+
subscription_name = subscription_name_from_path(
606+
subscription_path, iterator.client.project)
607+
return Subscription(subscription_name, iterator.topic)

pubsub/google/cloud/pubsub/topic.py

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from google.cloud._helpers import _datetime_to_rfc3339
1818
from google.cloud._helpers import _NOW
1919
from google.cloud.exceptions import NotFound
20-
from google.cloud.pubsub._helpers import subscription_name_from_path
2120
from google.cloud.pubsub._helpers import topic_name_from_path
2221
from google.cloud.pubsub.iam import Policy
2322
from google.cloud.pubsub.subscription import Subscription
@@ -306,21 +305,14 @@ def list_subscriptions(self, page_size=None, page_token=None, client=None):
306305
:param client: the client to use. If not passed, falls back to the
307306
``client`` stored on the current topic.
308307
309-
:rtype: tuple, (list, str)
310-
:returns: list of :class:`~.pubsub.subscription.Subscription`,
311-
plus a "next page token" string: if not None, indicates that
312-
more topics can be retrieved with another call (pass that
313-
value as ``page_token``).
308+
:rtype: :class:`~google.cloud.iterator.Iterator`
309+
:returns: Iterator of
310+
:class:`~google.cloud.pubsub.subscription.Subscription`
311+
accessible to the current topic.
314312
"""
315313
client = self._require_client(client)
316314
api = client.publisher_api
317-
sub_paths, next_token = api.topic_list_subscriptions(
318-
self.full_name, page_size, page_token)
319-
subscriptions = []
320-
for sub_path in sub_paths:
321-
sub_name = subscription_name_from_path(sub_path, self.project)
322-
subscriptions.append(Subscription(sub_name, self))
323-
return subscriptions, next_token
315+
return api.topic_list_subscriptions(self, page_size, page_token)
324316

325317
def get_iam_policy(self, client=None):
326318
"""Fetch the IAM policy for the topic.

pubsub/unit_tests/test__gax.py

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -292,21 +292,28 @@ def test_topic_publish_error(self):
292292
def test_topic_list_subscriptions_no_paging(self):
293293
from google.gax import INITIAL_PAGE
294294
from google.cloud._testing import _GAXPageIterator
295-
response = _GAXPageIterator(
296-
[{'name': self.SUB_PATH, 'topic': self.TOPIC_PATH}])
295+
from google.cloud.pubsub.subscription import Subscription
296+
from google.cloud.pubsub.topic import Topic
297+
298+
local_sub_path = '%s/subscriptions/%s' % (
299+
self.PROJECT_PATH, self.SUB_NAME)
300+
response = _GAXPageIterator([local_sub_path])
297301
gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response)
298302
client = _Client(self.PROJECT)
299303
api = self._makeOne(gax_api, client)
300304

301-
subscriptions, next_token = api.topic_list_subscriptions(
302-
self.TOPIC_PATH)
305+
topic = Topic(self.TOPIC_NAME, client)
306+
iterator = api.topic_list_subscriptions(topic)
307+
subscriptions = list(iterator)
308+
next_token = iterator.next_page_token
303309

310+
self.assertIsNone(next_token)
304311
self.assertEqual(len(subscriptions), 1)
305312
subscription = subscriptions[0]
306-
self.assertIsInstance(subscription, dict)
307-
self.assertEqual(subscription['name'], self.SUB_PATH)
308-
self.assertEqual(subscription['topic'], self.TOPIC_PATH)
309-
self.assertIsNone(next_token)
313+
self.assertIsInstance(subscription, Subscription)
314+
self.assertEqual(subscription.name, self.SUB_NAME)
315+
self.assertEqual(subscription.topic, topic)
316+
self.assertIs(subscription._client, client)
310317

311318
topic_path, page_size, options = (
312319
gax_api._list_topic_subscriptions_called_with)
@@ -316,25 +323,33 @@ def test_topic_list_subscriptions_no_paging(self):
316323

317324
def test_topic_list_subscriptions_with_paging(self):
318325
from google.cloud._testing import _GAXPageIterator
326+
from google.cloud.pubsub.subscription import Subscription
327+
from google.cloud.pubsub.topic import Topic
328+
319329
SIZE = 23
320330
TOKEN = 'TOKEN'
321331
NEW_TOKEN = 'NEW_TOKEN'
332+
local_sub_path = '%s/subscriptions/%s' % (
333+
self.PROJECT_PATH, self.SUB_NAME)
322334
response = _GAXPageIterator(
323-
[{'name': self.SUB_PATH, 'topic': self.TOPIC_PATH}],
324-
page_token=NEW_TOKEN)
335+
[local_sub_path], page_token=NEW_TOKEN)
325336
gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response)
326337
client = _Client(self.PROJECT)
327338
api = self._makeOne(gax_api, client)
328339

329-
subscriptions, next_token = api.topic_list_subscriptions(
330-
self.TOPIC_PATH, page_size=SIZE, page_token=TOKEN)
340+
topic = Topic(self.TOPIC_NAME, client)
341+
iterator = api.topic_list_subscriptions(
342+
topic, page_size=SIZE, page_token=TOKEN)
343+
subscriptions = list(iterator)
344+
next_token = iterator.next_page_token
331345

346+
self.assertEqual(next_token, NEW_TOKEN)
332347
self.assertEqual(len(subscriptions), 1)
333348
subscription = subscriptions[0]
334-
self.assertIsInstance(subscription, dict)
335-
self.assertEqual(subscription['name'], self.SUB_PATH)
336-
self.assertEqual(subscription['topic'], self.TOPIC_PATH)
337-
self.assertEqual(next_token, NEW_TOKEN)
349+
self.assertIsInstance(subscription, Subscription)
350+
self.assertEqual(subscription.name, self.SUB_NAME)
351+
self.assertEqual(subscription.topic, topic)
352+
self.assertIs(subscription._client, client)
338353

339354
name, page_size, options = (
340355
gax_api._list_topic_subscriptions_called_with)
@@ -345,12 +360,15 @@ def test_topic_list_subscriptions_with_paging(self):
345360
def test_topic_list_subscriptions_miss(self):
346361
from google.gax import INITIAL_PAGE
347362
from google.cloud.exceptions import NotFound
363+
from google.cloud.pubsub.topic import Topic
364+
348365
gax_api = _GAXPublisherAPI()
349366
client = _Client(self.PROJECT)
350367
api = self._makeOne(gax_api, client)
351368

352369
with self.assertRaises(NotFound):
353-
api.topic_list_subscriptions(self.TOPIC_PATH)
370+
topic = Topic(self.TOPIC_NAME, client)
371+
api.topic_list_subscriptions(topic)
354372

355373
topic_path, page_size, options = (
356374
gax_api._list_topic_subscriptions_called_with)
@@ -361,12 +379,15 @@ def test_topic_list_subscriptions_miss(self):
361379
def test_topic_list_subscriptions_error(self):
362380
from google.gax import INITIAL_PAGE
363381
from google.gax.errors import GaxError
382+
from google.cloud.pubsub.topic import Topic
383+
364384
gax_api = _GAXPublisherAPI(_random_gax_error=True)
365385
client = _Client(self.PROJECT)
366386
api = self._makeOne(gax_api, client)
367387

368388
with self.assertRaises(GaxError):
369-
api.topic_list_subscriptions(self.TOPIC_PATH)
389+
topic = Topic(self.TOPIC_NAME, client)
390+
api.topic_list_subscriptions(topic)
370391

371392
topic_path, page_size, options = (
372393
gax_api._list_topic_subscriptions_called_with)

0 commit comments

Comments
 (0)