Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
except ImportError:
bigquery_magics = None

if sys.version_info < (3, 10):
if sys.version_info < (3, 10): # pragma: NO COVER
warnings.warn(
"The python-bigquery library no longer supports Python <= 3.9. "
f"Your Python version is {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}. We "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def close(self):

if self._owns_bqstorage_client:
# There is no close() on the BQ Storage client itself.
self._bqstorage_client._transport.grpc_channel.close()
self._bqstorage_client._transport.close()

for cursor_ in self._cursors_created:
if not cursor_._closed:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,4 +773,4 @@ def _close_transports(client, bqstorage_client):
"""
client.close()
if bqstorage_client is not None:
bqstorage_client._transport.grpc_channel.close()
bqstorage_client._transport.close()
Original file line number Diff line number Diff line change
Expand Up @@ -2353,7 +2353,9 @@ def to_arrow(
progress_bar.close()
finally:
if owns_bqstorage_client:
bqstorage_client._transport.grpc_channel.close() # type: ignore
# mypy: bqstorage_client is guaranteed to be not None when owns_bqstorage_client is True,
# but mypy cannot infer this correlation. We ignore the union-attr error here.
bqstorage_client._transport.close() # type: ignore[union-attr]

if record_batches and bqstorage_client is not None:
return pyarrow.Table.from_batches(record_batches)
Expand Down
28 changes: 27 additions & 1 deletion packages/google-cloud-bigquery/tests/system/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import datetime
import decimal
import uuid
Expand All @@ -21,7 +22,6 @@

from google.cloud._helpers import UTC


_naive = datetime.datetime(2016, 12, 5, 12, 41, 9)
_naive_microseconds = datetime.datetime(2016, 12, 5, 12, 41, 9, 250000)
_stamp = "%s %s" % (_naive.date().isoformat(), _naive.time().isoformat())
Expand Down Expand Up @@ -104,3 +104,29 @@ def _rate_limit_exceeded(forbidden):
google.api_core.exceptions.Forbidden,
error_predicate=_rate_limit_exceeded,
)


@contextlib.contextmanager
def patch_tracked_requests():
"""Context manager to patch google-auth requests and track/close their HTTP sessions.

This prevents socket leaks in system tests that use Workload Identity or metadata server auth.
"""
import google.auth.transport.requests

original_init = google.auth.transport.requests.Request.__init__
tracked_requests = []

def patched_init(self, session=None):
original_init(self, session=session)
if session is None:
tracked_requests.append(self)

google.auth.transport.requests.Request.__init__ = patched_init
try:
yield tracked_requests
finally:
google.auth.transport.requests.Request.__init__ = original_init
for req in tracked_requests:
if hasattr(req, "session") and req.session is not None:
req.session.close()
75 changes: 43 additions & 32 deletions packages/google-cloud-bigquery/tests/system/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@

from . import helpers


JOB_TIMEOUT = 120 # 2 minutes
DATA_PATH = pathlib.Path(__file__).parent.parent / "data"

Expand Down Expand Up @@ -234,23 +233,29 @@ def _create_bucket(self, bucket_name, location=None):

def test_close_releases_open_sockets(self):
current_process = psutil.Process()
conn_count_start = len(current_process.net_connections())
conn_start = current_process.net_connections()
conn_count_start = len(conn_start)

with helpers.patch_tracked_requests():
client = Config.CLIENT
client.query(
"""
SELECT
source_year AS year, COUNT(is_male) AS birth_count
FROM `bigquery-public-data.samples.natality`
GROUP BY year
ORDER BY year DESC
LIMIT 15
"""
)

client = Config.CLIENT
client.query(
"""
SELECT
source_year AS year, COUNT(is_male) AS birth_count
FROM `bigquery-public-data.samples.natality`
GROUP BY year
ORDER BY year DESC
LIMIT 15
"""
)
client.close()

client.close()
import gc

conn_count_end = len(current_process.net_connections())
gc.collect()
conn_end = current_process.net_connections()
conn_count_end = len(conn_end)
self.assertLessEqual(conn_count_end, conn_count_start)

def test_create_dataset(self):
Expand Down Expand Up @@ -2174,25 +2179,31 @@ def test_dbapi_dry_run_query(self):
def test_dbapi_connection_does_not_leak_sockets(self):
pytest.importorskip("google.cloud.bigquery_storage")
current_process = psutil.Process()
conn_count_start = len(current_process.net_connections())

# Provide no explicit clients, so that the connection will create and own them.
connection = dbapi.connect()
cursor = connection.cursor()

cursor.execute(
conn_start = current_process.net_connections()
conn_count_start = len(conn_start)

with helpers.patch_tracked_requests():
# Provide no explicit clients, so that the connection will create and own them.
connection = dbapi.connect()
cursor = connection.cursor()

cursor.execute(
"""
SELECT id, `by`, timestamp
FROM `bigquery-public-data.hacker_news.full`
ORDER BY `id` ASC
LIMIT 100000
"""
SELECT id, `by`, timestamp
FROM `bigquery-public-data.hacker_news.full`
ORDER BY `id` ASC
LIMIT 100000
"""
)
rows = cursor.fetchall()
self.assertEqual(len(rows), 100000)
)
rows = cursor.fetchall()
self.assertEqual(len(rows), 100000)

