From 3cbbc98111ae63f3740d64b7e836d7e28f37a63a Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 12 Jun 2026 01:53:44 +0000 Subject: [PATCH 1/5] chore(firestore): fix firestore flaky tests --- .../tests/system/test_system.py | 117 ++++++------------ 1 file changed, 39 insertions(+), 78 deletions(-) diff --git a/packages/google-cloud-firestore/tests/system/test_system.py b/packages/google-cloud-firestore/tests/system/test_system.py index fe475206d37f..a06b39d200d4 100644 --- a/packages/google-cloud-firestore/tests/system/test_system.py +++ b/packages/google-cloud-firestore/tests/system/test_system.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures import datetime import itertools import math import operator -from time import sleep from typing import Callable, Dict, List, Optional import google.auth @@ -389,15 +389,18 @@ def test_create_document_w_vector(client, cleanup, database): for v in client.collection(collection_id).order_by("embedding").get() ] == [data3, data1, data2] + vector_query_future = concurrent.futures.Future() + def on_snapshot(docs, changes, read_time): on_snapshot.results += docs + if len(on_snapshot.results) >= 3 and not vector_query_future.done(): + vector_query_future.set_result(True) on_snapshot.results = [] client.collection(collection_id).order_by("embedding").on_snapshot(on_snapshot) - # delay here so initial on_snapshot occurs and isn't combined with set - sleep(1) - assert [v.to_dict() for v in on_snapshot.results] == [data3, data1, data2] + vector_query_future.result(timeout=60.0) + assert [v.to_dict() for v in on_snapshot.results[:3]] == [data3, data1, data2] @pytest.mark.skipif(FIRESTORE_EMULATOR, reason="Require index and seed data") @@ -2336,15 +2339,13 @@ def test_watch_document(client, cleanup, database): doc_ref.set({"first": "Jane", "last": "Doe", "born": 1900}) cleanup(doc_ref.delete) - # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): - # Investigate why these sleep/polling delays are needed for listener tests. - # Having arbitrary delays is fragile and can lead to flakiness. - # Explore event-driven synchronization. - sleep(0.2) - # Setup listener + ada_future = concurrent.futures.Future() + def on_snapshot(docs, changes, read_time): on_snapshot.called_count += 1 + if docs and docs[0].get("first") == "Ada" and not ada_future.done(): + ada_future.set_result(True) on_snapshot.called_count = 0 @@ -2353,12 +2354,8 @@ def on_snapshot(docs, changes, read_time): # Alter document doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815}) - sleep(0.2) - - for _ in range(50): - if on_snapshot.called_count > 0: - break - sleep(0.2) + # Wait for the event-driven callback to resolve the future + ada_future.result(timeout=60.0) if on_snapshot.called_count not in (1, 2): raise AssertionError( @@ -2378,34 +2375,18 @@ def test_watch_collection(client, cleanup, database): cleanup(doc_ref.delete) # Setup listener + born_1815_future = concurrent.futures.Future() + def on_snapshot(docs, changes, read_time): - on_snapshot.called_count += 1 for doc in [doc for doc in docs if doc.id == doc_ref.id]: - on_snapshot.born = doc.get("born") - - on_snapshot.called_count = 0 - on_snapshot.born = 0 + if doc.get("born") == 1815 and not born_1815_future.done(): + born_1815_future.set_result(True) collection_ref.on_snapshot(on_snapshot) - # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): - # Investigate why these sleep/polling delays are needed for listener tests. - # Having arbitrary delays is fragile and can lead to flakiness. - # Explore event-driven synchronization. - # delay here so initial on_snapshot occurs and isn't combined with set - sleep(0.2) - doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815}) - for _ in range(50): - if on_snapshot.born == 1815: - break - sleep(0.2) - - if on_snapshot.born != 1815: - raise AssertionError( - "Expected the last document update to update born: " + str(on_snapshot.born) - ) + born_1815_future.result(timeout=60.0) @pytest.mark.parametrize("database", TEST_DATABASES, indirect=True) @@ -2419,20 +2400,19 @@ def test_watch_query(client, cleanup, database): doc_ref.set({"first": "Jane", "last": "Doe", "born": 1900}) cleanup(doc_ref.delete) - # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): - # Investigate why these sleep/polling delays are needed for listener tests. - # Having arbitrary delays is fragile and can lead to flakiness. - # Explore event-driven synchronization. - sleep(0.2) - # Setup listener + one_doc_future = concurrent.futures.Future() + def on_snapshot(docs, changes, read_time): on_snapshot.called_count += 1 - - # A snapshot should return the same thing as if a query ran now. - query_ran_query = collection_ref.where(filter=FieldFilter("first", "==", "Ada")) - query_ran = query_ran_query.stream() - assert len(docs) == len([i for i in query_ran]) + + if docs: + # A snapshot should return the same thing as if a query ran now. + query_ran_query = collection_ref.where(filter=FieldFilter("first", "==", "Ada")) + query_ran = query_ran_query.stream() + assert len(docs) == len([i for i in query_ran]) + if not one_doc_future.done(): + one_doc_future.set_result(True) on_snapshot.called_count = 0 @@ -2441,14 +2421,11 @@ def on_snapshot(docs, changes, read_time): # Alter document doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815}) - for _ in range(50): - if on_snapshot.called_count == 1: - return - sleep(0.2) + one_doc_future.result(timeout=60.0) - if on_snapshot.called_count != 1: + if on_snapshot.called_count not in (1, 2): raise AssertionError( - "Failed to get exactly one document change: count: " + "Failed to get expected document change count: " + str(on_snapshot.called_count) ) @@ -2788,6 +2765,8 @@ def test_watch_query_order(client, cleanup, database): ) # Setup listener + five_docs_future = concurrent.futures.Future() + def on_snapshot(docs, changes, read_time): try: docs = [i for i in docs if i.id.endswith(UNIQUE_RESOURCE_ID)] @@ -2808,22 +2787,14 @@ def on_snapshot(docs, changes, read_time): assert snapshot.get("born") == query.get("born"), ( "expect the sort order to match, born" ) - on_snapshot.called_count += 1 - on_snapshot.last_doc_count = len(docs) + if not five_docs_future.done(): + five_docs_future.set_result(True) except Exception as e: - on_snapshot.failed = e + if not five_docs_future.done(): + five_docs_future.set_exception(e) - on_snapshot.called_count = 0 - on_snapshot.last_doc_count = 0 - on_snapshot.failed = None query_ref.on_snapshot(on_snapshot) - # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): - # Investigate why these sleep/polling delays are needed for listener tests. - # Having arbitrary delays is fragile and can lead to flakiness. - # Explore event-driven synchronization. - sleep(0.2) - doc_ref1.set({"first": "Ada", "last": "Lovelace", "born": 1815}) cleanup(doc_ref1.delete) @@ -2839,18 +2810,8 @@ def on_snapshot(docs, changes, read_time): doc_ref5.set({"first": "Ada", "last": "lovelace", "born": 1815}) cleanup(doc_ref5.delete) - for _ in range(50): - if on_snapshot.last_doc_count == 5: - break - sleep(0.2) - - if on_snapshot.failed: - raise on_snapshot.failed - - if on_snapshot.last_doc_count != 5: - raise AssertionError( - "5 docs expected in snapshot method " + str(on_snapshot.last_doc_count) - ) + # Wait for the future to be resolved + five_docs_future.result(timeout=60.0) @pytest.mark.parametrize("database", TEST_DATABASES_W_ENTERPRISE, indirect=True) From 84f6a80c6104310a6126c565cdf2f15ffdbf84d7 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 12 Jun 2026 02:30:00 +0000 Subject: [PATCH 2/5] make cleanup concurrent --- packages/google-cloud-firestore/tests/system/test_system.py | 5 +++-- .../google-cloud-firestore/tests/system/test_system_async.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/google-cloud-firestore/tests/system/test_system.py b/packages/google-cloud-firestore/tests/system/test_system.py index a06b39d200d4..b77e3441a322 100644 --- a/packages/google-cloud-firestore/tests/system/test_system.py +++ b/packages/google-cloud-firestore/tests/system/test_system.py @@ -80,8 +80,9 @@ def cleanup(): operations = [] yield operations.append - for operation in operations: - operation() + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as executor: + list(executor.map(lambda op: op(), operations)) @pytest.fixture diff --git a/packages/google-cloud-firestore/tests/system/test_system_async.py b/packages/google-cloud-firestore/tests/system/test_system_async.py index ef8ca5b84d5f..d163c1127a8f 100644 --- a/packages/google-cloud-firestore/tests/system/test_system_async.py +++ b/packages/google-cloud-firestore/tests/system/test_system_async.py @@ -160,8 +160,9 @@ async def cleanup(): operations = [] yield operations.append - for operation in operations: - await operation() + import asyncio + if operations: + await asyncio.gather(*[operation() for operation in operations]) @pytest.fixture From 0a631805f008e658b8937ae025172a2170bc8686 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 12 Jun 2026 03:00:00 +0000 Subject: [PATCH 3/5] use strictly 10 parallel workers --- .../client-post-processing/firestore-integration.yaml | 4 ++-- packages/google-cloud-firestore/noxfile.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.librarian/generator-input/client-post-processing/firestore-integration.yaml b/.librarian/generator-input/client-post-processing/firestore-integration.yaml index fd5fb175852d..05514bea877c 100644 --- a/.librarian/generator-input/client-post-processing/firestore-integration.yaml +++ b/.librarian/generator-input/client-post-processing/firestore-integration.yaml @@ -619,7 +619,7 @@ replacements: session.run( "py.test", "-n", - "auto", + "10", "--quiet", f"--junitxml=system_{session.python}_sponge_log.xml", system_test_path, @@ -629,7 +629,7 @@ replacements: session.run( "py.test", "-n", - "auto", + "10", "--quiet", f"--junitxml=system_{session.python}_sponge_log.xml", system_test_folder_path, diff --git a/packages/google-cloud-firestore/noxfile.py b/packages/google-cloud-firestore/noxfile.py index 80fe70fa5798..e5c2acdffe3e 100644 --- a/packages/google-cloud-firestore/noxfile.py +++ b/packages/google-cloud-firestore/noxfile.py @@ -404,7 +404,7 @@ def system(session): session.run( "py.test", "-n", - "auto", + "10", "--quiet", f"--junitxml=system_{session.python}_sponge_log.xml", system_test_path, @@ -414,7 +414,7 @@ def system(session): session.run( "py.test", "-n", - "auto", + "10", "--quiet", f"--junitxml=system_{session.python}_sponge_log.xml", system_test_folder_path, From 49df1b384d137dc69632375d7605e7ea5b19a93d Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 12 Jun 2026 03:23:23 +0000 Subject: [PATCH 4/5] remove inline imports --- packages/google-cloud-firestore/tests/system/test_system.py | 1 - .../google-cloud-firestore/tests/system/test_system_async.py | 1 - 2 files changed, 2 deletions(-) diff --git a/packages/google-cloud-firestore/tests/system/test_system.py b/packages/google-cloud-firestore/tests/system/test_system.py index b77e3441a322..f042a379b243 100644 --- a/packages/google-cloud-firestore/tests/system/test_system.py +++ b/packages/google-cloud-firestore/tests/system/test_system.py @@ -80,7 +80,6 @@ def cleanup(): operations = [] yield operations.append - import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: list(executor.map(lambda op: op(), operations)) diff --git a/packages/google-cloud-firestore/tests/system/test_system_async.py b/packages/google-cloud-firestore/tests/system/test_system_async.py index d163c1127a8f..6356f24bea8c 100644 --- a/packages/google-cloud-firestore/tests/system/test_system_async.py +++ b/packages/google-cloud-firestore/tests/system/test_system_async.py @@ -160,7 +160,6 @@ async def cleanup(): operations = [] yield operations.append - import asyncio if operations: await asyncio.gather(*[operation() for operation in operations]) From 4cccfda2c4fa1544a597e2cc044a49459dad58c3 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 12 Jun 2026 05:41:44 +0000 Subject: [PATCH 5/5] fix lint --- .../tests/system/test_system.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/google-cloud-firestore/tests/system/test_system.py b/packages/google-cloud-firestore/tests/system/test_system.py index f042a379b243..7d8394ea5673 100644 --- a/packages/google-cloud-firestore/tests/system/test_system.py +++ b/packages/google-cloud-firestore/tests/system/test_system.py @@ -390,7 +390,7 @@ def test_create_document_w_vector(client, cleanup, database): ] == [data3, data1, data2] vector_query_future = concurrent.futures.Future() - + def on_snapshot(docs, changes, read_time): on_snapshot.results += docs if len(on_snapshot.results) >= 3 and not vector_query_future.done(): @@ -2341,7 +2341,7 @@ def test_watch_document(client, cleanup, database): # Setup listener ada_future = concurrent.futures.Future() - + def on_snapshot(docs, changes, read_time): on_snapshot.called_count += 1 if docs and docs[0].get("first") == "Ada" and not ada_future.done(): @@ -2376,7 +2376,7 @@ def test_watch_collection(client, cleanup, database): # Setup listener born_1815_future = concurrent.futures.Future() - + def on_snapshot(docs, changes, read_time): for doc in [doc for doc in docs if doc.id == doc_ref.id]: if doc.get("born") == 1815 and not born_1815_future.done(): @@ -2402,13 +2402,15 @@ def test_watch_query(client, cleanup, database): # Setup listener one_doc_future = concurrent.futures.Future() - + def on_snapshot(docs, changes, read_time): on_snapshot.called_count += 1 - + if docs: # A snapshot should return the same thing as if a query ran now. - query_ran_query = collection_ref.where(filter=FieldFilter("first", "==", "Ada")) + query_ran_query = collection_ref.where( + filter=FieldFilter("first", "==", "Ada") + ) query_ran = query_ran_query.stream() assert len(docs) == len([i for i in query_ran]) if not one_doc_future.done(): @@ -2766,7 +2768,7 @@ def test_watch_query_order(client, cleanup, database): # Setup listener five_docs_future = concurrent.futures.Future() - + def on_snapshot(docs, changes, read_time): try: docs = [i for i in docs if i.id.endswith(UNIQUE_RESOURCE_ID)]