Skip to content

Commit 7a8102f

Browse files
authored
Merge branch 'master' into koan/fix-issue-1567
2 parents ead2b2d + 4ffba87 commit 7a8102f

5 files changed

Lines changed: 175 additions & 35 deletions

File tree

poetry.lock

Lines changed: 29 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ cython = "^3.2.4"
9191
setuptools = ">=65.6.3,<83.0.0"
9292
pytest-timeout = "^2.1.0"
9393
pytest-codspeed = ">=5.0.2,<6.0"
94+
blockbuster = ">=1.5.5,<2.0.0"
9495

9596
[tool.poetry.group.docs.dependencies]
9697
sphinx = "^7.4.7 || ^8.1.3"

tests/conftest.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from __future__ import annotations
44

55
import threading
6-
from collections.abc import AsyncGenerator, Generator
6+
from collections.abc import AsyncGenerator, Generator, Iterator
77
from unittest.mock import patch
88

99
import pytest
@@ -15,6 +15,64 @@
1515
from zeroconf._services import info as service_info
1616
from zeroconf.asyncio import AsyncZeroconf
1717

18+
try:
19+
from blockbuster import BlockBuster, blockbuster_ctx
20+
except ImportError: # platforms without blockbuster (e.g. PyPy under QEMU)
21+
BlockBuster = None # type: ignore[assignment,misc]
22+
blockbuster_ctx = None # type: ignore[assignment]
23+
24+
_BENCHMARKS_DIR = "tests/benchmarks"
25+
26+
# Tests that perform sync IO inside the asyncio event loop and trip
27+
# blockbuster. Marked xfail (strict=False) so CI stays green; pop
28+
# entries as the underlying blocking calls get fixed. Most of the
29+
# `test_async_service_registration*` and `test_async_tasks` entries
30+
# share a single root cause: `Zeroconf.async_close()` -> ... ->
31+
# `ServiceBrowser.cancel()` calls `Thread.join()` to drain the
32+
# dedicated browser thread, and on Python 3.10-3.12 the thread is
33+
# still alive when the join happens. `test_use_asyncio_false_*` is
34+
# by design (sync bootstrap when `use_asyncio=False` is requested from
35+
# inside a running loop); `test_run_coro_with_timeout` exercises the
36+
# sync-from-thread bridge intentionally. The strict=False marker keeps
37+
# the suite green on the Python versions where the race resolves the
38+
# other way.
39+
_KNOWN_BLOCKING: frozenset[str] = frozenset(
40+
{
41+
"tests/test_asyncio.py::test_async_service_registration",
42+
"tests/test_asyncio.py::test_async_service_registration_with_server_missing",
43+
"tests/test_asyncio.py::test_async_service_registration_same_server_different_ports",
44+
"tests/test_asyncio.py::test_async_service_registration_same_server_same_ports",
45+
"tests/test_asyncio.py::test_async_tasks",
46+
"tests/test_core.py::Framework::test_use_asyncio_false_forces_thread_when_loop_running",
47+
"tests/utils/test_asyncio.py::test_run_coro_with_timeout",
48+
}
49+
)
50+
51+
52+
def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
53+
"""Mark known-blocking tests xfail so blockbuster doesn't fail the suite."""
54+
if blockbuster_ctx is None:
55+
return
56+
marker = pytest.mark.xfail(
57+
reason="blockbuster: blocking call in asyncio path",
58+
strict=False,
59+
)
60+
for item in items:
61+
if item.nodeid in _KNOWN_BLOCKING:
62+
item.add_marker(marker)
63+
64+
65+
@pytest.fixture(autouse=True)
66+
def blockbuster(
67+
request: pytest.FixtureRequest,
68+
) -> Iterator[BlockBuster | None]:
69+
"""Fail any test that performs a blocking call inside the asyncio loop."""
70+
if blockbuster_ctx is None or _BENCHMARKS_DIR in str(request.node.fspath):
71+
yield None
72+
return
73+
with blockbuster_ctx() as bb:
74+
yield bb
75+
1876

