Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,33 @@ def quick_timing() -> Generator[None]:
yield


@pytest.fixture
def quick_aggregation_timing() -> Generator[None]:
"""Scale multicast aggregation / network-protection delays 10x for tests.

The aggregation tests in `tests/test_handlers.py` verify timing-
dependent behaviour of `MulticastOutgoingQueue`: aggregation window,
network protection (~1s), and protected aggregation. The behaviour
under test is a ratio of these constants — the exact wall-clock
values are not the contract — so scaling them down and the test
sleeps in lock-step preserves what is tested while dropping each
test from ~3s to ~0.3s.

The patches must be in place before `AsyncZeroconf(...)` is
constructed because `MulticastOutgoingQueue` reads the constants at
init time and stashes them on the instance. The per-queue
`_multicast_delay_random_min` / `_max` jitter (1-5ms here) can
still be set on the queue instance after construction by the test
itself — those slots are `cdef public` in the .pxd.
"""
with (
patch.object(_core, "_AGGREGATION_DELAY", 50),
patch.object(_core, "_PROTECTED_AGGREGATION_DELAY", 20),
patch.object(_core, "_ONE_SECOND", 100),
):
yield


@pytest.fixture
def quick_request_timing() -> Generator[None]:
"""Shorten the initial-query delay used by AsyncServiceInfo.async_request.
Expand Down
82 changes: 54 additions & 28 deletions tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1601,14 +1601,23 @@ async def test_duplicate_goodbye_answers_in_packet():


@pytest.mark.asyncio
async def test_response_aggregation_timings(run_isolated):
"""Verify multicast responses are aggregated."""
@pytest.mark.usefixtures("quick_aggregation_timing")
async def test_response_aggregation_timings(run_isolated: None) -> None:
"""Verify multicast responses are aggregated.

Aggregation / network-protection constants are scaled 10x by
``quick_aggregation_timing``; the asserted ratios are unchanged
but each phase finishes in ~1/10 the wall time.
"""
type_ = "_mservice._tcp.local."
type_2 = "_mservice2._tcp.local."
type_3 = "_mservice3._tcp.local."

aiozc = AsyncZeroconf(interfaces=["127.0.0.1"])
await aiozc.zeroconf.async_wait_for_start()
for queue in (aiozc.zeroconf.out_queue, aiozc.zeroconf.out_delay_queue):
queue._multicast_delay_random_min = 1
queue._multicast_delay_random_max = 5

name = "xxxyyy"
registration_name = f"{name}.{type_}"
Expand Down Expand Up @@ -1673,9 +1682,10 @@ async def test_response_aggregation_timings(run_isolated):
protocol.datagram_received(query.packets()[0], ("127.0.0.1", const._MDNS_PORT))
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
protocol.datagram_received(query.packets()[0], ("127.0.0.1", const._MDNS_PORT))
await asyncio.sleep(0.7)
await asyncio.sleep(0.07)

# Should aggregate into a single answer with up to a 500ms + 120ms delay
# Should aggregate into a single answer with up to a 50ms + 5ms delay
# (scaled from 500ms + 120ms by `quick_aggregation_timing`).
calls = send_mock.mock_calls
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
Expand All @@ -1686,10 +1696,10 @@ async def test_response_aggregation_timings(run_isolated):
send_mock.reset_mock()

protocol.datagram_received(query3.packets()[0], ("127.0.0.1", const._MDNS_PORT))
await asyncio.sleep(0.3)
await asyncio.sleep(0.03)

# Should send within 120ms since there are no other
# answers to aggregate with
# Should send within 12ms (scaled max random delay) since there are
# no other answers to aggregate with.
calls = send_mock.mock_calls
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
Expand All @@ -1698,21 +1708,21 @@ async def test_response_aggregation_timings(run_isolated):
assert info3.dns_pointer() in incoming.answers()
send_mock.reset_mock()

# Because the response was sent in the last second we need to make
# sure the next answer is delayed at least a second
# Because the response was sent in the last 100ms (scaled 1s) we
# need to make sure the next answer is delayed at least that long.
aiozc.zeroconf.engine.protocols[0].datagram_received(
query4.packets()[0], ("127.0.0.1", const._MDNS_PORT)
)
await asyncio.sleep(0.5)
await asyncio.sleep(0.05)

# After 0.5 seconds it should not have been sent
# After 50ms it should not have been sent.
# Protect the network against excessive packet flooding
# https://datatracker.ietf.org/doc/html/rfc6762#section-14
calls = send_mock.mock_calls
assert len(calls) == 0
send_mock.reset_mock()

await asyncio.sleep(1.2)
await asyncio.sleep(0.12)
calls = send_mock.mock_calls
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
Expand All @@ -1723,14 +1733,30 @@ async def test_response_aggregation_timings(run_isolated):


@pytest.mark.asyncio
async def test_response_aggregation_timings_multiple(run_isolated, disable_duplicate_packet_suppression):
"""Verify multicast responses that are aggregated do not take longer than 620ms to send.

