Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ replacements:
session.run(
"py.test",
"-n",
"auto",
"10",
"--quiet",
f"--junitxml=system_{session.python}_sponge_log.xml",
system_test_path,
Expand All @@ -629,7 +629,7 @@ replacements:
session.run(
"py.test",
"-n",
"auto",
"10",
"--quiet",
f"--junitxml=system_{session.python}_sponge_log.xml",
system_test_folder_path,
Expand Down
4 changes: 2 additions & 2 deletions packages/google-cloud-firestore/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def system(session):
session.run(
"py.test",
"-n",
"auto",
"10",
"--quiet",
f"--junitxml=system_{session.python}_sponge_log.xml",
system_test_path,
Expand All @@ -414,7 +414,7 @@ def system(session):
session.run(
"py.test",
"-n",
"auto",
"10",
"--quiet",
f"--junitxml=system_{session.python}_sponge_log.xml",
system_test_folder_path,
Expand Down
121 changes: 42 additions & 79 deletions packages/google-cloud-firestore/tests/system/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import datetime
import itertools
import math
import operator
from time import sleep
from typing import Callable, Dict, List, Optional

import google.auth
Expand Down Expand Up @@ -80,8 +80,8 @@ def cleanup():
operations = []
yield operations.append

for operation in operations:
operation()
with concurrent.futures.ThreadPoolExecutor() as executor:
list(executor.map(lambda op: op(), operations))


@pytest.fixture
Expand Down Expand Up @@ -389,15 +389,18 @@ def test_create_document_w_vector(client, cleanup, database):
for v in client.collection(collection_id).order_by("embedding").get()
] == [data3, data1, data2]

vector_query_future = concurrent.futures.Future()

def on_snapshot(docs, changes, read_time):
on_snapshot.results += docs
if len(on_snapshot.results) >= 3 and not vector_query_future.done():
vector_query_future.set_result(True)
Comment on lines 394 to +397

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using += to accumulate results in on_snapshot can lead to duplicate documents if multiple snapshots are delivered. Instead, assign the latest docs list directly to on_snapshot.results and check its length to resolve the future safely.

Suggested change
def on_snapshot(docs, changes, read_time):
on_snapshot.results += docs
if len(on_snapshot.results) >= 3 and not vector_query_future.done():
vector_query_future.set_result(True)
def on_snapshot(docs, changes, read_time):
on_snapshot.results = docs
if len(docs) >= 3 and not vector_query_future.done():
vector_query_future.set_result(True)


on_snapshot.results = []
client.collection(collection_id).order_by("embedding").on_snapshot(on_snapshot)

# delay here so initial on_snapshot occurs and isn't combined with set
sleep(1)
assert [v.to_dict() for v in on_snapshot.results] == [data3, data1, data2]
vector_query_future.result(timeout=60.0)
assert [v.to_dict() for v in on_snapshot.results[:3]] == [data3, data1, data2]


@pytest.mark.skipif(FIRESTORE_EMULATOR, reason="Require index and seed data")
Expand Down Expand Up @@ -2336,15 +2339,13 @@ def test_watch_document(client, cleanup, database):
doc_ref.set({"first": "Jane", "last": "Doe", "born": 1900})
cleanup(doc_ref.delete)

# TODO(https://github.com/googleapis/google-cloud-python/issues/17428):
# Investigate why these sleep/polling delays are needed for listener tests.
# Having arbitrary delays is fragile and can lead to flakiness.
# Explore event-driven synchronization.
sleep(0.2)

# Setup listener
ada_future = concurrent.futures.Future()

def on_snapshot(docs, changes, read_time):
on_snapshot.called_count += 1
if docs and docs[0].get("first") == "Ada" and not ada_future.done():
ada_future.set_result(True)

on_snapshot.called_count = 0

Expand All @@ -2353,12 +2354,8 @@ def on_snapshot(docs, changes, read_time):
# Alter document
doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815})

sleep(0.2)

for _ in range(50):
if on_snapshot.called_count > 0:
break
sleep(0.2)
# Wait for the event-driven callback to resolve the future
ada_future.result(timeout=60.0)

if on_snapshot.called_count not in (1, 2):
raise AssertionError(
Expand All @@ -2378,34 +2375,18 @@ def test_watch_collection(client, cleanup, database):
cleanup(doc_ref.delete)

# Setup listener
born_1815_future = concurrent.futures.Future()

def on_snapshot(docs, changes, read_time):
on_snapshot.called_count += 1
for doc in [doc for doc in docs if doc.id == doc_ref.id]:
on_snapshot.born = doc.get("born")

on_snapshot.called_count = 0
on_snapshot.born = 0
if doc.get("born") == 1815 and not born_1815_future.done():
born_1815_future.set_result(True)

collection_ref.on_snapshot(on_snapshot)

# TODO(https://github.com/googleapis/google-cloud-python/issues/17428):
# Investigate why these sleep/polling delays are needed for listener tests.
# Having arbitrary delays is fragile and can lead to flakiness.
# Explore event-driven synchronization.
# delay here so initial on_snapshot occurs and isn't combined with set
sleep(0.2)

doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815})

