Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit c29d7f8

Browse files
committed
Optimize creating and accesing pubsub messages
Profiling shows that the speed of creating a new pubsub message and the speed of accessing the message's attributes significantly affects the throughput of publisher and subscriber. This commit makes everything faster by circumventing the wrapper class around the raw protobuf pubsub messages where possible.
1 parent cca5683 commit c29d7f8

File tree

6 files changed

+56
-22
lines changed

6 files changed

+56
-22
lines changed

google/cloud/pubsub_v1/publisher/_batch/thread.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
_CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING)
3333
_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000 # max accepted size of PublishRequest
3434

35+
_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()
36+
3537

3638
class Batch(base.Batch):
3739
"""A batch of messages.
@@ -337,7 +339,11 @@ def publish(self, message):
337339

338340
# Coerce the type, just in case.
339341
if not isinstance(message, gapic_types.PubsubMessage):
340-
message = gapic_types.PubsubMessage(**message)
342+
# For performance reasons, the message should be constructed by directly
343+
# using the raw protobuf class, and only then wrapping it into the
344+
# higher-level PubsubMessage class.
345+
vanilla_pb = _raw_proto_pubbsub_message(**message)
346+
message = gapic_types.PubsubMessage.wrap(vanilla_pb)
341347

342348
future = None
343349

google/cloud/pubsub_v1/publisher/client.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
"from_service_account_json",
5353
)
5454

55+
_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()
56+
5557

5658
def _set_nested_value(container, value, keys):
5759
current = container
@@ -346,10 +348,13 @@ def publish(
346348
"be sent as text strings."
347349
)
348350

349-
# Create the Pub/Sub message object.
350-
message = gapic_types.PubsubMessage(
351+
# Create the Pub/Sub message object. For performance reasons, the message
352+
# should be constructed by directly using the raw protobuf class, and only
353+
# then wrapping it into the higher-level PubsubMessage class.
354+
vanilla_pb = _raw_proto_pubbsub_message(
351355
data=data, ordering_key=ordering_key, attributes=attrs
352356
)
357+
message = gapic_types.PubsubMessage.wrap(vanilla_pb)
353358

354359
# Messages should go through flow control to prevent excessive
355360
# queuing on the client side (depending on the settings).

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -602,9 +602,13 @@ def _on_response(self, response):
602602
)
603603
return
604604

605+
# IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
606+
# protobuf message to significantly gain on attribute access performance.
607+
received_messages = response._pb.received_messages
608+
605609
_LOGGER.debug(
606610
"Processing %s received message(s), currently on hold %s (bytes %s).",
607-
len(response.received_messages),
611+
len(received_messages),
608612
self._messages_on_hold.size,
609613
self._on_hold_bytes,
610614
)
@@ -614,12 +618,12 @@ def _on_response(self, response):
614618
# received them.
615619
items = [
616620
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
617-
for message in response.received_messages
621+
for message in received_messages
618622
]
619623
self._dispatcher.modify_ack_deadline(items)
620624

621625
with self._pause_resume_lock:
622-
for received_message in response.received_messages:
626+
for received_message in received_messages:
623627
message = google.cloud.pubsub_v1.subscriber.message.Message(
624628
received_message.message,
625629
received_message.ack_id,

google/cloud/pubsub_v1/subscriber/message.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414

1515
from __future__ import absolute_import
1616

17+
import datetime as dt
1718
import json
1819
import math
20+
import pytz
1921
import time
2022

2123
from google.cloud.pubsub_v1.subscriber._protocol import requests
@@ -79,7 +81,9 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue):
7981
8082
Args:
8183
message (~.pubsub_v1.types.PubsubMessage): The message received
82-
from Pub/Sub.
84+
from Pub/Sub. For performance reasons it should be the the raw
85+
protobuf message wrapped by the ``PubsubMessage`` class obtained
86+
through the message's ``.pb()`` method.
8387
ack_id (str): The ack_id received from Pub/Sub.
8488
delivery_attempt (int): The delivery attempt counter received
8589
from Pub/Sub if a DeadLetterPolicy is set on the subscription,
@@ -99,6 +103,18 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue):
99103
# the default lease deadline.
100104
self._received_timestamp = time.time()
101105

106+
# Store the message attributes directly to speed up attribute access, i.e.
107+
# to avoid two lookups if self._message.<attribute> pattern was used in
108+
# properties.
109+
self._attributes = message.attributes
110+
self._data = message.data
111+
self._publish_time = dt.datetime.fromtimestamp(
112+
message.publish_time.seconds + message.publish_time.nanos / 1e9,
113+
tz=pytz.UTC,
114+
)
115+
self._ordering_key = message.ordering_key
116+
self._size = message.ByteSize()
117+
102118
def __repr__(self):
103119
# Get an abbreviated version of the data.
104120
abbv_data = self._message.data
@@ -130,7 +146,7 @@ def attributes(self):
130146
.ScalarMapContainer: The message's attributes. This is a
131147
``dict``-like object provided by ``google.protobuf``.
132148
"""
133-
return self._message.attributes
149+
return self._attributes
134150

135151
@property
136152
def data(self):
@@ -140,7 +156,7 @@ def data(self):
140156
bytes: The message data. This is always a bytestring; if you
141157
want a text string, call :meth:`bytes.decode`.
142158
"""
143-
return self._message.data
159+
return self._data
144160

145161
@property
146162
def publish_time(self):
@@ -149,17 +165,17 @@ def publish_time(self):
149165
Returns:
150166
datetime: The date and time that the message was published.
151167
"""
152-
return self._message.publish_time
168+
return self._publish_time
153169

154170
@property
155171
def ordering_key(self):
156172
"""str: the ordering key used to publish the message."""
157-
return self._message.ordering_key
173+
return self._ordering_key
158174

159175
@property
160176
def size(self):
161177
"""Return the size of the underlying message, in bytes."""
162-
return self._message._pb.ByteSize()
178+
return self._size
163179

164180
@property
165181
def ack_id(self):

tests/unit/pubsub_v1/subscriber/test_message.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,19 @@
3636
def create_message(data, ack_id="ACKID", delivery_attempt=0, ordering_key="", **attrs):
3737
with mock.patch.object(time, "time") as time_:
3838
time_.return_value = RECEIVED_SECONDS
39-
msg = message.Message(
40-
message=gapic_types.PubsubMessage(
41-
attributes=attrs,
42-
data=data,
43-
message_id="message_id",
44-
publish_time=timestamp_pb2.Timestamp(
45-
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
46-
),
47-
ordering_key=ordering_key,
39+
gapic_pubsub_message = gapic_types.PubsubMessage(
40+
attributes=attrs,
41+
data=data,
42+
message_id="message_id",
43+
publish_time=timestamp_pb2.Timestamp(
44+
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
4845
),
46+
ordering_key=ordering_key,
47+
)
48+
msg = message.Message(
49+
# The code under test uses a raw protobuf PubsubMessage, i.e. w/o additional
50+
# Python class wrappers, hence the "_pb"
51+
message=gapic_pubsub_message._pb,
4952
ack_id=ack_id,
5053
delivery_attempt=delivery_attempt,
5154
request_queue=queue.Queue(),

tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
def make_message(ack_id, ordering_key):
2323
proto_msg = gapic_types.PubsubMessage(data=b"Q", ordering_key=ordering_key)
24-
return message.Message(proto_msg, ack_id, 0, queue.Queue())
24+
return message.Message(proto_msg._pb, ack_id, 0, queue.Queue())
2525

2626

2727
def test_init():

0 commit comments

Comments
 (0)