Skip to content

Commit 0859cc3

Browse files
committed
Using Iterator as response for Pub/Sub list_subscriptions().
Also remove GAX confusion in Pub/Sub between max_results and page_size.
1 parent af5f2d4 commit 0859cc3

3 files changed

Lines changed: 99 additions & 47 deletions

File tree

pubsub/google/cloud/pubsub/_gax.py

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
"""GAX wrapper for Pubsub API requests."""
1616

17+
import functools
18+
1719
from google.cloud.gapic.pubsub.v1.publisher_api import PublisherApi
1820
from google.cloud.gapic.pubsub.v1.subscriber_api import SubscriberApi
1921
from google.gax import CallOptions
@@ -78,12 +80,7 @@ def list_topics(self, project, page_size=0, page_token=None):
7880
path = 'projects/%s' % (project,)
7981
page_iter = self._gax_api.list_topics(
8082
path, page_size=page_size, options=options)
81-
82-
iter_kwargs = {}
83-
if page_size: # page_size can be 0 or explicit None.
84-
iter_kwargs['max_results'] = page_size
85-
return GAXIterator(self._client, page_iter, _item_to_topic,
86-
**iter_kwargs)
83+
return GAXIterator(self._client, page_iter, _item_to_topic)
8784

8885
def topic_create(self, topic_path):
8986
"""API call: create a topic
@@ -215,12 +212,8 @@ def topic_list_subscriptions(self, topic, page_size=0, page_token=None):
215212
raise NotFound(topic_path)
216213
raise
217214

218-
iter_kwargs = {}
219-
if page_size: # page_size can be 0 or explicit None.
220-
iter_kwargs['max_results'] = page_size
221-
iterator = GAXIterator(
222-
self._client, page_iter,
223-
_item_to_subscription_for_topic, **iter_kwargs)
215+
iterator = GAXIterator(self._client, page_iter,
216+
_item_to_subscription_for_topic)
224217
iterator.topic = topic
225218
return iterator
226219

@@ -256,22 +249,25 @@ def list_subscriptions(self, project, page_size=0, page_token=None):
256249
If not passed, the API will return the first page
257250
of subscriptions.
258251
259-
:rtype: tuple, (list, str)
260-
:returns: list of ``Subscription`` resource dicts, plus a
261-
"next page token" string: if not None, indicates that
262-
more topics can be retrieved with another call (pass that
263-
value as ``page_token``).
252+
:rtype: :class:`~google.cloud.iterator.Iterator`
253+
:returns: Iterator of
254+
:class:`~google.cloud.pubsub.subscription.Subscription`
255+
accessible to the current API.
264256
"""
265257
if page_token is None:
266258
page_token = INITIAL_PAGE
267259
options = CallOptions(page_token=page_token)
268260
path = 'projects/%s' % (project,)
269261
page_iter = self._gax_api.list_subscriptions(
270262
path, page_size=page_size, options=options)
271-
subscriptions = [MessageToDict(sub_pb)
272-
for sub_pb in page_iter.next()]
273-
token = page_iter.page_token or None
274-
return subscriptions, token
263+
264+
# We attach a mutable topics dictionary so that as topic
265+
# objects are created by Subscription.from_api_repr, they
266+
# can be re-used by other subscriptions from the same topic.
267+
topics = {}
268+
item_to_value = functools.partial(
269+
_item_to_subscription_for_client, topics=topics)
270+
return GAXIterator(self._client, page_iter, item_to_value)
275271

276272
def subscription_create(self, subscription_path, topic_path,
277273
ack_deadline=None, push_endpoint=None):
@@ -579,3 +575,33 @@ def _item_to_subscription_for_topic(iterator, subscription_path):
579575
subscription_name = subscription_name_from_path(
580576
subscription_path, iterator.client.project)
581577
return Subscription(subscription_name, iterator.topic)
578+
579+
580+
def _item_to_subscription_for_client(iterator, sub_pb, topics):
581+
"""Convert a subscription protobuf to the native object.
582+
583+
.. note::
584+
585+
This method does not have the correct signature to be used as
586+
the ``item_to_value`` argument to
587+
:class:`~google.cloud.iterator.Iterator`. It is intended to be
588+
patched with a mutable topics argument that can be updated
589+
on subsequent calls. For an example, see how the method is
590+
used above in :meth:`_SubscriberAPI.list_subscriptions`.
591+
592+
:type iterator: :class:`~google.cloud.iterator.Iterator`
593+
:param iterator: The iterator that is currently in use.
594+
595+
:type sub_pb: :class:`~google.pubsub.v1.pubsub_pb2.Subscription`
596+
:param sub_pb: A subscription returned from the API.
597+
598+
:type topics: dict
599+
:param topics: A dictionary of topics to be used (and modified)
600+
as new subscriptions are created bound to topics.
601+
602+
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
603+
:returns: The next subscription in the page.
604+
"""
605+
resource = MessageToDict(sub_pb)
606+
return Subscription.from_api_repr(
607+
resource, iterator.client, topics=topics)