1977
@pytest.fixture(autouse=True)
2078
def verify_threads_ended():
@@ -103,6 +161,33 @@ def quick_timing() -> Generator[None]:
103161
yield
104162

105163

164+
@pytest.fixture
165+
def quick_aggregation_timing() -> Generator[None]:
166+
"""Scale multicast aggregation / network-protection delays 10x for tests.
167+
168+
The aggregation tests in `tests/test_handlers.py` verify timing-
169+
dependent behaviour of `MulticastOutgoingQueue`: aggregation window,
170+
network protection (~1s), and protected aggregation. The behaviour
171+
under test is a ratio of these constants — the exact wall-clock
172+
values are not the contract — so scaling them down and the test
173+
sleeps in lock-step preserves what is tested while dropping each
174+
test from ~3s to ~0.3s.
175+
176+
The patches must be in place before `AsyncZeroconf(...)` is
177+
constructed because `MulticastOutgoingQueue` reads the constants at
178+
init time and stashes them on the instance. The per-queue
179+
`_multicast_delay_random_min` / `_max` jitter (1-5ms here) can
180+
still be set on the queue instance after construction by the test
181+
itself — those slots are `cdef public` in the .pxd.
182+
"""
183+
with (
184+
patch.object(_core, "_AGGREGATION_DELAY", 50),
185+
patch.object(_core, "_PROTECTED_AGGREGATION_DELAY", 20),
186+
patch.object(_core, "_ONE_SECOND", 100),
187+
):
188+
yield
189+
190+
106191
@pytest.fixture
107192
def quick_request_timing() -> Generator[None]:
108193
"""Shorten the initial-query delay used by AsyncServiceInfo.async_request.

tests/test_asyncio.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444

4545
from . import (
4646
LOOPBACK_FIND_TIMEOUT,
47-
QUICK_REQUEST_TIMEOUT_MS,
4847
QuestionHistoryWithoutSuppression,
4948
_clear_cache,
5049
has_working_ipv6,
@@ -1191,10 +1190,12 @@ def send(out, addr=const._MDNS_ADDR, port=const._MDNS_PORT):
11911190
with patch.object(zeroconf_info, "async_send", send):
11921191
aiosinfo = AsyncServiceInfo(type_, registration_name)
11931192
# Patch _is_complete so we send multiple times. Under
1194-
# `quick_request_timing` both the QU query at 0ms and the QM
1195-
# query at ~15ms land well inside QUICK_REQUEST_TIMEOUT_MS.
1193+
# `quick_request_timing` the QU query fires at 0ms and the QM
1194+
# follow-up at ~11-15ms (10ms _LISTENER_TIME + 1-5ms jitter);
1195+
# 300ms absorbs macOS short-sleep quantization so the QM wake
1196+
# lands before the loop times out.
11961197
with patch("zeroconf.asyncio.AsyncServiceInfo._is_complete", False):
1197-
await aiosinfo.async_request(aiozc.zeroconf, QUICK_REQUEST_TIMEOUT_MS)
1198+
await aiosinfo.async_request(aiozc.zeroconf, 300)
11981199
try:
11991200
assert first_outgoing.questions[0].unicast is True # type: ignore[union-attr]
12001201
assert second_outgoing.questions[0].unicast is False # type: ignore[attr-defined]

tests/test_handlers.py

Lines changed: 54 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1601,14 +1601,23 @@ async def test_duplicate_goodbye_answers_in_packet():
16011601

16021602

16031603
@pytest.mark.asyncio
1604-
async def test_response_aggregation_timings(run_isolated):
1605-
"""Verify multicast responses are aggregated."""
1604+
@pytest.mark.usefixtures("quick_aggregation_timing")
1605+
async def test_response_aggregation_timings(run_isolated: None) -> None:
1606+
"""Verify multicast responses are aggregated.
1607+
1608+
Aggregation / network-protection constants are scaled 10x by
1609+
``quick_aggregation_timing``; the asserted ratios are unchanged
1610+
but each phase finishes in ~1/10 the wall time.
1611+
"""
16061612
type_ = "_mservice._tcp.local."
16071613
type_2 = "_mservice2._tcp.local."
16081614
type_3 = "_mservice3._tcp.local."
16091615

16101616
aiozc = AsyncZeroconf(interfaces=["127.0.0.1"])
16111617
await aiozc.zeroconf.async_wait_for_start()
1618+
for queue in (aiozc.zeroconf.out_queue, aiozc.zeroconf.out_delay_queue):
1619+
queue._multicast_delay_random_min = 1
1620+
queue._multicast_delay_random_max = 5
16121621

16131622
name = "xxxyyy"
16141623
registration_name = f"{name}.{type_}"
@@ -1673,9 +1682,10 @@ async def test_response_aggregation_timings(run_isolated):
16731682
protocol.datagram_received(query.packets()[0], ("127.0.0.1", const._MDNS_PORT))
16741683
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
16751684
protocol.datagram_received(query.packets()[0], ("127.0.0.1", const._MDNS_PORT))
1676-
await asyncio.sleep(0.7)
1685+
await asyncio.sleep(0.07)
16771686

1678-
# Should aggregate into a single answer with up to a 500ms + 120ms delay
1687+
# Should aggregate into a single answer with up to a 50ms + 5ms delay
1688+
# (scaled from 500ms + 120ms by `quick_aggregation_timing`).
16791689
calls = send_mock.mock_calls
16801690
assert len(calls) == 1
16811691
outgoing = send_mock.call_args[0][0]
@@ -1686,10 +1696,10 @@ async def test_response_aggregation_timings(run_isolated):
16861696
send_mock.reset_mock()
16871697

16881698
protocol.datagram_received(query3.packets()[0], ("127.0.0.1", const._MDNS_PORT))
1689-
await asyncio.sleep(0.3)
1699+
await asyncio.sleep(0.03)
16901700

1691-
# Should send within 120ms since there are no other
1692-
# answers to aggregate with
1701+
# Should send within 12ms (scaled max random delay) since there are
1702+
# no other answers to aggregate with.
16931703
calls = send_mock.mock_calls
16941704
assert len(calls) == 1
16951705
outgoing = send_mock.call_args[0][0]
@@ -1698,21 +1708,21 @@ async def test_response_aggregation_timings(run_isolated):
16981708
assert info3.dns_pointer() in incoming.answers()
16991709
send_mock.reset_mock()
17001710

1701-
# Because the response was sent in the last second we need to make
1702-
# sure the next answer is delayed at least a second
1711+
# Because the response was sent in the last 100ms (scaled 1s) we
1712+
# need to make sure the next answer is delayed at least that long.
17031713
aiozc.zeroconf.engine.protocols[0].datagram_received(
17041714
query4.packets()[0], ("127.0.0.1", const._MDNS_PORT)
17051715
)
1706-
await asyncio.sleep(0.5)
1716+
await asyncio.sleep(0.05)
17071717

1708-
# After 0.5 seconds it should not have been sent
1718+
# After 50ms it should not have been sent.
17091719
# Protect the network against excessive packet flooding
17101720
# https://datatracker.ietf.org/doc/html/rfc6762#section-14
17111721
calls = send_mock.mock_calls
17121722
assert len(calls) == 0
17131723
send_mock.reset_mock()
17141724

1715-
await asyncio.sleep(1.2)
1725+
await asyncio.sleep(0.12)
17161726
calls = send_mock.mock_calls
17171727
assert len(calls) == 1
17181728
outgoing = send_mock.call_args[0][0]
@@ -1723,14 +1733,30 @@ async def test_response_aggregation_timings(run_isolated):
17231733

17241734

17251735
@pytest.mark.asyncio
1726-
async def test_response_aggregation_timings_multiple(run_isolated, disable_duplicate_packet_suppression):
1727-
"""Verify multicast responses that are aggregated do not take longer than 620ms to send.
1728-
1729-
620ms is the maximum random delay of 120ms and 500ms additional for aggregation."""
1736+
@pytest.mark.usefixtures("quick_aggregation_timing")
1737+
async def test_response_aggregation_timings_multiple(
1738+
run_isolated: None, disable_duplicate_packet_suppression: None
1739+
) -> None:
1740+
"""Verify multicast responses that are aggregated do not take longer than 62ms to send.
1741+
1742+
Aggregation / network-protection constants are scaled 10x by
1743+
``quick_aggregation_timing`` (500ms→50ms, 200ms→20ms, 1000ms→100ms)
1744+
and the per-queue jitter is set to 1-5ms below. The asserted
1745+
ratios are the same as the production behaviour the test pins —
1746+
aggregation window, network protection, protected aggregation —
1747+
only the absolute durations are scaled.
1748+
"""
17301749
type_2 = "_mservice2._tcp.local."
17311750

17321751
aiozc = AsyncZeroconf(interfaces=["127.0.0.1"])
17331752
await aiozc.zeroconf.async_wait_for_start()
1753+
# Scale the queues' random jitter to match the 10x scaled
1754+
# additional / aggregation delays; without this, the 20-120ms
1755+
# jitter would dominate the scaled window and make timing assertions
1756+
# unreliable.
1757+
for queue in (aiozc.zeroconf.out_queue, aiozc.zeroconf.out_delay_queue):
1758+
queue._multicast_delay_random_min = 1
1759+
queue._multicast_delay_random_max = 5
17341760

17351761
name = "xxxyyy"
17361762
registration_name2 = f"{name}.{type_2}"
@@ -1760,7 +1786,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
17601786
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
17611787
protocol.last_time = 0 # manually reset to avoid duplicate packet suppression
17621788
protocol._recent_packets.clear()
1763-
await asyncio.sleep(0.2)
1789+
await asyncio.sleep(0.02)
17641790
calls = send_mock.mock_calls
17651791
assert len(calls) == 1
17661792
outgoing = send_mock.call_args[0][0]
@@ -1772,7 +1798,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
17721798
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
17731799
protocol.last_time = 0 # manually reset to avoid duplicate packet suppression
17741800
protocol._recent_packets.clear()
1775-
await asyncio.sleep(1.2)
1801+
await asyncio.sleep(0.12)
17761802
calls = send_mock.mock_calls
17771803
assert len(calls) == 1
17781804
outgoing = send_mock.call_args[0][0]
@@ -1787,19 +1813,19 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
17871813
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
17881814
protocol.last_time = 0 # manually reset to avoid duplicate packet suppression
17891815
protocol._recent_packets.clear()
1790-
# The minimum protected send_after is 1000ms + 20ms random; sleep
1791-
# well under that so coarse timers on slow runners cannot push the
1792-
# send into this window and flake the assertion.
1793-
await asyncio.sleep(0.5)
1816+
# Scaled: minimum protected send_after is 100ms + 1-5ms random;
1817+
# sleep well under that so coarse timers on slow runners cannot
1818+
# push the send into this window and flake the assertion.
1819+
await asyncio.sleep(0.05)
17941820
calls = send_mock.mock_calls
17951821
assert len(calls) == 0
17961822

1797-
# 1000ms (1s network protection delays)
1798-
# - 500ms (already slept)
1799-
# + 120ms (maximum random delay)
1800-
# + 200ms (maximum protected aggregation delay)
1801-
# + 20ms (execution time)
1802-
await asyncio.sleep(millis_to_seconds(1000 - 500 + 120 + 200 + 20))
1823+
# 100ms (scaled 1s network protection)
1824+
# - 50ms (already slept)
1825+
# + 5ms (scaled maximum random delay)
1826+
# + 20ms (scaled protected aggregation delay)
1827+
# + 5ms (execution slack)
1828+
await asyncio.sleep(millis_to_seconds(100 - 50 + 5 + 20 + 5))
18031829
calls = send_mock.mock_calls
18041830
assert len(calls) == 1
18051831
outgoing = send_mock.call_args[0][0]

0 commit comments

Comments
 (0)