Skip to content

Commit ef2458a

Browse files
author
Jon Wayne Parrott
authored
Add ability for subscriber to batch requests. (googleapis#4895)
The size of the batches and the rate at which they are submitted is controlled by two new flow control settings, `max_request_batch_size` and `max_request_batch_latency`.
1 parent e3c4002 commit ef2458a

File tree

9 files changed

+378
-190
lines changed

9 files changed

+378
-190
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313
# limitations under the License.
1414

1515
import logging
16+
import time
1617
import uuid
1718

19+
from six.moves import queue
20+
1821

1922
__all__ = (
2023
'QueueCallbackWorker',
@@ -31,6 +34,36 @@
3134
STOP = uuid.uuid4()
3235

3336

37+
def _get_many(queue_, max_items=None, max_latency=0):
38+
"""Get multiple items from a Queue.
39+
40+
Gets at least one (blocking) and at most ``max_items`` items
41+
(non-blocking) from a given Queue. Does not mark the items as done.
42+
43+
Args:
44+
queue_ (~queue.Queue`): The Queue to get items from.
45+
max_items (int): The maximum number of items to get. If ``None``, then
46+
all available items in the queue are returned.
47+
max_latency (float): The maximum number of seconds to wait for more
48+
than one item from a queue. This number includes the time required
49+
to retrieve the first item.
50+
51+
Returns:
52+
Sequence[Any]: A sequence of items retrieved from the queue.
53+
"""
54+
start = time.time()
55+
# Always return at least one item.
56+
items = [queue_.get()]
57+
while max_items is None or len(items) < max_items:
58+
try:
59+
elapsed = time.time() - start
60+
timeout = max(0, max_latency - elapsed)
61+
items.append(queue_.get(timeout=timeout))
62+
except queue.Empty:
63+
break
64+
return items
65+
66+
3467
class QueueCallbackWorker(object):
3568
"""A helper that executes a callback for every item in the queue.
3669
@@ -42,27 +75,42 @@ class QueueCallbackWorker(object):
4275
concurrency boundary implemented by ``executor``. Items will
4376
be popped off (with a blocking ``get()``) until :attr:`STOP`
4477
is encountered.
45-
callback (Callable[[str, Dict], Any]): A callback that can process
46-
items pulled off of the queue. Items are assumed to be a pair
47-
of a method name to be invoked and a dictionary of keyword
48-
arguments for that method.
78+
callback (Callable[Sequence[Any], Any]): A callback that can process
79+
items pulled off of the queue. Multiple items will be passed to
80+
the callback in batches.
81+
max_items (int): The maximum amount of items that will be passed to the
82+
callback at a time.
83+
max_latency (float): The maximum amount of time in seconds to wait for
84+
additional items before executing the callback.
4985
"""
5086

51-
def __init__(self, queue, callback):
87+
def __init__(self, queue, callback, max_items=100, max_latency=0):
5288
self.queue = queue
5389
self._callback = callback
90+
self.max_items = max_items
91+
self.max_latency = max_latency
5492

5593
def __call__(self):
56-
while True:
57-
item = self.queue.get()
58-
if item == STOP:
59-
_LOGGER.debug('Exiting the QueueCallbackWorker.')
60-
return
94+
continue_ = True
95+
while continue_:
96+
items = _get_many(
97+
self.queue,
98+
max_items=self.max_items,
99+
max_latency=self.max_latency)
100+
101+
# If stop is in the items, process all items up to STOP and then
102+
# exit.
103+
try:
104+
items = items[:items.index(STOP)]
105+
continue_ = False
106+
except ValueError:
107+
pass
61108

62109
# Run the callback. If any exceptions occur, log them and
63110
# continue.
64111
try:
65-
action, kwargs = item
66-
self._callback(action, kwargs)
112+
self._callback(items)
67113
except Exception as exc:
68114
_LOGGER.error('%s: %s', exc.__class__.__name__, exc)
115+
116+
_LOGGER.debug('Exiting the QueueCallbackWorker.')

pubsub/google/cloud/pubsub_v1/subscriber/message.py

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import math
1919
import time
2020

21+
from google.cloud.pubsub_v1.subscriber.policy import base as base_policy
22+
2123

2224
_MESSAGE_REPR = """\
2325
Message {{
@@ -172,14 +174,11 @@ def ack(self):
172174
"""
173175
time_to_ack = math.ceil(time.time() - self._received_timestamp)
174176
self._request_queue.put(
175-
(
176-
'ack',
177-
{
178-
'ack_id': self._ack_id,
179-
'byte_size': self.size,
180-
'time_to_ack': time_to_ack,
181-
},
182-
),
177+
base_policy.AckRequest(
178+
ack_id=self._ack_id,
179+
byte_size=self.size,
180+
time_to_ack=time_to_ack
181+
)
183182
)
184183

185184
def drop(self):
@@ -196,13 +195,10 @@ def drop(self):
196195
directly.
197196
"""
198197
self._request_queue.put(
199-
(
200-
'drop',
201-
{
202-
'ack_id': self._ack_id,
203-
'byte_size': self.size,
204-
},
205-
),
198+
base_policy.DropRequest(
199+
ack_id=self._ack_id,
200+
byte_size=self.size
201+
)
206202
)
207203

208204
def lease(self):
@@ -213,19 +209,16 @@ def lease(self):
213209
need to call it manually.
214210
"""
215211
self._request_queue.put(
216-
(
217-
'lease',
218-
{
219-
'ack_id': self._ack_id,
220-
'byte_size': self.size,
221-
},
222-
),
212+
base_policy.LeaseRequest(
213+
ack_id=self._ack_id,
214+
byte_size=self.size
215+
)
223216
)
224217

225218
def modify_ack_deadline(self, seconds):
226219
"""Resets the deadline for acknowledgement.
227-
228-
New deadline will be the given value of seconds from now.
220+
221+
New deadline will be the given value of seconds from now.
229222
230223
The default implementation handles this for you; you should not need
231224
to manually deal with setting ack deadlines. The exception case is
@@ -238,13 +231,10 @@ def modify_ack_deadline(self, seconds):
238231
values below 10 are advised against.
239232
"""
240233
self._request_queue.put(
241-
(
242-
'modify_ack_deadline',
243-
{
244-
'ack_id': self._ack_id,
245-
'seconds': seconds,
246-
},
247-
),
234+
base_policy.ModAckRequest(
235+
ack_id=self._ack_id,
236+
seconds=seconds
237+
)
248238
)
249239

250240
def nack(self):
@@ -253,11 +243,8 @@ def nack(self):
253243
This will cause the message to be re-delivered to the subscription.
254244
"""
255245
self._request_queue.put(
256-
(
257-
'nack',
258-
{
259-
'ack_id': self._ack_id,
260-
'byte_size': self.size,
261-
},
262-
),
246+
base_policy.NackRequest(
247+
ack_id=self._ack_id,
248+
byte_size=self.size
249+
)
263250
)

0 commit comments

Comments
 (0)