Skip to content

Commit 841bcbd

Browse files
mcdonctheacodes
authored andcommitted
Make Publisher batch-related interfaces private (googleapis#5784)
1 parent 74a5895 commit 841bcbd

9 files changed

Lines changed: 25 additions & 43 deletions

File tree

docs/pubsub/publisher/api/batch.rst

Lines changed: 0 additions & 8 deletions
This file was deleted.

docs/pubsub/publisher/index.rst

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,23 +58,21 @@ If you want to include attributes, simply add keyword arguments:
5858
Batching
5959
--------
6060

61-
Whenever you publish a message, a
62-
:class:`~.pubsub_v1.publisher.batch.thread.Batch` is automatically created.
63-
This way, if you publish a large volume of messages, it reduces the number of
64-
requests made to the server.
61+
Whenever you publish a message, the publisher will automatically batch the
62+
messages over a small time window to avoid making too many separate requests to
63+
the service. This helps increase throughput.
6564

6665
.. note::
6766

6867
By default, this uses ``threading``, and you will need to be in an
6968
environment with threading enabled. It is possible to provide an
7069
alternative batch class that uses another concurrency strategy.
7170

72-
The way that this works is that on the first message that you send, a new
73-
:class:`~.pubsub_v1.publisher.batch.thread.Batch` is created automatically.
74-
For every subsequent message, if there is already a valid batch that is still
75-
accepting messages, then that batch is used. When the batch is created, it
76-
begins a countdown that publishes the batch once sufficient time has
77-
elapsed (by default, this is 0.05 seconds).
71+
The way that this works is that on the first message that you send, a new batch
72+
is created automatically. For every subsequent message, if there is already a
73+
valid batch that is still accepting messages, then that batch is used. When the
74+
batch is created, it begins a countdown that publishes the batch once
75+
sufficient time has elapsed (by default, this is 0.05 seconds).
7876

7977
If you need different batching settings, simply provide a
8078
:class:`~.pubsub_v1.types.BatchSettings` object when you instantiate the

pubsub/google/cloud/pubsub_v1/publisher/batch/__init__.py renamed to pubsub/google/cloud/pubsub_v1/publisher/_batch/__init__.py

File renamed without changes.

pubsub/google/cloud/pubsub_v1/publisher/batch/base.py renamed to pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py

File renamed without changes.

pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py renamed to pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from google.cloud.pubsub_v1 import types
2525
from google.cloud.pubsub_v1.publisher import exceptions
2626
from google.cloud.pubsub_v1.publisher import futures
27-
from google.cloud.pubsub_v1.publisher.batch import base
27+
from google.cloud.pubsub_v1.publisher._batch import base
2828

2929

3030
_LOGGER = logging.getLogger(__name__)

pubsub/google/cloud/pubsub_v1/publisher/client.py

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@
2626
from google.cloud.pubsub_v1 import _gapic
2727
from google.cloud.pubsub_v1 import types
2828
from google.cloud.pubsub_v1.gapic import publisher_client
29-
from google.cloud.pubsub_v1.publisher.batch import thread
29+
from google.cloud.pubsub_v1.publisher._batch import thread
3030

3131

3232
__version__ = pkg_resources.get_distribution('google-cloud-pubsub').version
3333

3434

3535
@_gapic.add_methods(publisher_client.PublisherClient, blacklist=('publish',))
3636
class Client(object):
37+
_batch_class = thread.Batch
3738
"""A publisher client for Google Cloud Pub/Sub.
3839
3940
This creates an object that is capable of publishing messages.
@@ -43,14 +44,6 @@ class Client(object):
4344
Args:
4445
batch_settings (~google.cloud.pubsub_v1.types.BatchSettings): The
4546
settings for batch publishing.
46-
batch_class (Optional[Type]): A class that describes how to handle
47-
batches. You may subclass the
48-
:class:`.pubsub_v1.publisher.batch.base.BaseBatch` class in
49-
order to define your own batcher. This is primarily provided to
50-
allow use of different concurrency models; the default
51-
is based on :class:`threading.Thread`. This class should also have
52-
a class method (or static method) that takes no arguments and
53-
produces a lock that can be used as a context manager.
5447
kwargs (dict): Any additional arguments provided are sent as keyword
5548
arguments to the underlying
5649
:class:`~.gapic.pubsub.v1.publisher_client.PublisherClient`.
@@ -59,7 +52,7 @@ class Client(object):
5952
be added if ``credentials`` are passed explicitly or if the
6053
Pub / Sub emulator is detected as running.
6154
"""
62-
def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
55+
def __init__(self, batch_settings=(), **kwargs):
6356
# Sanity check: Is our goal to use the emulator?
6457
# If so, create a grpc insecure channel with the emulator host
6558
# as the target.
@@ -89,8 +82,7 @@ def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
8982

9083
# The batches on the publisher client are responsible for holding
9184
# messages. One batch exists for each topic.
92-
self._batch_class = batch_class
93-
self._batch_lock = batch_class.make_lock()
85+
self._batch_lock = self._batch_class.make_lock()
9486
self._batches = {}
9587

9688
@property
@@ -102,7 +94,7 @@ def target(self):
10294
"""
10395
return publisher_client.PublisherClient.SERVICE_ADDRESS
10496

105-
def batch(self, topic, create=False, autocommit=True):
97+
def _batch(self, topic, create=False, autocommit=True):
10698
"""Return the current batch for the provided topic.
10799
108100
This will create a new batch if ``create=True`` or if no batch
@@ -119,7 +111,7 @@ def batch(self, topic, create=False, autocommit=True):
119111
might have (e.g. spawning a worker to publish a batch).
120112
121113
Returns:
122-
~.pubsub_v1.batch.Batch: The batch object.
114+
~.pubsub_v1._batch.Batch: The batch object.
123115
"""
124116
# If there is no matching batch yet, then potentially create one
125117
# and place it on the batches dictionary.
@@ -201,11 +193,11 @@ def publish(self, topic, data, **attrs):
201193
message = types.PubsubMessage(data=data, attributes=attrs)
202194

203195
# Delegate the publishing to the batch.
204-
batch = self.batch(topic)
196+
batch = self._batch(topic)
205197
future = None
206198
while future is None:
207199
future = batch.publish(message)
208200
if future is None:
209-
batch = self.batch(topic, create=True)
201+
batch = self._batch(topic, create=True)
210202

211203
return future

pubsub/tests/unit/pubsub_v1/publisher/batch/test_base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
from google.auth import credentials
2020
from google.cloud.pubsub_v1 import publisher
2121
from google.cloud.pubsub_v1 import types
22-
from google.cloud.pubsub_v1.publisher.batch.base import BatchStatus
23-
from google.cloud.pubsub_v1.publisher.batch.thread import Batch
22+
from google.cloud.pubsub_v1.publisher._batch.base import BatchStatus
23+
from google.cloud.pubsub_v1.publisher._batch.thread import Batch
2424

2525

2626
def create_batch(status=None, settings=types.BatchSettings()):

pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
from google.cloud.pubsub_v1 import publisher
2323
from google.cloud.pubsub_v1 import types
2424
from google.cloud.pubsub_v1.publisher import exceptions
25-
from google.cloud.pubsub_v1.publisher.batch.base import BatchStatus
26-
from google.cloud.pubsub_v1.publisher.batch import thread
27-
from google.cloud.pubsub_v1.publisher.batch.thread import Batch
25+
from google.cloud.pubsub_v1.publisher._batch.base import BatchStatus
26+
from google.cloud.pubsub_v1.publisher._batch import thread
27+
from google.cloud.pubsub_v1.publisher._batch.thread import Batch
2828

2929

3030
def create_client():

pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def test_batch_create():
5656

5757
assert len(client._batches) == 0
5858
topic = 'topic/path'
59-
batch = client.batch(topic, autocommit=False)
59+
batch = client._batch(topic, autocommit=False)
6060
assert client._batches == {topic: batch}
6161

6262

@@ -68,7 +68,7 @@ def test_batch_exists():
6868
client._batches[topic] = mock.sentinel.batch
6969

7070
# A subsequent request should return the same batch.
71-
batch = client.batch(topic, autocommit=False)
71+
batch = client._batch(topic, autocommit=False)
7272
assert batch is mock.sentinel.batch
7373
assert client._batches == {topic: batch}
7474

@@ -81,7 +81,7 @@ def test_batch_create_and_exists():
8181
client._batches[topic] = mock.sentinel.batch
8282

8383
# A subsequent request should return the same batch.
84-
batch = client.batch(topic, create=True, autocommit=False)
84+
batch = client._batch(topic, create=True, autocommit=False)
8585
assert batch is not mock.sentinel.batch
8686
assert client._batches == {topic: batch}
8787

0 commit comments

Comments
 (0)