620ms is the maximum random delay of 120ms and 500ms additional for aggregation."""
@pytest.mark.usefixtures("quick_aggregation_timing")
async def test_response_aggregation_timings_multiple(
run_isolated: None, disable_duplicate_packet_suppression: None
) -> None:
"""Verify multicast responses that are aggregated do not take longer than 62ms to send.

Aggregation / network-protection constants are scaled 10x by
``quick_aggregation_timing`` (500ms→50ms, 200ms→20ms, 1000ms→100ms)
and the per-queue jitter is set to 1-5ms below. The asserted
ratios are the same as the production behaviour the test pins —
aggregation window, network protection, protected aggregation —
only the absolute durations are scaled.
"""
type_2 = "_mservice2._tcp.local."

aiozc = AsyncZeroconf(interfaces=["127.0.0.1"])
await aiozc.zeroconf.async_wait_for_start()
# Scale the queues' random jitter to match the 10x scaled
# additional / aggregation delays; without this, the 20-120ms
# jitter would dominate the scaled window and make timing assertions
# unreliable.
for queue in (aiozc.zeroconf.out_queue, aiozc.zeroconf.out_delay_queue):
queue._multicast_delay_random_min = 1
queue._multicast_delay_random_max = 5

name = "xxxyyy"
registration_name2 = f"{name}.{type_2}"
Expand Down Expand Up @@ -1760,7 +1786,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
protocol.last_time = 0 # manually reset to avoid duplicate packet suppression
protocol._recent_packets.clear()
await asyncio.sleep(0.2)
await asyncio.sleep(0.02)
calls = send_mock.mock_calls
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
Expand All @@ -1772,7 +1798,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
protocol.last_time = 0 # manually reset to avoid duplicate packet suppression
protocol._recent_packets.clear()
await asyncio.sleep(1.2)
await asyncio.sleep(0.12)
calls = send_mock.mock_calls
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
Expand All @@ -1787,19 +1813,19 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
protocol.last_time = 0 # manually reset to avoid duplicate packet suppression
protocol._recent_packets.clear()
# The minimum protected send_after is 1000ms + 20ms random; sleep
# well under that so coarse timers on slow runners cannot push the
# send into this window and flake the assertion.
await asyncio.sleep(0.5)
# Scaled: minimum protected send_after is 100ms + 1-5ms random;
# sleep well under that so coarse timers on slow runners cannot
# push the send into this window and flake the assertion.
await asyncio.sleep(0.05)
calls = send_mock.mock_calls
assert len(calls) == 0

# 1000ms (1s network protection delays)
# - 500ms (already slept)
# + 120ms (maximum random delay)
# + 200ms (maximum protected aggregation delay)
# + 20ms (execution time)
await asyncio.sleep(millis_to_seconds(1000 - 500 + 120 + 200 + 20))
# 100ms (scaled 1s network protection)
# - 50ms (already slept)
# + 5ms (scaled maximum random delay)
# + 20ms (scaled protected aggregation delay)
# + 5ms (execution slack)
await asyncio.sleep(millis_to_seconds(100 - 50 + 5 + 20 + 5))
calls = send_mock.mock_calls
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
Expand Down
Loading