Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 30dce66

Browse files
committed
EXPERIMENTAL: speed up streaming pull (circumvent proto wrapper clases)
1 parent d5a6247 commit 30dce66

File tree

4 files changed

+37
-19
lines changed

4 files changed

+37
-19
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -602,9 +602,13 @@ def _on_response(self, response):
602602
)
603603
return
604604

605+
# Circumvent the wrapper class and operate on the raw protobuf message
606+
# to gain attribute access performance.
607+
received_messages = response._pb.received_messages
608+
605609
_LOGGER.debug(
606610
"Processing %s received message(s), currently on hold %s (bytes %s).",
607-
len(response.received_messages),
611+
len(received_messages),
608612
self._messages_on_hold.size,
609613
self._on_hold_bytes,
610614
)
@@ -614,12 +618,12 @@ def _on_response(self, response):
614618
# received them.
615619
items = [
616620
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
617-
for message in response.received_messages
621+
for message in received_messages
618622
]
619623
self._dispatcher.modify_ack_deadline(items)
620624

621625
with self._pause_resume_lock:
622-
for received_message in response.received_messages:
626+
for received_message in received_messages:
623627
message = google.cloud.pubsub_v1.subscriber.message.Message(
624628
received_message.message,
625629
received_message.ack_id,

google/cloud/pubsub_v1/subscriber/message.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414

1515
from __future__ import absolute_import
1616

17+
import datetime as dt
1718
import json
1819
import math
20+
import pytz
1921
import time
2022

2123
from google.cloud.pubsub_v1.subscriber._protocol import requests
@@ -79,7 +81,7 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue):
7981
8082
Args:
8183
message (~.pubsub_v1.types.PubsubMessage): The message received
82-
from Pub/Sub.
84+
from Pub/Sub. TODO: it's the raw protobuf message! for performance
8385
ack_id (str): The ack_id received from Pub/Sub.
8486
delivery_attempt (int): The delivery attempt counter received
8587
from Pub/Sub if a DeadLetterPolicy is set on the subscription,
@@ -99,6 +101,15 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue):
99101
# the default lease deadline.
100102
self._received_timestamp = time.time()
101103

104+
self._attributes = message.attributes
105+
self._data = message.data
106+
self._publish_time = dt.datetime.fromtimestamp(
107+
message.publish_time.seconds + message.publish_time.nanos / 1e9,
108+
tz=pytz.UTC,
109+
)
110+
self._ordering_key = message.ordering_key
111+
self._size = message.ByteSize()
112+
102113
def __repr__(self):
103114
# Get an abbreviated version of the data.
104115
abbv_data = self._message.data
@@ -130,7 +141,7 @@ def attributes(self):
130141
.ScalarMapContainer: The message's attributes. This is a
131142
``dict``-like object provided by ``google.protobuf``.
132143
"""
133-
return self._message.attributes
144+
return self._attributes
134145

135146
@property
136147
def data(self):
@@ -140,7 +151,7 @@ def data(self):
140151
bytes: The message data. This is always a bytestring; if you
141152
want a text string, call :meth:`bytes.decode`.
142153
"""
143-
return self._message.data
154+
return self._data
144155

145156
@property
146157
def publish_time(self):
@@ -149,17 +160,17 @@ def publish_time(self):
149160
Returns:
150161
datetime: The date and time that the message was published.
151162
"""
152-
return self._message.publish_time
163+
return self._publish_time
153164

154165
@property
155166
def ordering_key(self):
156167
"""str: the ordering key used to publish the message."""
157-
return self._message.ordering_key
168+
return self._ordering_key
158169

159170
@property
160171
def size(self):
161172
"""Return the size of the underlying message, in bytes."""
162-
return self._message._pb.ByteSize()
173+
return self._size
163174

164175
@property
165176
def ack_id(self):

tests/unit/pubsub_v1/subscriber/test_message.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,19 @@
3636
def create_message(data, ack_id="ACKID", delivery_attempt=0, ordering_key="", **attrs):
3737
with mock.patch.object(time, "time") as time_:
3838
time_.return_value = RECEIVED_SECONDS
39-
msg = message.Message(
40-
message=gapic_types.PubsubMessage(
41-
attributes=attrs,
42-
data=data,
43-
message_id="message_id",
44-
publish_time=timestamp_pb2.Timestamp(
45-
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
46-
),
47-
ordering_key=ordering_key,
39+
gapic_pubsub_message = gapic_types.PubsubMessage(
40+
attributes=attrs,
41+
data=data,
42+
message_id="message_id",
43+
publish_time=timestamp_pb2.Timestamp(
44+
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
4845
),
46+
ordering_key=ordering_key,
47+
)
48+
msg = message.Message(
49+
# The code under test uses a raw protobuf PubsubMessage, i.e. w/o additional
50+
# Python class wrappers, hence the "_pb"
51+
message=gapic_pubsub_message._pb,
4952
ack_id=ack_id,
5053
delivery_attempt=delivery_attempt,
5154
request_queue=queue.Queue(),

tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
def make_message(ack_id, ordering_key):
2323
proto_msg = gapic_types.PubsubMessage(data=b"Q", ordering_key=ordering_key)
24-
return message.Message(proto_msg, ack_id, 0, queue.Queue())
24+
return message.Message(proto_msg._pb, ack_id, 0, queue.Queue())
2525

2626

2727
def test_init():

0 commit comments

Comments
 (0)