pubsub/google/cloud/pubsub/client.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -160,20 +160,14 @@ def list_subscriptions(self, page_size=None, page_token=None):
160160
passed, the API will return the first page of
161161
topics.
162162
163-
:rtype: tuple, (list, str)
164-
:returns: list of :class:`~.pubsub.subscription.Subscription`,
165-
plus a "next page token" string: if not None, indicates that
166-
more topics can be retrieved with another call (pass that
167-
value as ``page_token``).
163+
:rtype: :class:`~google.cloud.iterator.Iterator`
164+
:returns: Iterator of
165+
:class:`~google.cloud.pubsub.subscription.Subscription`
166+
accessible to the current client.
168167
"""
169168
api = self.subscriber_api
170-
resources, next_token = api.list_subscriptions(
169+
return api.list_subscriptions(
171170
self.project, page_size, page_token)
172-
topics = {}
173-
subscriptions = [Subscription.from_api_repr(resource, self,
174-
topics=topics)
175-
for resource in resources]
176-
return subscriptions, next_token
177171

178172
def topic(self, name, timestamp_messages=False):
179173
"""Creates a topic bound to the current client.

pubsub/google/cloud/pubsub/connection.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Create / interact with Google Cloud Pub/Sub connections."""
1616

1717
import base64
18+
import functools
1819
import os
1920

2021
from google.cloud import connection as base_connection
@@ -274,24 +275,26 @@ def list_subscriptions(self, project, page_size=None, page_token=None):
274275
If not passed, the API will return the first page
275276
of subscriptions.
276277
277-
:rtype: tuple, (list, str)
278-
:returns: list of ``Subscription`` resource dicts, plus a
279-
"next page token" string: if not None, indicates that
280-
more subscriptions can be retrieved with another call (pass
281-
that value as ``page_token``).
278+
:rtype: :class:`~google.cloud.iterator.Iterator`
279+
:returns: Iterator of
280+
:class:`~google.cloud.pubsub.subscription.Subscription`
281+
accessible to the current API.
282282
"""
283-
conn = self._connection
284-
params = {}
285-
283+
extra_params = {}
286284
if page_size is not None:
287-
params['pageSize'] = page_size
288-
289-
if page_token is not None:
290-
params['pageToken'] = page_token
291-
285+
extra_params['pageSize'] = page_size
292286
path = '/projects/%s/subscriptions' % (project,)
293-
resp = conn.api_request(method='GET', path=path, query_params=params)
294-
return resp.get('subscriptions', ()), resp.get('nextPageToken')
287+
288+
# We attach a mutable topics dictionary so that as topic
289+
# objects are created by Subscription.from_api_repr, they
290+
# can be re-used by other subscriptions from the same topic.
291+
topics = {}
292+
item_to_value = functools.partial(
293+
_item_to_subscription_for_client, topics=topics)
294+
return HTTPIterator(
295+
client=self._client, path=path, item_to_value=item_to_value,
296+
items_key='subscriptions', page_token=page_token,
297+
extra_params=extra_params)
295298

296299
def subscription_create(self, subscription_path, topic_path,
297300
ack_deadline=None, push_endpoint=None):
@@ -607,3 +610,32 @@ def _item_to_subscription_for_topic(iterator, subscription_path):
607610
subscription_name = subscription_name_from_path(
608611
subscription_path, iterator.client.project)
609612
return Subscription(subscription_name, iterator.topic)
613+
614+
615+
def _item_to_subscription_for_client(iterator, resource, topics):
616+
"""Convert a subscription to the native object.
617+
618+
.. note::
619+
620+
This method does not have the correct signature to be used as
621+
the ``item_to_value`` argument to
622+
:class:`~google.cloud.iterator.Iterator`. It is intended to be
623+
patched with a mutable topics argument that can be updated
624+
on subsequent calls. For an example, see how the method is
625+
used above in :meth:`_SubscriberAPI.list_subscriptions`.
626+
627+
:type iterator: :class:`~google.cloud.iterator.Iterator`
628+
:param iterator: The iterator that is currently in use.
629+
630+
:type resource: dict
631+
:param resource: A subscription returned from the API.
632+
633+
:type topics: dict
634+
:param topics: A dictionary of topics to be used (and modified)
635+
as new subscriptions are created bound to topics.
636+
637+
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
638+
:returns: The next subscription in the page.
639+
"""
640+
return Subscription.from_api_repr(
641+
resource, iterator.client, topics=topics)

0 commit comments

Comments
 (0)