Skip to content

Commit bae0f75

Browse files
author
Jon Wayne Parrott
authored
Add pending request backpressure for subscriber. (googleapis#4892)
This change includes the consumer's pending request backlog in the policy's load calculations. This allows the policy to pause (and resume) the response stream if there are a large number of outstanding requests to be sent on the stream. This additionally adds `max_requests` to `FlowControl` to allow tweaking this number. Resolves: googleapis#4792 Related: googleapis#4841
1 parent 929a86b commit bae0f75

7 files changed

Lines changed: 95 additions & 49 deletions

File tree

pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ def __init__(self):
187187
def active(self):
188188
"""bool: Indicates if the consumer is active.
189189
190+
*Active* means that the stream is open and that it is possible to
191+
send and receive messages. This is distinct from *pausing* which just
192+
pauses *response* consumption.
193+
190194
This is intended to be an implementation independent way of indicating
191195
that the consumer is stopped. (E.g. so a policy that owns a consumer
192196
doesn't need to know what a ``threading.Event`` is.)
@@ -202,6 +206,14 @@ def send_request(self, request):
202206
with self._put_lock:
203207
self._request_queue.put(request)
204208

209+
@property
210+
def pending_requests(self):
211+
"""int: An approximate count of the outstanding requests.
212+
213+
This can be used to determine if the consumer should be paused if there
214+
are too many outstanding requests."""
215+
return self._request_queue.qsize()
216+
205217
def _request_generator_thread(self, policy):
206218
"""Generate requests for the stream.
207219
@@ -231,8 +243,9 @@ def _request_generator_thread(self, policy):
231243
_LOGGER.debug('Request generator signaled to stop.')
232244
break
233245

234-
_LOGGER.debug('Sending request:\n%r', request)
246+
_LOGGER.debug('Sending request on stream')
235247
yield request
248+
policy.on_request(request)
236249

237250
def _stop_request_generator(self, request_generator, response_generator):
238251
"""Ensure a request generator is closed.
@@ -325,7 +338,7 @@ def _blocking_consume(self, policy):
325338
# checks to make sure we're not exiting before opening a new
326339
# stream.
327340
if self._stopped.is_set():
328-
_LOGGER.debug('Event signalled consumer exit.')
341+
_LOGGER.debug('Event signaled consumer exit.')
329342
break
330343

331344
request_generator = self._request_generator_thread(policy)
@@ -334,14 +347,14 @@ def _blocking_consume(self, policy):
334347
response_generator, self._can_consume)
335348
try:
336349
for response in responses:
337-
_LOGGER.debug('Received response:\n%r', response)
350+
_LOGGER.debug('Received response on stream')
338351
policy.on_response(response)
339352

340353
# If the loop above exits without an exception, then the
341354
# request stream terminated cleanly, which should only happen
342355
# when it was signaled to do so by stop_consuming. In this
343356
# case, break out of the while loop and exit this thread.
344-
_LOGGER.debug('Clean RPC loop exit signalled consumer exit.')
357+
_LOGGER.debug('Clean RPC loop exit signaled consumer exit.')
345358
break
346359
except Exception as exc:
347360
recover = policy.on_exception(exc)
@@ -364,7 +377,8 @@ def pause(self):
364377
365378
This will clear the ``_can_consume`` event which is checked
366379
every time :meth:`_blocking_consume` consumes a response from the
367-
bidirectional streaming pull.
380+
bidirectional streaming pull. *requests* can still be sent along
381+
the stream.
368382
369383
Complement to :meth:`resume`.
370384
"""

pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,21 @@ def _load(self):
156156
return max([
157157
len(self.managed_ack_ids) / self.flow_control.max_messages,
158158
self._bytes / self.flow_control.max_bytes,
159+
self._consumer.pending_requests / self.flow_control.max_requests
159160
])
160161

162+
def _maybe_resume_consumer(self):
163+
"""Check the current load and resume the consumer if needed."""
164+
# If we have been paused by flow control, check and see if we are
165+
# back within our limits.
166+
#
167+
# In order to not thrash too much, require us to have passed below
168+
# the resume threshold (80% by default) of each flow control setting
169+
# before restarting.
170+
if (self._consumer.paused and
171+
self._load < self.flow_control.resume_threshold):
172+
self._consumer.resume()
173+
161174
def ack(self, ack_id, time_to_ack=None, byte_size=None):
162175
"""Acknowledge the message corresponding to the given ack_id.
163176
@@ -216,15 +229,7 @@ def drop(self, ack_id, byte_size):
216229
'Bytes was unexpectedly negative: %d', self._bytes)
217230
self._bytes = 0
218231

219-
# If we have been paused by flow control, check and see if we are
220-
# back within our limits.
221-
#
222-
# In order to not thrash too much, require us to have passed below
223-
# the resume threshold (80% by default) of each flow control setting
224-
# before restarting.
225-
if (self._consumer.paused and
226-
self._load < self.flow_control.resume_threshold):
227-
self._consumer.resume()
232+
self._maybe_resume_consumer()
228233

229234
def get_initial_request(self, ack_queue=False):
230235
"""Return the initial request.
@@ -397,6 +402,18 @@ def on_exception(self, exception):
397402
"""
398403
raise NotImplementedError
399404

405+
def on_request(self, request):
406+
"""Called whenever a request has been sent to gRPC.
407+
408+
This allows the policy to measure the rate of requests sent along the
409+
stream and apply backpressure by pausing or resuming the consumer
410+
if needed.
411+
412+
Args:
413+
request (Any): The protobuf request that was sent to gRPC.
414+
"""
415+
self._maybe_resume_consumer()
416+
400417
@abc.abstractmethod
401418
def on_response(self, response):
402419
"""Process a response from gRPC.

pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import sys
2020
import threading
2121

22-
import grpc
2322
from six.moves import queue as queue_mod
2423

2524
from google.cloud.pubsub_v1 import types
@@ -33,19 +32,6 @@
3332
_CALLBACK_WORKER_NAME = 'Thread-Consumer-CallbackRequestsWorker'
3433

3534

36-
def _callback_completed(future):
37-
"""Simple callback that just logs a future's result.
38-
39-
Used on completion of processing a message received by a
40-
subscriber.
41-
42-
Args:
43-
future (concurrent.futures.Future): A future returned
44-
from :meth:`~concurrent.futures.Executor.submit`.
45-
"""
46-
_LOGGER.debug('Result: %s', future.result())
47-
48-
4935
def _do_nothing_callback(message):
5036
"""Default callback for messages received by subscriber.
5137
@@ -332,8 +318,7 @@ def on_response(self, response):
332318
"""
333319
for msg in response.received_messages:
334320
_LOGGER.debug(
335-
'Using %s to process new message received:\n%r',
336-
self._callback, msg)
321+
'Using %s to process message with ack_id %s.',
322+
self._callback, msg.ack_id)
337323
message = Message(msg.message, msg.ack_id, self._request_queue)
338-
future = self._executor.submit(self._callback, message)
339-
future.add_done_callback(_callback_completed)
324+
self._executor.submit(self._callback, message)

