Skip to content

Commit 743ade4

Browse files
Set gRPC message options and keepalive. (googleapis#4269)
1 parent b53731d commit 743ade4

File tree

4 files changed

+103
-0
lines changed

4 files changed

+103
-0
lines changed

pubsub/google/cloud/pubsub_v1/publisher/client.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
from __future__ import absolute_import
1616

1717
import copy
18+
import os
1819
import pkg_resources
1920
import threading
2021

22+
import grpc
2123
import six
2224

25+
from google.api_core import grpc_helpers
2326
from google.cloud.gapic.pubsub.v1 import publisher_client
2427

2528
from google.cloud.pubsub_v1 import _gapic
@@ -53,6 +56,28 @@ class Client(object):
5356
Generally, you should not need to set additional keyword arguments.
5457
"""
5558
def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
59+
# Sanity check: Is our goal to use the emulator?
60+
# If so, create a grpc insecure channel with the emulator host
61+
# as the target.
62+
if os.environ.get('PUBSUB_EMULATOR_HOST'):
63+
kwargs['channel'] = grpc.insecure_channel(
64+
target=os.environ.get('PUBSUB_EMULATOR_HOST'),
65+
)
66+
67+
# Use a custom channel.
68+
# We need this in order to set appropriate default message size and
69+
# keepalive options.
70+
if 'channel' not in kwargs:
71+
kwargs['channel'] = grpc_helpers.create_channel(
72+
credentials=kwargs.get('credentials', None),
73+
target=self.target,
74+
scopes=publisher_client.PublisherClient._ALL_SCOPES,
75+
options={
76+
'grpc.max_send_message_length': -1,
77+
'grpc.max_receive_message_length': -1,
78+
}.items(),
79+
)
80+
5681
# Add the metrics headers, and instantiate the underlying GAPIC
5782
# client.
5883
kwargs['lib_name'] = 'gccl'
@@ -66,6 +91,18 @@ def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
6691
self._batch_lock = threading.Lock()
6792
self._batches = {}
6893

94+
@property
95+
def target(self):
96+
"""Return the target (where the API is).
97+
98+
Returns:
99+
str: The location of the API.
100+
"""
101+
return '{host}:{port}'.format(
102+
host=publisher_client.PublisherClient.SERVICE_ADDRESS,
103+
port=publisher_client.PublisherClient.DEFAULT_SERVICE_PORT,
104+
)
105+
69106
def batch(self, topic, message, create=True, autocommit=True):
70107
"""Return the current batch for the provided topic.
71108

pubsub/google/cloud/pubsub_v1/subscriber/client.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
from __future__ import absolute_import
1616

1717
import pkg_resources
18+
import os
1819

20+
import grpc
21+
22+
from google.api_core import grpc_helpers
1923
from google.cloud.gapic.pubsub.v1 import subscriber_client
2024

2125
from google.cloud.pubsub_v1 import _gapic
@@ -49,6 +53,29 @@ class in order to define your own consumer. This is primarily
4953
arguments.
5054
"""
5155
def __init__(self, policy_class=thread.Policy, **kwargs):
56+
# Sanity check: Is our goal to use the emulator?
57+
# If so, create a grpc insecure channel with the emulator host
58+
# as the target.
59+
if os.environ.get('PUBSUB_EMULATOR_HOST'):
60+
kwargs['channel'] = grpc.insecure_channel(
61+
target=os.environ.get('PUBSUB_EMULATOR_HOST'),
62+
)
63+
64+
# Use a custom channel.
65+
# We need this in order to set appropriate default message size and
66+
# keepalive options.
67+
if 'channel' not in kwargs:
68+
kwargs['channel'] = grpc_helpers.create_channel(
69+
credentials=kwargs.get('credentials', None),
70+
target=self.target,
71+
scopes=subscriber_client.SubscriberClient._ALL_SCOPES,
72+
options={
73+
'grpc.max_send_message_length': -1,
74+
'grpc.max_receive_message_length': -1,
75+
'grpc.keepalive_time_ms': 30000,
76+
}.items(),
77+
)
78+
5279
# Add the metrics headers, and instantiate the underlying GAPIC
5380
# client.
5481
kwargs['lib_name'] = 'gccl'
@@ -59,6 +86,18 @@ def __init__(self, policy_class=thread.Policy, **kwargs):
5986
# messages.
6087
self._policy_class = policy_class
6188

89+
@property
90+
def target(self):
91+
"""Return the target (where the API is).
92+
93+
Returns:
94+
str: The location of the API.
95+
"""
96+
return '{host}:{port}'.format(
97+
host=subscriber_client.SubscriberClient.SERVICE_ADDRESS,
98+
port=subscriber_client.SubscriberClient.DEFAULT_SERVICE_PORT,
99+
)
100+
62101
def subscribe(self, subscription, callback=None, flow_control=()):
63102
"""Return a representation of an individual subscription.
64103

pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from __future__ import absolute_import
16+
import os
17+
1518
import mock
1619

1720
import pytest
@@ -38,6 +41,18 @@ def test_init():
3841
assert client.batch_settings.max_messages == 1000
3942

4043

44+
def test_init_emulator(monkeypatch):
45+
monkeypatch.setenv('PUBSUB_EMULATOR_HOST', '/foo/bar/')
46+
client = create_client()
47+
48+
# Establish that a gRPC request would attempt to hit the emulator host.
49+
#
50+
# Sadly, there seems to be no good way to do this without poking at
51+
# the private API of gRPC.
52+
channel = client.api.publisher_stub.Publish._channel
53+
assert channel.target().decode('utf8') == '/foo/bar/'
54+
55+
4156
def test_batch_accepting():
4257
"""Establish that an existing batch is returned if it accepts messages."""
4358
client = create_client()

pubsub/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,18 @@ def test_init():
2929
assert client._policy_class is thread.Policy
3030

3131

32+
def test_init_emulator(monkeypatch):
33+
monkeypatch.setenv('PUBSUB_EMULATOR_HOST', '/baz/bacon/')
34+
client = create_client()
35+
36+
# Establish that a gRPC request would attempt to hit the emulator host.
37+
#
38+
# Sadly, there seems to be no good way to do this without poking at
39+
# the private API of gRPC.
40+
channel = client.api.subscriber_stub.Pull._channel
41+
assert channel.target().decode('utf8') == '/baz/bacon/'
42+
43+
3244
def test_subscribe():
3345
client = create_client()
3446
subscription = client.subscribe('sub_name_a')

0 commit comments

Comments
 (0)