Skip to content

Commit bb26745

Browse files
committed
Merge pull request #1795 from tseaver/pubsub-wrap_gax_subscriber_api
Add GAX-based _SubscriberAPI.
2 parents 14b1143 + 5ecff7b commit bb26745

File tree

3 files changed

+779
-19
lines changed

3 files changed

+779
-19
lines changed

gcloud/pubsub/_gax.py

Lines changed: 276 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from google.gax.errors import GaxError
2020
from google.gax.grpc import exc_to_code
2121
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
22+
from google.pubsub.v1.pubsub_pb2 import PushConfig
2223
from grpc.beta.interfaces import StatusCode
2324
# pylint: enable=import-error
2425

@@ -47,9 +48,9 @@ def list_topics(self, project):
4748
4849
:rtype: tuple, (list, str)
4950
:returns: list of ``Topic`` resource dicts, plus a
50-
"next page token" string: if not None, indicates that
51-
more topics can be retrieved with another call (pass that
52-
value as ``page_token``).
51+
"next page token" string: if not None, indicates that
52+
more topics can be retrieved with another call (pass that
53+
value as ``page_token``).
5354
"""
5455
options = CallOptions(is_page_streaming=False)
5556
path = 'projects/%s' % (project,)
@@ -180,7 +181,279 @@ def topic_list_subscriptions(self, topic_path):
180181
return subs, response.next_page_token
181182

182183

184+
class _SubscriberAPI(object):
185+
"""Helper mapping subscriber-related APIs.
186+
187+
:type gax_api: :class:`google.pubsub.v1.publisher_api.SubscriberApi`
188+
:param gax_api: API object used to make GAX requests.
189+
"""
190+
def __init__(self, gax_api):
191+
self._gax_api = gax_api
192+
193+
def list_subscriptions(self, project):
194+
"""List subscriptions for the project associated with this API.
195+
196+
See:
197+
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list
198+
199+
:type project: string
200+
:param project: project ID
201+
202+
:rtype: tuple, (list, str)
203+
:returns: list of ``Subscription`` resource dicts, plus a
204+
"next page token" string: if not None, indicates that
205+
more topics can be retrieved with another call (pass that
206+
value as ``page_token``).
207+
"""
208+
options = CallOptions(is_page_streaming=False)
209+
path = 'projects/%s' % (project,)
210+
response = self._gax_api.list_subscriptions(path, options)
211+
subscriptions = [_subscription_pb_to_mapping(sub_pb)
212+
for sub_pb in response.subscriptions]
213+
return subscriptions, response.next_page_token
214+
215+
def subscription_create(self, subscription_path, topic_path,
216+
ack_deadline=None, push_endpoint=None):
217+
"""API call: create a subscription
218+
219+
See:
220+
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/create
221+
222+
:type subscription_path: string
223+
:param subscription_path: the fully-qualified path of the new
224+
subscription, in format
225+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
226+
227+
:type topic_path: string
228+
:param topic_path: the fully-qualified path of the topic being
229+
subscribed, in format
230+
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
231+
232+
:type ack_deadline: int, or ``NoneType``
233+
:param ack_deadline: the deadline (in seconds) by which messages pulled
234+
from the back-end must be acknowledged.
235+
236+
:type push_endpoint: string, or ``NoneType``
237+
:param push_endpoint: URL to which messages will be pushed by the
238+
back-end. If not set, the application must pull
239+
messages.
240+
241+
:rtype: dict
242+
:returns: ``Subscription`` resource returned from the API.
243+
"""
244+
if push_endpoint is not None:
245+
push_config = PushConfig(push_endpoint=push_endpoint)
246+
else:
247+
push_config = None
248+
249+
if ack_deadline is None:
250+
ack_deadline = 0
251+
252+
try:
253+
sub_pb = self._gax_api.create_subscription(
254+
subscription_path, topic_path, push_config, ack_deadline)
255+
except GaxError as exc:
256+
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
257+
raise Conflict(topic_path)
258+
raise
259+
return _subscription_pb_to_mapping(sub_pb)
260+
261+
def subscription_get(self, subscription_path):
262+
"""API call: retrieve a subscription
263+
264+
See:
265+
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/get
266+
267+
:type subscription_path: string
268+
:param subscription_path: the fully-qualified path of the subscription,
269+
in format
270+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
271+
272+
:rtype: dict
273+
:returns: ``Subscription`` resource returned from the API.
274+
"""
275+
try:
276+
sub_pb = self._gax_api.get_subscription(subscription_path)
277+
except GaxError as exc:
278+
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
279+
raise NotFound(subscription_path)
280+
raise
281+
return _subscription_pb_to_mapping(sub_pb)
282+
283+
def subscription_delete(self, subscription_path):
284+
"""API call: delete a subscription
285+
286+
See:
287+
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/delete
288+
289+
:type subscription_path: string
290+
:param subscription_path: the fully-qualified path of the subscription,
291+
in format
292+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
293+
"""
294+
try:
295+
self._gax_api.delete_subscription(subscription_path)
296+
except GaxError as exc:
297+
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
298+
raise NotFound(subscription_path)
299+
raise
300+
301+
def subscription_modify_push_config(self, subscription_path,
302+
push_endpoint):
303+
"""API call: update push config of a subscription
304+
305+
See:
306+
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig
307+
308+
:type subscription_path: string
309+
:param subscription_path: the fully-qualified path of the new
310+
subscription, in format
311+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
312+
313+
:type push_endpoint: string, or ``NoneType``
314+
:param push_endpoint: URL to which messages will be pushed by the
315+
back-end. If not set, the application must pull
316+
messages.
317+
"""
318+
push_config = PushConfig(push_endpoint=push_endpoint)
319+
try:
320+
self._gax_api.modify_push_config(subscription_path, push_config)
321+
except GaxError as exc:
322+
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
323+
raise NotFound(subscription_path)
324+
raise
325+
326+
def subscription_pull(self, subscription_path, return_immediately=False,
327+
max_messages=1):
328+
"""API call: retrieve messages for a subscription
329+
330+
See:
331+
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig
332+
333+
:type subscription_path: string
334+
:param subscription_path: the fully-qualified path of the new
335+
subscription, in format
336+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
337+
338+
:type return_immediately: boolean
339+
:param return_immediately: if True, the back-end returns even if no
340+
messages are available; if False, the API
341+
call blocks until one or more messages are
342+
available.
343+
344+
:type max_messages: int
345+
:param max_messages: the maximum number of messages to return.
346+
347+
:rtype: list of dict
348+
:returns: the ``receivedMessages`` element of the response.
349+
"""
350+
try:
351+
response_pb = self._gax_api.pull(
352+
subscription_path, max_messages, return_immediately)
353+
except GaxError as exc:
354+
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
355+
raise NotFound(subscription_path)
356+
raise
357+
return [_received_message_pb_to_mapping(rmpb)
358+
for rmpb in response_pb.received_messages]
359+
360+
def subscription_acknowledge(self, subscription_path, ack_ids):
361+
"""API call: acknowledge retrieved messages
362+
363+
See:
364+
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig
365+
366+
:type subscription_path: string
367+
:param subscription_path: the fully-qualified path of the new
368+
subscription, in format
369+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
370+
371+
:type ack_ids: list of string
372+
:param ack_ids: ack IDs of messages being acknowledged
373+
"""
374+
try:
375+
self._gax_api.acknowledge(subscription_path, ack_ids)
376+
except GaxError as exc:
377+
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
378+
raise NotFound(subscription_path)
379+
raise
380+
381+
def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
382+
ack_deadline):
383+
"""API call: update ack deadline for retrieved messages
384+
385+
See:
386+
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyAckDeadline
387+
388+
:type subscription_path: string
389+
:param subscription_path: the fully-qualified path of the new
390+
subscription, in format
391+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
392+
393+
:type ack_ids: list of string
394+
:param ack_ids: ack IDs of messages being acknowledged
395+
396+
:type ack_deadline: int
397+
:param ack_deadline: the deadline (in seconds) by which messages pulled
398+
from the back-end must be acknowledged.
399+
"""
400+
try:
401+
self._gax_api.modify_ack_deadline(
402+
subscription_path, ack_ids, ack_deadline)
403+
except GaxError as exc:
404+
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
405+
raise NotFound(subscription_path)
406+
raise
407+
408+
183409
def _message_pb_from_dict(message):
184410
"""Helper for :meth:`_PublisherAPI.topic_publish`."""
185411
return PubsubMessage(data=_to_bytes(message['data']),
186412
attributes=message['attributes'])
413+
414+
415+
def _subscription_pb_to_mapping(sub_pb):
416+
"""Helper for :meth:`list_subscriptions`, et aliae
417+
418+
Ideally, would use a function from :mod:`protobuf.json_format`, but
419+
the right one isn't public. See:
420+
https://github.com/google/protobuf/issues/1351
421+
"""
422+
mapping = {
423+
'name': sub_pb.name,
424+
'topic': sub_pb.topic,
425+
'ack_deadline': sub_pb.ack_deadline,
426+
}
427+
if sub_pb.push_config.push_endpoint != '':
428+
mapping['push_config'] = {
429+
'push_endpoint': sub_pb.push_config.push_endpoint,
430+
}
431+
return mapping
432+
433+
434+
def _message_pb_to_mapping(message_pb):
435+
"""Helper for :meth:`pull`, et aliae
436+
437+
Ideally, would use a function from :mod:`protobuf.json_format`, but
438+
the right one isn't public. See:
439+
https://github.com/google/protobuf/issues/1351
440+
"""
441+
return {
442+
'messageId': message_pb.message_id,
443+
'data': message_pb.data,
444+
'attributes': message_pb.attributes,
445+
}
446+
447+
448+
def _received_message_pb_to_mapping(received_message_pb):
449+
"""Helper for :meth:`pull`, et aliae
450+
451+
Ideally, would use a function from :mod:`protobuf.json_format`, but
452+
the right one isn't public. See:
453+
https://github.com/google/protobuf/issues/1351
454+
"""
455+
return {
456+
'ackId': received_message_pb.ack_id,
457+
'message': _message_pb_to_mapping(
458+
received_message_pb.message),
459+
}

gcloud/pubsub/connection.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ def subscription_create(self, subscription_path, topic_path,
299299
:type subscription_path: string
300300
:param subscription_path: the fully-qualified path of the new
301301
subscription, in format
302-
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
302+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
303303
304304
:type topic_path: string
305305
:param topic_path: the fully-qualified path of the topic being
@@ -373,7 +373,7 @@ def subscription_modify_push_config(self, subscription_path,
373373
:type subscription_path: string
374374
:param subscription_path: the fully-qualified path of the new
375375
subscription, in format
376-
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
376+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
377377
378378
:type push_endpoint: string, or ``NoneType``
379379
:param push_endpoint: URL to which messages will be pushed by the
@@ -395,7 +395,7 @@ def subscription_pull(self, subscription_path, return_immediately=False,
395395
:type subscription_path: string
396396
:param subscription_path: the fully-qualified path of the new
397397
subscription, in format
398-
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
398+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
399399
400400
:type return_immediately: boolean
401401
:param return_immediately: if True, the back-end returns even if no
@@ -427,7 +427,7 @@ def subscription_acknowledge(self, subscription_path, ack_ids):
427427
:type subscription_path: string
428428
:param subscription_path: the fully-qualified path of the new
429429
subscription, in format
430-
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
430+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
431431
432432
:type ack_ids: list of string
433433
:param ack_ids: ack IDs of messages being acknowledged
@@ -449,7 +449,7 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
449449
:type subscription_path: string
450450
:param subscription_path: the fully-qualified path of the new
451451
subscription, in format
452-
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
452+
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
453453
454454
:type ack_ids: list of string
455455
:param ack_ids: ack IDs of messages being acknowledged

0 commit comments

Comments
 (0)