diff --git a/.librarian/generator-input/client-post-processing/firestore-integration.yaml b/.librarian/generator-input/client-post-processing/firestore-integration.yaml index fd5fb175852d..05514bea877c 100644 --- a/.librarian/generator-input/client-post-processing/firestore-integration.yaml +++ b/.librarian/generator-input/client-post-processing/firestore-integration.yaml @@ -619,7 +619,7 @@ replacements: session.run( "py.test", "-n", - "auto", + "10", "--quiet", f"--junitxml=system_{session.python}_sponge_log.xml", system_test_path, @@ -629,7 +629,7 @@ replacements: session.run( "py.test", "-n", - "auto", + "10", "--quiet", f"--junitxml=system_{session.python}_sponge_log.xml", system_test_folder_path, diff --git a/packages/google-cloud-firestore/noxfile.py b/packages/google-cloud-firestore/noxfile.py index 80fe70fa5798..e5c2acdffe3e 100644 --- a/packages/google-cloud-firestore/noxfile.py +++ b/packages/google-cloud-firestore/noxfile.py @@ -404,7 +404,7 @@ def system(session): session.run( "py.test", "-n", - "auto", + "10", "--quiet", f"--junitxml=system_{session.python}_sponge_log.xml", system_test_path, @@ -414,7 +414,7 @@ def system(session): session.run( "py.test", "-n", - "auto", + "10", "--quiet", f"--junitxml=system_{session.python}_sponge_log.xml", system_test_folder_path, diff --git a/packages/google-cloud-firestore/tests/system/test_system.py b/packages/google-cloud-firestore/tests/system/test_system.py index fe475206d37f..7d8394ea5673 100644 --- a/packages/google-cloud-firestore/tests/system/test_system.py +++ b/packages/google-cloud-firestore/tests/system/test_system.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures import datetime import itertools import math import operator -from time import sleep from typing import Callable, Dict, List, Optional import google.auth @@ -80,8 +80,8 @@ def cleanup(): operations = [] yield operations.append - for operation in operations: - operation() + with concurrent.futures.ThreadPoolExecutor() as executor: + list(executor.map(lambda op: op(), operations)) @pytest.fixture @@ -389,15 +389,18 @@ def test_create_document_w_vector(client, cleanup, database): for v in client.collection(collection_id).order_by("embedding").get() ] == [data3, data1, data2] + vector_query_future = concurrent.futures.Future() + def on_snapshot(docs, changes, read_time): on_snapshot.results += docs + if len(on_snapshot.results) >= 3 and not vector_query_future.done(): + vector_query_future.set_result(True) on_snapshot.results = [] client.collection(collection_id).order_by("embedding").on_snapshot(on_snapshot) - # delay here so initial on_snapshot occurs and isn't combined with set - sleep(1) - assert [v.to_dict() for v in on_snapshot.results] == [data3, data1, data2] + vector_query_future.result(timeout=60.0) + assert [v.to_dict() for v in on_snapshot.results[:3]] == [data3, data1, data2] @pytest.mark.skipif(FIRESTORE_EMULATOR, reason="Require index and seed data") @@ -2336,15 +2339,13 @@ def test_watch_document(client, cleanup, database): doc_ref.set({"first": "Jane", "last": "Doe", "born": 1900}) cleanup(doc_ref.delete) - # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): - # Investigate why these sleep/polling delays are needed for listener tests. - # Having arbitrary delays is fragile and can lead to flakiness. - # Explore event-driven synchronization. - sleep(0.2) - # Setup listener + ada_future = concurrent.futures.Future() + def on_snapshot(docs, changes, read_time): on_snapshot.called_count += 1 + if docs and docs[0].get("first") == "Ada" and not ada_future.done(): + ada_future.set_result(True) on_snapshot.called_count = 0 @@ -2353,12 +2354,8 @@ def on_snapshot(docs, changes, read_time): # Alter document doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815}) - sleep(0.2) - - for _ in range(50): - if on_snapshot.called_count > 0: - break - sleep(0.2) + # Wait for the event-driven callback to resolve the future + ada_future.result(timeout=60.0) if on_snapshot.called_count not in (1, 2): raise AssertionError( @@ -2378,34 +2375,18 @@ def test_watch_collection(client, cleanup, database): cleanup(doc_ref.delete) # Setup listener + born_1815_future = concurrent.futures.Future() + def on_snapshot(docs, changes, read_time): - on_snapshot.called_count += 1 for doc in [doc for doc in docs if doc.id == doc_ref.id]: - on_snapshot.born = doc.get("born") - - on_snapshot.called_count = 0 - on_snapshot.born = 0 + if doc.get("born") == 1815 and not born_1815_future.done(): + born_1815_future.set_result(True) collection_ref.on_snapshot(on_snapshot) - # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): - # Investigate why these sleep/polling delays are needed for listener tests. - # Having arbitrary delays is fragile and can lead to flakiness. - # Explore event-driven synchronization. - # delay here so initial on_snapshot occurs and isn't combined with set - sleep(0.2) - doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815}) - for _ in range(50): - if on_snapshot.born == 1815: - break - sleep(0.2) - - if on_snapshot.born != 1815: - raise AssertionError( - "Expected the last document update to update born: " + str(on_snapshot.born) - ) + born_1815_future.result(timeout=60.0) @pytest.mark.parametrize("database", TEST_DATABASES, indirect=True) @@ -2419,20 +2400,21 @@ def test_watch_query(client, cleanup, database): doc_ref.set({"first": "Jane", "last": "Doe", "born": 1900}) cleanup(doc_ref.delete) - # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): - # Investigate why these sleep/polling delays are needed for listener tests. - # Having arbitrary delays is fragile and can lead to flakiness. - # Explore event-driven synchronization. - sleep(0.2) - # Setup listener + one_doc_future = concurrent.futures.Future() + def on_snapshot(docs, changes, read_time): on_snapshot.called_count += 1 - # A snapshot should return the same thing as if a query ran now. - query_ran_query = collection_ref.where(filter=FieldFilter("first", "==", "Ada")) - query_ran = query_ran_query.stream() - assert len(docs) == len([i for i in query_ran]) + if docs: + # A snapshot should return the same thing as if a query ran now. + query_ran_query = collection_ref.where( + filter=FieldFilter("first", "==", "Ada") + ) + query_ran = query_ran_query.stream() + assert len(docs) == len([i for i in query_ran]) + if not one_doc_future.done(): + one_doc_future.set_result(True) on_snapshot.called_count = 0 @@ -2441,14 +2423,11 @@ def on_snapshot(docs, changes, read_time): # Alter document doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815}) - for _ in range(50): - if on_snapshot.called_count == 1: - return - sleep(0.2) + one_doc_future.result(timeout=60.0) - if on_snapshot.called_count != 1: + if on_snapshot.called_count not in (1, 2): raise AssertionError( - "Failed to get exactly one document change: count: " + "Failed to get expected document change count: " + str(on_snapshot.called_count) ) @@ -2788,6 +2767,8 @@ def test_watch_query_order(client, cleanup, database): ) # Setup listener + five_docs_future = concurrent.futures.Future() + def on_snapshot(docs, changes, read_time): try: docs = [i for i in docs if i.id.endswith(UNIQUE_RESOURCE_ID)] @@ -2808,22 +2789,14 @@ def on_snapshot(docs, changes, read_time): assert snapshot.get("born") == query.get("born"), ( "expect the sort order to match, born" ) - on_snapshot.called_count += 1 - on_snapshot.last_doc_count = len(docs) + if not five_docs_future.done(): + five_docs_future.set_result(True) except Exception as e: - on_snapshot.failed = e + if not five_docs_future.done(): + five_docs_future.set_exception(e) - on_snapshot.called_count = 0 - on_snapshot.last_doc_count = 0 - on_snapshot.failed = None query_ref.on_snapshot(on_snapshot) - # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): - # Investigate why these sleep/polling delays are needed for listener tests. - # Having arbitrary delays is fragile and can lead to flakiness. - # Explore event-driven synchronization. - sleep(0.2) - doc_ref1.set({"first": "Ada", "last": "Lovelace", "born": 1815}) cleanup(doc_ref1.delete) @@ -2839,18 +2812,8 @@ def on_snapshot(docs, changes, read_time): doc_ref5.set({"first": "Ada", "last": "lovelace", "born": 1815}) cleanup(doc_ref5.delete) - for _ in range(50): - if on_snapshot.last_doc_count == 5: - break - sleep(0.2) - - if on_snapshot.failed: - raise on_snapshot.failed - - if on_snapshot.last_doc_count != 5: - raise AssertionError( - "5 docs expected in snapshot method " + str(on_snapshot.last_doc_count) - ) + # Wait for the future to be resolved + five_docs_future.result(timeout=60.0) @pytest.mark.parametrize("database", TEST_DATABASES_W_ENTERPRISE, indirect=True) diff --git a/packages/google-cloud-firestore/tests/system/test_system_async.py b/packages/google-cloud-firestore/tests/system/test_system_async.py index ef8ca5b84d5f..6356f24bea8c 100644 --- a/packages/google-cloud-firestore/tests/system/test_system_async.py +++ b/packages/google-cloud-firestore/tests/system/test_system_async.py @@ -160,8 +160,8 @@ async def cleanup(): operations = [] yield operations.append - for operation in operations: - await operation() + if operations: + await asyncio.gather(*[operation() for operation in operations]) @pytest.fixture