-
Notifications
You must be signed in to change notification settings - Fork 1.7k
chore(firestore): fix firestore flaky tests #17439
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -12,11 +12,11 @@ | |||||||||
| # See the License for the specific language governing permissions and | ||||||||||
| # limitations under the License. | ||||||||||
|
|
||||||||||
| import concurrent.futures | ||||||||||
| import datetime | ||||||||||
| import itertools | ||||||||||
| import math | ||||||||||
| import operator | ||||||||||
| from time import sleep | ||||||||||
| from typing import Callable, Dict, List, Optional | ||||||||||
|
|
||||||||||
| import google.auth | ||||||||||
|
|
@@ -80,8 +80,8 @@ def cleanup(): | |||||||||
| operations = [] | ||||||||||
| yield operations.append | ||||||||||
|
|
||||||||||
| for operation in operations: | ||||||||||
| operation() | ||||||||||
| with concurrent.futures.ThreadPoolExecutor() as executor: | ||||||||||
| list(executor.map(lambda op: op(), operations)) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| @pytest.fixture | ||||||||||
|
|
@@ -389,15 +389,18 @@ def test_create_document_w_vector(client, cleanup, database): | |||||||||
| for v in client.collection(collection_id).order_by("embedding").get() | ||||||||||
| ] == [data3, data1, data2] | ||||||||||
|
|
||||||||||
| vector_query_future = concurrent.futures.Future() | ||||||||||
|
|
||||||||||
| def on_snapshot(docs, changes, read_time): | ||||||||||
| on_snapshot.results += docs | ||||||||||
| if len(on_snapshot.results) >= 3 and not vector_query_future.done(): | ||||||||||
| vector_query_future.set_result(True) | ||||||||||
|
|
||||||||||
| on_snapshot.results = [] | ||||||||||
| client.collection(collection_id).order_by("embedding").on_snapshot(on_snapshot) | ||||||||||
|
|
||||||||||
| # delay here so initial on_snapshot occurs and isn't combined with set | ||||||||||
| sleep(1) | ||||||||||
| assert [v.to_dict() for v in on_snapshot.results] == [data3, data1, data2] | ||||||||||
| vector_query_future.result(timeout=60.0) | ||||||||||
| assert [v.to_dict() for v in on_snapshot.results[:3]] == [data3, data1, data2] | ||||||||||
|
|
||||||||||
|
|
||||||||||
| @pytest.mark.skipif(FIRESTORE_EMULATOR, reason="Require index and seed data") | ||||||||||
|
|
@@ -2336,15 +2339,13 @@ def test_watch_document(client, cleanup, database): | |||||||||
| doc_ref.set({"first": "Jane", "last": "Doe", "born": 1900}) | ||||||||||
| cleanup(doc_ref.delete) | ||||||||||
|
|
||||||||||
| # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): | ||||||||||
| # Investigate why these sleep/polling delays are needed for listener tests. | ||||||||||
| # Having arbitrary delays is fragile and can lead to flakiness. | ||||||||||
| # Explore event-driven synchronization. | ||||||||||
| sleep(0.2) | ||||||||||
|
|
||||||||||
| # Setup listener | ||||||||||
| ada_future = concurrent.futures.Future() | ||||||||||
|
|
||||||||||
| def on_snapshot(docs, changes, read_time): | ||||||||||
| on_snapshot.called_count += 1 | ||||||||||
| if docs and docs[0].get("first") == "Ada" and not ada_future.done(): | ||||||||||
| ada_future.set_result(True) | ||||||||||
|
|
||||||||||
| on_snapshot.called_count = 0 | ||||||||||
|
|
||||||||||
|
|
@@ -2353,12 +2354,8 @@ def on_snapshot(docs, changes, read_time): | |||||||||
| # Alter document | ||||||||||
| doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815}) | ||||||||||
|
|
||||||||||
| sleep(0.2) | ||||||||||
|
|
||||||||||
| for _ in range(50): | ||||||||||
| if on_snapshot.called_count > 0: | ||||||||||
| break | ||||||||||
| sleep(0.2) | ||||||||||
| # Wait for the event-driven callback to resolve the future | ||||||||||
| ada_future.result(timeout=60.0) | ||||||||||
|
|
||||||||||
| if on_snapshot.called_count not in (1, 2): | ||||||||||
| raise AssertionError( | ||||||||||
|
|
@@ -2378,34 +2375,18 @@ def test_watch_collection(client, cleanup, database): | |||||||||
| cleanup(doc_ref.delete) | ||||||||||
|
|
||||||||||
| # Setup listener | ||||||||||
| born_1815_future = concurrent.futures.Future() | ||||||||||
|
|
||||||||||
| def on_snapshot(docs, changes, read_time): | ||||||||||
| on_snapshot.called_count += 1 | ||||||||||
| for doc in [doc for doc in docs if doc.id == doc_ref.id]: | ||||||||||
| on_snapshot.born = doc.get("born") | ||||||||||
|
|
||||||||||
| on_snapshot.called_count = 0 | ||||||||||
| on_snapshot.born = 0 | ||||||||||
| if doc.get("born") == 1815 and not born_1815_future.done(): | ||||||||||
| born_1815_future.set_result(True) | ||||||||||
|
|
||||||||||
| collection_ref.on_snapshot(on_snapshot) | ||||||||||
|
|
||||||||||
| # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): | ||||||||||
| # Investigate why these sleep/polling delays are needed for listener tests. | ||||||||||
| # Having arbitrary delays is fragile and can lead to flakiness. | ||||||||||
| # Explore event-driven synchronization. | ||||||||||
| # delay here so initial on_snapshot occurs and isn't combined with set | ||||||||||
| sleep(0.2) | ||||||||||
|
|
||||||||||
| doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815}) | ||||||||||
|
|
||||||||||
| for _ in range(50): | ||||||||||
| if on_snapshot.born == 1815: | ||||||||||
| break | ||||||||||
| sleep(0.2) | ||||||||||
|
|
||||||||||
| if on_snapshot.born != 1815: | ||||||||||
| raise AssertionError( | ||||||||||
| "Expected the last document update to update born: " + str(on_snapshot.born) | ||||||||||
| ) | ||||||||||
| born_1815_future.result(timeout=60.0) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| @pytest.mark.parametrize("database", TEST_DATABASES, indirect=True) | ||||||||||
|
|
@@ -2419,20 +2400,21 @@ def test_watch_query(client, cleanup, database): | |||||||||
| doc_ref.set({"first": "Jane", "last": "Doe", "born": 1900}) | ||||||||||
| cleanup(doc_ref.delete) | ||||||||||
|
|
||||||||||
| # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): | ||||||||||
| # Investigate why these sleep/polling delays are needed for listener tests. | ||||||||||
| # Having arbitrary delays is fragile and can lead to flakiness. | ||||||||||
| # Explore event-driven synchronization. | ||||||||||
| sleep(0.2) | ||||||||||
|
|
||||||||||
| # Setup listener | ||||||||||
| one_doc_future = concurrent.futures.Future() | ||||||||||
|
|
||||||||||
| def on_snapshot(docs, changes, read_time): | ||||||||||
| on_snapshot.called_count += 1 | ||||||||||
|
|
||||||||||
| # A snapshot should return the same thing as if a query ran now. | ||||||||||
| query_ran_query = collection_ref.where(filter=FieldFilter("first", "==", "Ada")) | ||||||||||
| query_ran = query_ran_query.stream() | ||||||||||
| assert len(docs) == len([i for i in query_ran]) | ||||||||||
| if docs: | ||||||||||
| # A snapshot should return the same thing as if a query ran now. | ||||||||||
| query_ran_query = collection_ref.where( | ||||||||||
| filter=FieldFilter("first", "==", "Ada") | ||||||||||
| ) | ||||||||||
| query_ran = query_ran_query.stream() | ||||||||||
| assert len(docs) == len([i for i in query_ran]) | ||||||||||
| if not one_doc_future.done(): | ||||||||||
| one_doc_future.set_result(True) | ||||||||||
|
|
||||||||||
| on_snapshot.called_count = 0 | ||||||||||
|
|
||||||||||
|
|
@@ -2441,14 +2423,11 @@ def on_snapshot(docs, changes, read_time): | |||||||||
| # Alter document | ||||||||||
| doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815}) | ||||||||||
|
|
||||||||||
| for _ in range(50): | ||||||||||
| if on_snapshot.called_count == 1: | ||||||||||
| return | ||||||||||
| sleep(0.2) | ||||||||||
| one_doc_future.result(timeout=60.0) | ||||||||||
|
|
||||||||||
| if on_snapshot.called_count != 1: | ||||||||||
| if on_snapshot.called_count not in (1, 2): | ||||||||||
| raise AssertionError( | ||||||||||
| "Failed to get exactly one document change: count: " | ||||||||||
| "Failed to get expected document change count: " | ||||||||||
| + str(on_snapshot.called_count) | ||||||||||
| ) | ||||||||||
|
|
||||||||||
|
|
@@ -2788,6 +2767,8 @@ def test_watch_query_order(client, cleanup, database): | |||||||||
| ) | ||||||||||
|
|
||||||||||
| # Setup listener | ||||||||||
| five_docs_future = concurrent.futures.Future() | ||||||||||
|
|
||||||||||
| def on_snapshot(docs, changes, read_time): | ||||||||||
| try: | ||||||||||
| docs = [i for i in docs if i.id.endswith(UNIQUE_RESOURCE_ID)] | ||||||||||
|
|
@@ -2808,22 +2789,14 @@ def on_snapshot(docs, changes, read_time): | |||||||||
| assert snapshot.get("born") == query.get("born"), ( | ||||||||||
| "expect the sort order to match, born" | ||||||||||
| ) | ||||||||||
| on_snapshot.called_count += 1 | ||||||||||
| on_snapshot.last_doc_count = len(docs) | ||||||||||
| if not five_docs_future.done(): | ||||||||||
| five_docs_future.set_result(True) | ||||||||||
|
Comment on lines
+2792
to
+2793
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The future is currently resolved on the very first snapshot callback, regardless of how many documents are returned. Since the test writes 5 documents and expects to verify their ordering, resolving the future prematurely when fewer than 5 documents have been delivered lets the test finish before the full ordering has been checked. Resolve the future only once `len(docs) == 5`.
Suggested change
|
||||||||||
| except Exception as e: | ||||||||||
| on_snapshot.failed = e | ||||||||||
| if not five_docs_future.done(): | ||||||||||
| five_docs_future.set_exception(e) | ||||||||||
|
|
||||||||||
| on_snapshot.called_count = 0 | ||||||||||
| on_snapshot.last_doc_count = 0 | ||||||||||
| on_snapshot.failed = None | ||||||||||
| query_ref.on_snapshot(on_snapshot) | ||||||||||
|
|
||||||||||
| # TODO(https://github.com/googleapis/google-cloud-python/issues/17428): | ||||||||||
| # Investigate why these sleep/polling delays are needed for listener tests. | ||||||||||
| # Having arbitrary delays is fragile and can lead to flakiness. | ||||||||||
| # Explore event-driven synchronization. | ||||||||||
| sleep(0.2) | ||||||||||
|
|
||||||||||
| doc_ref1.set({"first": "Ada", "last": "Lovelace", "born": 1815}) | ||||||||||
| cleanup(doc_ref1.delete) | ||||||||||
|
|
||||||||||
|
|
@@ -2839,18 +2812,8 @@ def on_snapshot(docs, changes, read_time): | |||||||||
| doc_ref5.set({"first": "Ada", "last": "lovelace", "born": 1815}) | ||||||||||
| cleanup(doc_ref5.delete) | ||||||||||
|
|
||||||||||
| for _ in range(50): | ||||||||||
| if on_snapshot.last_doc_count == 5: | ||||||||||
| break | ||||||||||
| sleep(0.2) | ||||||||||
|
|
||||||||||
| if on_snapshot.failed: | ||||||||||
| raise on_snapshot.failed | ||||||||||
|
|
||||||||||
| if on_snapshot.last_doc_count != 5: | ||||||||||
| raise AssertionError( | ||||||||||
| "5 docs expected in snapshot method " + str(on_snapshot.last_doc_count) | ||||||||||
| ) | ||||||||||
| # Wait for the future to be resolved | ||||||||||
| five_docs_future.result(timeout=60.0) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| @pytest.mark.parametrize("database", TEST_DATABASES_W_ENTERPRISE, indirect=True) | ||||||||||
|
|
||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using `+=` to accumulate results in `on_snapshot` can lead to duplicate documents if multiple snapshots are delivered. Instead, assign the latest `docs` list directly to `on_snapshot.results` and check its length to resolve the future safely.