for _ in range(50):
if on_snapshot.born == 1815:
break
sleep(0.2)

if on_snapshot.born != 1815:
raise AssertionError(
"Expected the last document update to update born: " + str(on_snapshot.born)
)
born_1815_future.result(timeout=60.0)


@pytest.mark.parametrize("database", TEST_DATABASES, indirect=True)
Expand All @@ -2419,20 +2400,21 @@ def test_watch_query(client, cleanup, database):
doc_ref.set({"first": "Jane", "last": "Doe", "born": 1900})
cleanup(doc_ref.delete)

# TODO(https://github.com/googleapis/google-cloud-python/issues/17428):
# Investigate why these sleep/polling delays are needed for listener tests.
# Having arbitrary delays is fragile and can lead to flakiness.
# Explore event-driven synchronization.
sleep(0.2)

# Setup listener
one_doc_future = concurrent.futures.Future()

def on_snapshot(docs, changes, read_time):
on_snapshot.called_count += 1

# A snapshot should return the same thing as if a query ran now.
query_ran_query = collection_ref.where(filter=FieldFilter("first", "==", "Ada"))
query_ran = query_ran_query.stream()
assert len(docs) == len([i for i in query_ran])
if docs:
# A snapshot should return the same thing as if a query ran now.
query_ran_query = collection_ref.where(
filter=FieldFilter("first", "==", "Ada")
)
query_ran = query_ran_query.stream()
assert len(docs) == len([i for i in query_ran])
if not one_doc_future.done():
one_doc_future.set_result(True)

on_snapshot.called_count = 0

Expand All @@ -2441,14 +2423,11 @@ def on_snapshot(docs, changes, read_time):
# Alter document
doc_ref.set({"first": "Ada", "last": "Lovelace", "born": 1815})

for _ in range(50):
if on_snapshot.called_count == 1:
return
sleep(0.2)
one_doc_future.result(timeout=60.0)

if on_snapshot.called_count != 1:
if on_snapshot.called_count not in (1, 2):
raise AssertionError(
"Failed to get exactly one document change: count: "
"Failed to get expected document change count: "
+ str(on_snapshot.called_count)
)

Expand Down Expand Up @@ -2788,6 +2767,8 @@ def test_watch_query_order(client, cleanup, database):
)

# Setup listener
five_docs_future = concurrent.futures.Future()

def on_snapshot(docs, changes, read_time):
try:
docs = [i for i in docs if i.id.endswith(UNIQUE_RESOURCE_ID)]
Expand All @@ -2808,22 +2789,14 @@ def on_snapshot(docs, changes, read_time):
assert snapshot.get("born") == query.get("born"), (
"expect the sort order to match, born"
)
on_snapshot.called_count += 1
on_snapshot.last_doc_count = len(docs)
if not five_docs_future.done():
five_docs_future.set_result(True)
Comment on lines +2792 to +2793

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The future is currently resolved on the very first snapshot callback, regardless of how many documents are returned. Since the test writes 5 documents and expects to verify their ordering, resolving the future prematurely when len(docs) < 5 will cause the test to pass without actually verifying the ordering of all 5 documents. We should only resolve the future when len(docs) == 5.

Suggested change
if not five_docs_future.done():
five_docs_future.set_result(True)
if len(docs) == 5 and not five_docs_future.done():
five_docs_future.set_result(True)

except Exception as e:
on_snapshot.failed = e
if not five_docs_future.done():
five_docs_future.set_exception(e)

on_snapshot.called_count = 0
on_snapshot.last_doc_count = 0
on_snapshot.failed = None
query_ref.on_snapshot(on_snapshot)

# TODO(https://github.com/googleapis/google-cloud-python/issues/17428):
# Investigate why these sleep/polling delays are needed for listener tests.
# Having arbitrary delays is fragile and can lead to flakiness.
# Explore event-driven synchronization.
sleep(0.2)

doc_ref1.set({"first": "Ada", "last": "Lovelace", "born": 1815})
cleanup(doc_ref1.delete)

Expand All @@ -2839,18 +2812,8 @@ def on_snapshot(docs, changes, read_time):
doc_ref5.set({"first": "Ada", "last": "lovelace", "born": 1815})
cleanup(doc_ref5.delete)

for _ in range(50):
if on_snapshot.last_doc_count == 5:
break
sleep(0.2)

if on_snapshot.failed:
raise on_snapshot.failed

if on_snapshot.last_doc_count != 5:
raise AssertionError(
"5 docs expected in snapshot method " + str(on_snapshot.last_doc_count)
)
# Wait for the future to be resolved
five_docs_future.result(timeout=60.0)


@pytest.mark.parametrize("database", TEST_DATABASES_W_ENTERPRISE, indirect=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ async def cleanup():
operations = []
yield operations.append

for operation in operations:
await operation()
if operations:
await asyncio.gather(*[operation() for operation in operations])


@pytest.fixture
Expand Down
Loading