Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,32 @@ def bytes(self):

def add(self, items):
"""Add messages to be managed by the leaser."""
for item in items:
# Add the ack ID to the set of managed ack IDs, and increment
# the size counter.
if item.ack_id not in self._leased_messages:
self._leased_messages[item.ack_id] = _LeasedMessage(
added_time=time.time(), size=item.byte_size
)
self._bytes += item.byte_size
else:
_LOGGER.debug("Message %s is already lease managed", item.ack_id)
with self._operational_lock:
for item in items:
# Add the ack ID to the set of managed ack IDs, and increment
# the size counter.
if item.ack_id not in self._leased_messages:
self._leased_messages[item.ack_id] = _LeasedMessage(
added_time=time.time(), size=item.byte_size
)
self._bytes += item.byte_size
else:
_LOGGER.debug("Message %s is already lease managed", item.ack_id)

def remove(self, items):
"""Remove messages from lease management."""
# Remove the ack ID from lease management, and decrement the
# byte counter.
for item in items:
if self._leased_messages.pop(item.ack_id, None) is not None:
self._bytes -= item.byte_size
else:
_LOGGER.debug("Item %s was not managed.", item.ack_id)

if self._bytes < 0:
_LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
self._bytes = 0
with self._operational_lock:
# Remove the ack ID from lease management, and decrement the
# byte counter.
for item in items:
if self._leased_messages.pop(item.ack_id, None) is not None:
self._bytes -= item.byte_size
else:
_LOGGER.debug("Item %s was not managed.", item.ack_id)

if self._bytes < 0:
_LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
self._bytes = 0

def maintain_leases(self):
"""Maintain all of the leases being managed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import grpc
import six
from six.moves import queue

from google.api_core import bidi
from google.api_core import exceptions
Expand Down Expand Up @@ -52,24 +53,35 @@ def _maybe_wrap_exception(exception):
return exception


def _wrap_callback_errors(callback, on_callback_error, message):
"""Wraps a user callback so that if an exception occurs the message is
def _wrap_callback_errors(callback, on_callback_error, messages):
"""Wrap a user callback so that if an exception occurs the messages are
nacked.

Args:
callback (Callable[None, Message]): The user callback.
message (~Message): The Pub/Sub message.
callback (Callable[Union[Message, List[Message]]]): The user callback.
on_callback_error (Callable[Exception]): The handler to invoke if
unhandled error occurs in ``callback``.
messages (Union[~Message, List[~Message]]): The Pub/Sub message(s).
"""
try:
callback(message)
callback(messages)
except Exception as exc:
# Note: the likelihood of this failing is extremely low. This just adds
# a message to a queue, so if this doesn't work the world is in an
# unrecoverable state and this thread should just bail.
_LOGGER.exception(
"Top-level exception occurred in callback while processing a message"
"Top-level exception occurred in callback while processing message(s)"
)
message.nack()

if isinstance(messages, google.cloud.pubsub_v1.subscriber.message.Message):
messages = [messages]

for msg in messages:
try:
msg.nack()
except google.cloud.pubsub_v1.subscriber.message.AckStatusSentError:
pass # this message does not have to be NACK-ed

on_callback_error(exc)


Expand Down Expand Up @@ -116,6 +128,12 @@ def __init__(
else:
self._scheduler = scheduler

# A FIFO queue for the messages that have been received from the server,
# but not yet added to the lease management (and not sent to user callback),
# because the FlowControl limits have been hit.
self._messages_on_hold = queue.Queue()
self._pause_resume_lock = threading.Lock()

# The threads created in ``.open()``.
self._dispatcher = None
self._leaser = None
Expand Down Expand Up @@ -211,26 +229,82 @@ def add_close_callback(self, callback):

def maybe_pause_consumer(self):
"""Check the current load and pause the consumer if needed."""
if self.load >= 1.0:
if self._consumer is not None and not self._consumer.is_paused:
_LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load)
self._consumer.pause()
with self._pause_resume_lock:
if self.load >= 1.0:
if self._consumer is not None and not self._consumer.is_paused:
_LOGGER.debug(
"Message backlog over load at %.2f, pausing.", self.load
)
self._consumer.pause()