pubsub/google/cloud/pubsub_v1/types.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,13 @@
5353
# The defaults should be fine for most use cases.
5454
FlowControl = collections.namedtuple(
5555
'FlowControl',
56-
['max_bytes', 'max_messages', 'resume_threshold'],
56+
['max_bytes', 'max_messages', 'resume_threshold', 'max_requests'],
5757
)
5858
FlowControl.__new__.__defaults__ = (
5959
psutil.virtual_memory().total * 0.2, # max_bytes: 20% of total RAM
6060
float('inf'), # max_messages: no limit
6161
0.8, # resume_threshold: 80%
62+
100, # max_requests: 100
6263
)
6364

6465

pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def test_blocking_consume_when_exiting(_LOGGER):
8181
# Make sure method cleanly exits.
8282
assert consumer._blocking_consume(None) is None
8383

84-
_LOGGER.debug.assert_called_once_with('Event signalled consumer exit.')
84+
_LOGGER.debug.assert_called_once_with('Event signaled consumer exit.')
8585

8686

8787
class OnException(object):

pubsub/tests/unit/pubsub_v1/subscriber/test_policy_base.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,30 @@ def test_drop_below_threshold():
169169
assert consumer.paused is False
170170

171171

172-
def test_load():
172+
def test_on_request_below_threshold():
173+
"""Establish that we resume a paused subscription when the pending
174+
requests count is below threshold."""
175+
flow_control = types.FlowControl(max_requests=100)
176+
policy = create_policy(flow_control=flow_control)
177+
consumer = policy._consumer
178+
179+
assert consumer.paused is True
180+
181+
pending_requests_patch = mock.patch.object(
182+
consumer.__class__, 'pending_requests', new_callable=mock.PropertyMock)
183+
with pending_requests_patch as pending_requests:
184+
# should still be paused, not under the threshold.
185+
pending_requests.return_value = 90
186+
policy.on_request(None)
187+
assert consumer.paused is True
188+
189+
# should unpause, we're under the resume threshold
190+
pending_requests.return_value = 50
191+
policy.on_request(None)
192+
assert consumer.paused is False
193+
194+
195+
def test_load_w_lease():
173196
flow_control = types.FlowControl(max_messages=10, max_bytes=1000)
174197
policy = create_policy(flow_control=flow_control)
175198
consumer = policy._consumer
@@ -191,6 +214,26 @@ def test_load():
191214
pause.assert_called_once_with()
192215

193216

217+
def test_load_w_requests():
218+
flow_control = types.FlowControl(max_bytes=100, max_requests=100)
219+
policy = create_policy(flow_control=flow_control)
220+
consumer = policy._consumer
221+
222+
pending_requests_patch = mock.patch.object(
223+
consumer.__class__, 'pending_requests', new_callable=mock.PropertyMock)
224+
with pending_requests_patch as pending_requests:
225+
pending_requests.return_value = 0
226+
assert policy._load == 0
227+
228+
pending_requests.return_value = 100
229+
print(consumer.pending_requests)
230+
assert policy._load == 1
231+
232+
# If bytes count is higher, it should return that.
233+
policy._bytes = 110
234+
assert policy._load == 1.1
235+
236+
194237
def test_modify_ack_deadline():
195238
policy = create_policy()
196239
with mock.patch.object(policy._consumer, 'send_request') as send_request:

pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
from google.cloud.pubsub_v1 import subscriber
2727
from google.cloud.pubsub_v1 import types
28-
from google.cloud.pubsub_v1.subscriber import _helper_threads
2928
from google.cloud.pubsub_v1.subscriber import message
3029
from google.cloud.pubsub_v1.subscriber.futures import Future
3130
from google.cloud.pubsub_v1.subscriber.policy import thread
@@ -229,16 +228,3 @@ def test_on_response():
229228
for call in submit_calls:
230229
assert call[1][0] == callback
231230
assert isinstance(call[1][1], message.Message)
232-
233-
add_done_callback_calls = [
234-
m for m in future.method_calls if m[0] == 'add_done_callback']
235-
assert len(add_done_callback_calls) == 2
236-
for call in add_done_callback_calls:
237-
assert call[1][0] == thread._callback_completed
238-
239-
240-
def test__callback_completed():
241-
future = mock.Mock()
242-
thread._callback_completed(future)
243-
result_calls = [m for m in future.method_calls if m[0] == 'result']
244-
assert len(result_calls) == 1

0 commit comments

Comments
 (0)