|
14 | 14 |
|
15 | 15 | """GAX wrapper for Pubsub API requests.""" |
16 | 16 |
|
| 17 | +import functools |
| 18 | + |
17 | 19 | from google.cloud.gapic.pubsub.v1.publisher_api import PublisherApi |
18 | 20 | from google.cloud.gapic.pubsub.v1.subscriber_api import SubscriberApi |
19 | 21 | from google.gax import CallOptions |
@@ -78,12 +80,7 @@ def list_topics(self, project, page_size=0, page_token=None): |
78 | 80 | path = 'projects/%s' % (project,) |
79 | 81 | page_iter = self._gax_api.list_topics( |
80 | 82 | path, page_size=page_size, options=options) |
81 | | - |
82 | | - iter_kwargs = {} |
83 | | - if page_size: # page_size can be 0 or explicit None. |
84 | | - iter_kwargs['max_results'] = page_size |
85 | | - return GAXIterator(self._client, page_iter, _item_to_topic, |
86 | | - **iter_kwargs) |
| 83 | + return GAXIterator(self._client, page_iter, _item_to_topic) |
87 | 84 |
|
88 | 85 | def topic_create(self, topic_path): |
89 | 86 | """API call: create a topic |
@@ -215,12 +212,8 @@ def topic_list_subscriptions(self, topic, page_size=0, page_token=None): |
215 | 212 | raise NotFound(topic_path) |
216 | 213 | raise |
217 | 214 |
|
218 | | - iter_kwargs = {} |
219 | | - if page_size: # page_size can be 0 or explicit None. |
220 | | - iter_kwargs['max_results'] = page_size |
221 | | - iterator = GAXIterator( |
222 | | - self._client, page_iter, |
223 | | - _item_to_subscription_for_topic, **iter_kwargs) |
| 215 | + iterator = GAXIterator(self._client, page_iter, |
| 216 | + _item_to_subscription_for_topic) |
224 | 217 | iterator.topic = topic |
225 | 218 | return iterator |
226 | 219 |
|
@@ -256,22 +249,25 @@ def list_subscriptions(self, project, page_size=0, page_token=None): |
256 | 249 | If not passed, the API will return the first page |
257 | 250 | of subscriptions. |
258 | 251 |
|
259 | | - :rtype: tuple, (list, str) |
260 | | - :returns: list of ``Subscription`` resource dicts, plus a |
261 | | - "next page token" string: if not None, indicates that |
262 | | - more topics can be retrieved with another call (pass that |
263 | | - value as ``page_token``). |
| 252 | + :rtype: :class:`~google.cloud.iterator.Iterator` |
| 253 | + :returns: Iterator of |
| 254 | + :class:`~google.cloud.pubsub.subscription.Subscription` |
| 255 | + accessible to the current API. |
264 | 256 | """ |
265 | 257 | if page_token is None: |
266 | 258 | page_token = INITIAL_PAGE |
267 | 259 | options = CallOptions(page_token=page_token) |
268 | 260 | path = 'projects/%s' % (project,) |
269 | 261 | page_iter = self._gax_api.list_subscriptions( |
270 | 262 | path, page_size=page_size, options=options) |
271 | | - subscriptions = [MessageToDict(sub_pb) |
272 | | - for sub_pb in page_iter.next()] |
273 | | - token = page_iter.page_token or None |
274 | | - return subscriptions, token |
| 263 | + |
| 264 | + # We attach a mutable topics dictionary so that as topic |
| 265 | + # objects are created by Subscription.from_api_repr, they |
| 266 | + # can be re-used by other subscriptions from the same topic. |
| 267 | + topics = {} |
| 268 | + item_to_value = functools.partial( |
| 269 | + _item_to_subscription_for_client, topics=topics) |
| 270 | + return GAXIterator(self._client, page_iter, item_to_value) |
275 | 271 |
|
276 | 272 | def subscription_create(self, subscription_path, topic_path, |
277 | 273 | ack_deadline=None, push_endpoint=None): |
@@ -579,3 +575,33 @@ def _item_to_subscription_for_topic(iterator, subscription_path): |
579 | 575 | subscription_name = subscription_name_from_path( |
580 | 576 | subscription_path, iterator.client.project) |
581 | 577 | return Subscription(subscription_name, iterator.topic) |
| 578 | + |
| 579 | + |
| 580 | +def _item_to_subscription_for_client(iterator, sub_pb, topics): |
| 581 | + """Convert a subscription protobuf to the native object. |
| 582 | +
|
| 583 | + .. note:: |
| 584 | +
|
| 585 | + This method does not have the correct signature to be used as |
| 586 | + the ``item_to_value`` argument to |
| 587 | + :class:`~google.cloud.iterator.Iterator`. It is intended to be |
| 588 | + patched with a mutable topics argument that can be updated |
| 589 | + on subsequent calls. For an example, see how the method is |
| 590 | + used above in :meth:`_SubscriberAPI.list_subscriptions`. |
| 591 | +
|
| 592 | + :type iterator: :class:`~google.cloud.iterator.Iterator` |
| 593 | + :param iterator: The iterator that is currently in use. |
| 594 | +
|
| 595 | + :type sub_pb: :class:`~google.pubsub.v1.pubsub_pb2.Subscription` |
| 596 | + :param sub_pb: A subscription returned from the API. |
| 597 | +
|
| 598 | + :type topics: dict |
| 599 | + :param topics: A dictionary of topics to be used (and modified) |
| 600 | + as new subscriptions are created bound to topics. |
| 601 | +
|
| 602 | + :rtype: :class:`~google.cloud.pubsub.subscription.Subscription` |
| 603 | + :returns: The next subscription in the page. |
| 604 | + """ |
| 605 | + resource = MessageToDict(sub_pb) |
| 606 | + return Subscription.from_api_repr( |
| 607 | + resource, iterator.client, topics=topics) |
0 commit comments