def maybe_resume_consumer(self):
"""Check the current load and resume the consumer if needed."""
# If we have been paused by flow control, check and see if we are
# back within our limits.
#
# In order to not thrash too much, require us to have passed below
# the resume threshold (80% by default) of each flow control setting
# before restarting.
if self._consumer is None or not self._consumer.is_paused:
"""Check the load and held messages and resume the consumer if needed.

If there are messages held internally, release those messages before
resuming the consumer. That will avoid leaser overload.
"""
with self._pause_resume_lock:
# If we have been paused by flow control, check and see if we are
# back within our limits.
#
# In order to not thrash too much, require us to have passed below
# the resume threshold (80% by default) of each flow control setting
# before restarting.
if self._consumer is None or not self._consumer.is_paused:
return

_LOGGER.debug("Current load: %.2f", self.load)

# Before maybe resuming the background consumer, release any messages
# currently on hold, if the current load allows for it.
self._maybe_release_messages()

if self.load < self.flow_control.resume_threshold:
_LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
self._consumer.resume()
else:
_LOGGER.debug("Did not resume, current load is %.2f.", self.load)

def _maybe_release_messages(self):
"""Release (some of) the held messages if the current load allows for it.

The method tries to release as many messages as the current leaser load
would allow. Each released message is added to the lease management,
and the user callback is scheduled for them.

If there are currently no messageges on hold, or if the leaser is
already overloaded, this method is effectively a no-op.
"""
released_messages = []

while True:
if self.load >= 1.0:
break # already overloaded

try:
msg = self._messages_on_hold.get_nowait()
except queue.Empty:
break

released_messages.append(msg)

self.leaser.add(
[requests.LeaseRequest(ack_id=msg.ack_id, byte_size=msg.size)]
)
_LOGGER.debug(
"Released held message to leaser, still on hold %s.",
self._messages_on_hold.qsize(),
)

if not released_messages:
return

if self.load < self.flow_control.resume_threshold:
self._consumer.resume()
else:
_LOGGER.debug("Did not resume, current load is %s", self.load)
_LOGGER.debug(
"Scheduling %s for %s released message(s).",
"batch callback" if self._batch_callback else "callbacks",
len(released_messages),
)
self._schedule_callbacks(released_messages)

def _send_unary_request(self, request):
"""Send a request using a separate unary request instead of over the
Expand Down Expand Up @@ -300,13 +374,24 @@ def heartbeat(self):
if self._rpc is not None and self._rpc.is_active:
self._rpc.send(types.StreamingPullRequest())

def open(self, callback, on_callback_error):
def open(self, callback, batch, on_callback_error):
"""Begin consuming messages.

Args:
callback (Callable[None, google.cloud.pubsub_v1.message.Message]):
A callback that will be called for each message received on the
callback (
Callable[
Union[
google.cloud.pubsub_v1.message.Message,
List[google.cloud.pubsub_v1.message.Message],
]
]
):
A callback that will be called for messages received on the
stream.
batch (bool): If ``False``, invoke ``callback`` for each individual
message. If ``True``, invoke ``callback`` for each group of
messages received in a single server response (may be less than
a full group depending on the ``flow_control`` limits).
on_callback_error (Callable[Exception]):
A callable that will be called if an exception is raised in
the provided `callback`.
Expand All @@ -320,6 +405,7 @@ def open(self, callback, on_callback_error):
self._callback = functools.partial(
_wrap_callback_errors, callback, on_callback_error
)
self._batch_callback = batch