connection.close()
import gc

connection.close()
conn_count_end = len(current_process.net_connections())
gc.collect()
conn_end = current_process.net_connections()
conn_count_end = len(conn_end)
self.assertLessEqual(conn_count_end, conn_count_start)

def _load_table_for_dml(self, rows, dataset_id, table_id):
Expand Down
46 changes: 25 additions & 21 deletions packages/google-cloud-bigquery/tests/system/test_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import pytest
import psutil

from . import helpers

IPython = pytest.importorskip("IPython")
io = pytest.importorskip("IPython.utils.io")
Expand Down Expand Up @@ -48,27 +49,30 @@ def ipython_interactive(ipython):
def test_bigquery_magic(ipython_interactive):
ip = IPython.get_ipython()
current_process = psutil.Process()
conn_count_start = len(current_process.net_connections())

# Deprecated, but should still work in google-cloud-bigquery 3.x.
with pytest.warns(FutureWarning, match="bigquery_magics"):
ip.extension_manager.load_extension("google.cloud.bigquery")

sql = """
SELECT
CONCAT(
'https://stackoverflow.com/questions/',
CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10
"""
with io.capture_output() as captured:
result = ip.run_cell_magic("bigquery", "--use_rest_api", sql)

conn_count_end = len(current_process.net_connections())
conn_start = current_process.net_connections()
conn_count_start = len(conn_start)

with helpers.patch_tracked_requests():
# Deprecated, but should still work in google-cloud-bigquery 3.x.
with pytest.warns(FutureWarning, match="bigquery_magics"):
ip.extension_manager.load_extension("google.cloud.bigquery")

sql = """
SELECT
CONCAT(
'https://stackoverflow.com/questions/',
CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10
"""
with io.capture_output() as captured:
result = ip.run_cell_magic("bigquery", "--use_rest_api", sql)

conn_end = current_process.net_connections()
conn_count_end = len(conn_end)

lines = re.split("\n|\r", captured.stdout)
# Removes blanks & terminal code (result of display clearing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _mock_bqstorage_client(self):
from google.cloud import bigquery_storage

mock_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
mock_client._transport = mock.Mock(spec=["channel"])
mock_client._transport = mock.Mock(spec=["channel", "close"])
mock_client._transport.grpc_channel = mock.Mock(spec=["close"])
return mock_client

Expand Down Expand Up @@ -176,7 +176,7 @@ def test_close_closes_all_created_bigquery_clients(self):
connection.close()

self.assertTrue(client.close.called)
self.assertTrue(bqstorage_client._transport.grpc_channel.close.called)
self.assertTrue(bqstorage_client._transport.close.called)

def test_close_does_not_close_bigquery_clients_passed_to_it(self):
pytest.importorskip("google.cloud.bigquery_storage")
Expand All @@ -187,7 +187,7 @@ def test_close_does_not_close_bigquery_clients_passed_to_it(self):
connection.close()

self.assertFalse(client.close.called)
self.assertFalse(bqstorage_client._transport.grpc_channel.close.called)
self.assertFalse(bqstorage_client._transport.close.called)

def test_close_closes_all_created_cursors(self):
connection = self._make_one(client=self._mock_client())
Expand Down
7 changes: 2 additions & 5 deletions packages/google-cloud-bigquery/tests/unit/test_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

@pytest.fixture()
def use_local_magics_context(monkeypatch):
if magics is not None:
if magics is not None: # pragma: NO COVER
local_context = magics.Context()
local_context._project = "unit-test-project"
mock_credentials = mock.create_autospec(
Expand Down Expand Up @@ -2195,13 +2195,10 @@ def test_bigquery_magic_create_dataset_fails(monkeypatch):


@pytest.mark.usefixtures("ipython_interactive")
def test_bigquery_magic_with_location(monkeypatch):
def test_bigquery_magic_with_location(monkeypatch, use_local_magics_context):
ip = IPython.get_ipython()
monkeypatch.setattr(bigquery, "bigquery_magics", None)
bigquery.load_ipython_extension(ip)
magics.context.credentials = mock.create_autospec(
google.auth.credentials.Credentials, instance=True
)

run_query_patch = mock.patch(
"google.cloud.bigquery.magics.magics._run_query", autospec=True
Expand Down
Loading
Loading