Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions packages/google-cloud-pubsub/CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ In order to add a feature:
documentation.

- The feature must work fully on the following CPython versions:
3.9, 3.10, 3.11, 3.12, 3.13 and 3.14 on both UNIX and Windows.
3.10, 3.11, 3.12, 3.13 and 3.14 on both UNIX and Windows.

- The feature must not add unnecessary dependencies (where
"unnecessary" is of course subjective, but new dependencies should
Expand Down Expand Up @@ -221,14 +221,12 @@ Supported Python Versions

We support:

- `Python 3.9`_
- `Python 3.10`_
- `Python 3.11`_
- `Python 3.12`_
- `Python 3.13`_
- `Python 3.14`_

.. _Python 3.9: https://docs.python.org/3.9/
.. _Python 3.10: https://docs.python.org/3.10/
.. _Python 3.11: https://docs.python.org/3.11/
.. _Python 3.12: https://docs.python.org/3.12/
Expand All @@ -241,18 +239,6 @@ Supported versions can be found in our ``noxfile.py`` `config`_.
.. _config: https://github.com/googleapis/python-pubsub/blob/main/noxfile.py


We also explicitly decided to support Python 3 beginning with version 3.7.
Reasons for this include:

- Encouraging use of newest versions of Python 3
- Taking the lead of `prominent`_ open-source `projects`_
- `Unicode literal support`_ which allows for a cleaner codebase that
works in both Python 2 and Python 3

.. _prominent: https://docs.djangoproject.com/en/1.9/faq/install/#what-python-version-can-i-use-with-django
.. _projects: http://flask.pocoo.org/docs/0.10/python3/
.. _Unicode literal support: https://www.python.org/dev/peps/pep-0414/

**********
Versioning
**********
Expand Down
4 changes: 0 additions & 4 deletions packages/google-cloud-pubsub/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ Python >= 3.10, including 3.14
.. _active: https://devguide.python.org/devcycle/#in-development-main-branch
.. _maintenance: https://devguide.python.org/devcycle/#maintenance-branches

Unsupported Python Versions
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Python <= 3.9
Comment thread
ohmayr marked this conversation as resolved.


If you are using an `end-of-life`_
version of Python, we recommend that you update as soon as possible to an actively supported version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import copy
import logging
import os
import sys
import threading
import time
import typing
Expand Down Expand Up @@ -165,18 +164,6 @@ def __init__(
self._open_telemetry_enabled = (
self.publisher_options.enable_open_telemetry_tracing
)
# OpenTelemetry features used by the library are not supported in Python versions <= 3.7.
# Refer https://github.com/open-telemetry/opentelemetry-python/issues/3993#issuecomment-2211976389
if (
self.publisher_options.enable_open_telemetry_tracing
and sys.version_info.major == 3
and sys.version_info.minor < 8
):
warnings.warn(
message="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.",
category=RuntimeWarning,
)
self._open_telemetry_enabled = False

@classmethod
def from_service_account_file( # type: ignore[override]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@
)
from google.cloud.pubsub_v1.subscriber._protocol.dispatcher import _MAX_BATCH_LATENCY

try:
from collections.abc import KeysView

KeysView[None] # KeysView is only subscriptable in Python 3.9+
except TypeError:
# Deprecated since Python 3.9, thus only use as a fallback in older Python versions
from typing import KeysView
from collections.abc import KeysView

from google.cloud.pubsub_v1.subscriber._protocol import requests

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from __future__ import absolute_import

import os
import sys
import typing
import warnings
from typing import Any, Callable, Optional, Sequence, Union, cast
Expand Down Expand Up @@ -102,18 +101,6 @@ def __init__(
self._open_telemetry_enabled = (
self.subscriber_options.enable_open_telemetry_tracing
)
# OpenTelemetry features used by the library are not supported in Python versions <= 3.7.
# Refer https://github.com/open-telemetry/opentelemetry-python/issues/3993#issuecomment-2211976389
if (
self.subscriber_options.enable_open_telemetry_tracing
and sys.version_info.major == 3
and sys.version_info.minor < 8
):
warnings.warn(
message="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.",
category=RuntimeWarning,
)
self._open_telemetry_enabled = False

@property
def open_telemetry_enabled(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ def shutdown(
"""
dropped_messages = []

# Drop all pending item from the executor. Without this, the executor will also
# try to process any pending work items before termination, which is undesirable.
#
# TODO: Replace the logic below by passing `cancel_futures=True` to shutdown()
# once we only need to support Python 3.9+.
Comment thread
ohmayr marked this conversation as resolved.
try:
while True:
work_item = self._executor._work_queue.get(block=False)
Expand Down
2 changes: 0 additions & 2 deletions packages/google-cloud-pubsub/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ filterwarnings =
# Remove once the minimum supported version of googleapis-common-protos is 1.62.0
ignore:.*pkg_resources.declare_namespace:DeprecationWarning
ignore:.*pkg_resources is deprecated as an API:DeprecationWarning
# Remove once https://github.com/googleapis/gapic-generator-python/issues/2303 is fixed
ignore:The python-bigquery library will stop supporting Python 3.7:PendingDeprecationWarning
# Remove once we move off credential files https://github.com/googleapis/google-auth-library-python/pull/1812
# Note that these are used in tests only
ignore:Your config file at [/home/kbuilder/.docker/config.json] contains these credential helper entries:DeprecationWarning
Expand Down
21 changes: 19 additions & 2 deletions packages/google-cloud-pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,11 +447,19 @@ def test_subscriber_not_leaking_open_sockets(
subscriber = pubsub_v1.SubscriberClient(transport="grpc")
subscriber_2 = pubsub_v1.SubscriberClient(transport="grpc")

# Construct a secondary publisher client to clean up the topic,
# so we can safely close the main publisher client inside the test.
if "Rest" in type(publisher._transport).__name__:
publisher_2 = pubsub_v1.PublisherClient(transport="rest")
else:
publisher_2 = pubsub_v1.PublisherClient(transport="grpc")

cleanup.append(
(subscriber_2.delete_subscription, (), {"subscription": subscription_path})
)
cleanup.append((subscriber_2.close, (), {}))
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append((publisher_2.delete_topic, (), {"topic": topic_path}))
cleanup.append((publisher_2._transport.close, (), {}))

# Create topic before starting to track connection count (any sockets opened
# by the publisher client are not counted by this test).
Expand All @@ -477,7 +485,16 @@ def test_subscriber_not_leaking_open_sockets(
response = subscriber.pull(subscription=subscription_path, max_messages=3)
assert len(response.received_messages) == 3

conn_count_end = len(current_process.net_connections())
# Close the publisher client's transport used in the test to ensure all its socket connections
# (including any opened asynchronously on the background publisher threads) are closed.
publisher._transport.close()

# Wait a bit for the asynchronous channel teardown to complete and the socket to be closed.
for _ in range(30):
conn_count_end = len(current_process.net_connections())
if conn_count_end <= conn_count_start:
break
time.sleep(0.1)

# To avoid flakiness, use <= in the assertion, since on rare occasions additional
# sockets are closed, causing the == assertion to fail.
Expand Down
18 changes: 17 additions & 1 deletion packages/google-cloud-pubsub/tests/unit/pubsub_v1/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,27 @@ def set_trace_provider():

@pytest.fixture(scope="function")
def span_exporter():
"""Provides an InMemorySpanExporter for testing OpenTelemetry traces.

Registers a SimpleSpanProcessor with the global TracerProvider at start,
and removes it during teardown to prevent trace/span processor accumulation
and test pollution across tests.
"""
exporter = InMemorySpanExporter()
processor = SimpleSpanProcessor(exporter)
provider = trace.get_tracer_provider()
provider.add_span_processor(processor)
yield exporter
try:
yield exporter
finally:
if hasattr(provider, "_active_span_processor") and hasattr(
provider._active_span_processor, "_span_processors"
):
processors = provider._active_span_processor._span_processors
if isinstance(processors, tuple):
provider._active_span_processor._span_processors = tuple(
p for p in processors if p is not processor
)


@pytest.fixture()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import datetime
import sys
import threading
import time
from unittest import mock
Expand Down Expand Up @@ -723,9 +722,6 @@ def test_batch_done_callback_called_on_publish_response_invalid():


# Refer https://opentelemetry.io/docs/languages/python/#version-support
@pytest.mark.skipif(
sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher"
)
def test_open_telemetry_commit_publish_rpc_span_none(span_exporter):
"""
Test scenario where OpenTelemetry is enabled, publish RPC
Expand Down Expand Up @@ -771,9 +767,6 @@ def test_open_telemetry_commit_publish_rpc_span_none(span_exporter):


# Refer https://opentelemetry.io/docs/languages/python/#version-support
@pytest.mark.skipif(
sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher"
)
def test_open_telemetry_commit_publish_rpc_exception(span_exporter):
TOPIC = "projects/projectID/topics/topicID"
batch = create_batch(topic=TOPIC, enable_open_telemetry=True)
Expand Down Expand Up @@ -819,9 +812,6 @@ def test_open_telemetry_commit_publish_rpc_exception(span_exporter):


# Refer https://opentelemetry.io/docs/languages/python/#version-support
@pytest.mark.skipif(
sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher"
)
def test_opentelemetry_commit_sampling(span_exporter):
TOPIC = "projects/projectID/topics/topic"
batch = create_batch(
Expand Down Expand Up @@ -886,9 +876,6 @@ def test_opentelemetry_commit_sampling(span_exporter):
assert span.events[1].name == "publish end"


@pytest.mark.skipif(
sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher"
)
def test_opentelemetry_commit(span_exporter):
TOPIC = "projects/projectID/topics/topic"
batch = create_batch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import inspect
import math
import sys
import time
from typing import Any, Callable, TypeVar, cast
from unittest import mock
Expand Down Expand Up @@ -154,28 +153,13 @@ def test_init_w_custom_transport(creds):
)
@typed_flaky
def test_open_telemetry_publisher_options(creds, enable_open_telemetry):
if sys.version_info >= (3, 8) or enable_open_telemetry is False:
client = publisher.Client(
publisher_options=types.PublisherOptions(
enable_open_telemetry_tracing=enable_open_telemetry
),
credentials=creds,
)
assert client._open_telemetry_enabled == enable_open_telemetry
else:
# Open Telemetry is not supported and hence disabled for Python
# versions 3.7 or below
with pytest.warns(
RuntimeWarning,
match="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.",
):
client = publisher.Client(
publisher_options=types.PublisherOptions(
enable_open_telemetry_tracing=enable_open_telemetry
),
credentials=creds,
)
assert client._open_telemetry_enabled is False
client = publisher.Client(
publisher_options=types.PublisherOptions(
enable_open_telemetry_tracing=enable_open_telemetry
),
credentials=creds,
)
assert client._open_telemetry_enabled == enable_open_telemetry


def test_opentelemetry_context_setter():
Expand All @@ -185,10 +169,6 @@ def test_opentelemetry_context_setter():
assert "googclient_key" in msg.attributes.keys()


@pytest.mark.skipif(
sys.version_info < (3, 8),
reason="Open Telemetry not supported below Python version 3.8",
)
def test_opentelemetry_context_propagation(creds, span_exporter):
TOPIC = "projects/projectID/topics/topicID"
client = publisher.Client(
Expand All @@ -208,10 +188,6 @@ def test_opentelemetry_context_propagation(creds, span_exporter):
assert "googclient_traceparent" in args[0].attributes


@pytest.mark.skipif(
sys.version_info < (3, 8),
reason="Open Telemetry not supported below Python version 3.8",
)
@pytest.mark.parametrize(
"enable_open_telemetry",
[
Expand Down Expand Up @@ -271,10 +247,6 @@ def test_opentelemetry_publisher_batching_exception(
assert len(spans) == 0


@pytest.mark.skipif(
sys.version_info < (3, 8),
reason="Open Telemetry not supported below Python version 3.8",
)
def test_opentelemetry_flow_control_exception(creds, span_exporter):
publisher_options = types.PublisherOptions(
flow_control=types.PublishFlowControl(
Expand All @@ -299,16 +271,30 @@ def test_opentelemetry_flow_control_exception(creds, span_exporter):

spans = span_exporter.get_finished_spans()

# Find the spans related to the second, failing publish call
failed_create_span = None
# Find the spans related to the second, failing publish call.
# We first find the failed flow control span.
failed_fc_span = None
for span in spans:
if span.name == "topicID create":
if span.status.status_code == trace.StatusCode.ERROR:
if (
span.name == "publisher flow control"
and span.status.status_code == trace.StatusCode.ERROR
):
failed_fc_span = span
break

# Next, we find the corresponding 'create' span.
# Crucially, to prevent matching late-arriving or concurrent publish spans from other tests
# (e.g. background batch/sequencer threads from previous tests executing late),
# we filter for the 'topicID create' span that shares the EXACT same trace ID.
failed_create_span = None
if failed_fc_span:
for span in spans:
if (
span.name == "topicID create"
and span.context.trace_id == failed_fc_span.context.trace_id
):
failed_create_span = span
elif span.name == "publisher flow control":
if span.status.status_code == trace.StatusCode.ERROR:
failed_fc_span = span
break

assert failed_create_span is not None, "Failed 'topicID create' span not found"
assert failed_fc_span is not None, "Failed 'publisher flow control' span not found"
Expand All @@ -332,10 +318,6 @@ def test_opentelemetry_flow_control_exception(creds, span_exporter):
assert has_exception_event, "Exception event not found in failed create span"


@pytest.mark.skipif(
sys.version_info < (3, 8),
reason="Open Telemetry not supported below Python version 3.8",
)
def test_opentelemetry_publish(creds, span_exporter):
TOPIC = "projects/projectID/topics/topicID"
client = publisher.Client(
Expand Down
Loading
Loading