# Create the RPC
self._rpc = bidi.ResumableBidiRpc(
Expand Down Expand Up @@ -429,11 +515,14 @@ def _on_response(self, response):
redelivered multiple times.

After the messages have all had their ack deadline updated, execute
the callback for each message using the executor.
the callback (individually or in a batch) using the executor for all
messages that have been added to the lease management, i.e. not put on
hold due to leaser overload.
"""

_LOGGER.debug(
"Scheduling callbacks for %s messages.", len(response.received_messages)
"Processing %s received message(s), currenty on hold %s.",
len(response.received_messages),
self._messages_on_hold.qsize(),
)

# Immediately modack the messages we received, as this tells the server
Expand All @@ -443,12 +532,51 @@ def _on_response(self, response):
for message in response.received_messages
]
self._dispatcher.modify_ack_deadline(items)

invoke_callbacks_for = []

for received_message in response.received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message, received_message.ack_id, self._scheduler.queue
received_message.message,
received_message.ack_id,
self._scheduler.queue,
autolease=False,
)
# TODO: Immediately lease instead of using the callback queue.
self._scheduler.schedule(self._callback, message)
if self.load < 1.0:
req = requests.LeaseRequest(
ack_id=message.ack_id, byte_size=message.size
)
self.leaser.add([req])
invoke_callbacks_for.append(message)
self.maybe_pause_consumer()
else:
self._messages_on_hold.put(message)

_LOGGER.debug(
"Scheduling %s for %s new messages, new total on hold %s.",
"batch callback" if self._batch_callback else "callbacks",
len(invoke_callbacks_for),
self._messages_on_hold.qsize(),
)
self._schedule_callbacks(invoke_callbacks_for)

def _schedule_callbacks(self, messages):
"""Schedule callbacks for given messages for an async execution.

If the streaming pull manager was opened with `batch` == True, a single
batch callback will be scheduled for all given messages. Conversely, if
`batch` == False, a callback will be scheduled once for each message
in `messages`.

Args:
messages (Union[~Message, List[~Message]]): The Pub/Sub message(s)
to schedule the callbacks for.
"""
if self._batch_callback:
self._scheduler.schedule(self._callback, messages)
else:
for msg in messages:
self._scheduler.schedule(self._callback, msg)

def _should_recover(self, exception):
"""Determine if an error on the RPC stream should be recovered.
Expand Down
44 changes: 32 additions & 12 deletions pubsub/google/cloud/pubsub_v1/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,25 @@ def api(self):
"""The underlying gapic API client."""
return self._api

def subscribe(self, subscription, callback, flow_control=(), scheduler=None):
def subscribe(
self, subscription, callback, batch=False, flow_control=(), scheduler=None
):
"""Asynchronously start receiving messages on a given subscription.

This method starts a background thread to begin pulling messages from
a Pub/Sub subscription and scheduling them to be processed using the
provided ``callback``.

The ``callback`` will be called with an individual
:class:`google.cloud.pubsub_v1.subscriber.message.Message`. It is the
responsibility of the callback to either call ``ack()`` or ``nack()``
on the message when it finished processing. If an exception occurs in
the callback during processing, the exception is logged and the message
is ``nack()`` ed.
If ``batch`` is False (default), the ``callback`` will be called with an
individual :class:`google.cloud.pubsub_v1.subscriber.message.Message`.
If ``batch`` is True, the ``callback`` will be called with a list of
all messages received in a single response from the server, albeit
within limits imposed by the ``flow_control`` settings.

It is the responsibility of the callback to either call ``ack()`` or
``nack()`` on the message(s) when it finishes processing. If an exception
occurs in the callback during processing, the exception is logged and the
messages are ``nack()`` ed.

The ``flow_control`` argument can be used to control the rate of at
which messages are pulled. The settings are relatively conservative by
Expand Down Expand Up @@ -186,10 +192,22 @@ def callback(message):
subscription (str): The name of the subscription. The
subscription should have already been created (for example,
by using :meth:`create_subscription`).
callback (Callable[~google.cloud.pubsub_v1.subscriber.message.Message]):
The callback function. This function receives the message as
its only argument and will be called from a different thread/
process depending on the scheduling strategy.
callback ( \
Callable[ \
Union[ \
~google.cloud.pubsub_v1.subscriber.message.Message, \
List[~google.cloud.pubsub_v1.subscriber.message.Message], \
] \
] \
):
The callback function. This function receives the message (or
a list of messages if ``batch`` is ``True``) as its only argument
and will be called from a different thread/process depending on the
scheduling strategy.
batch (bool): If ``False`` (default), invoke ``callback`` for each
individual received message. If ``True``, invoke ``callback``
for each _group_ of messages received in a single server response
(within limits imposed by the ``flow_control`` settings).
flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow control
settings. Use this to prevent situations where you are
inundated with too many messages at once.
Expand All @@ -209,6 +227,8 @@ def callback(message):

future = futures.StreamingPullFuture(manager)

manager.open(callback=callback, on_callback_error=future.set_exception)
manager.open(
callback=callback, batch=batch, on_callback_error=future.set_exception
)

return future
Loading