Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/zeroconf/_listener.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ cdef object DEBUG_ENABLED
cdef bint TYPE_CHECKING

cdef cython.uint _MAX_MSG_ABSOLUTE
cdef cython.uint _DUPLICATE_PACKET_SUPPRESSION_INTERVAL
cdef cython.uint _RECENT_PACKETS_MAX

Comment on lines 14 to 17

cdef class AsyncListener:
Expand All @@ -22,18 +22,16 @@ cdef class AsyncListener:
cdef ServiceRegistry _registry
cdef RecordManager _record_manager
cdef QueryHandler _query_handler
cdef public cython.bytes data
cdef public double last_time
cdef public DNSIncoming last_message
cdef public object transport
cdef public object sock_description
cdef public cython.dict _deferred
cdef public cython.dict _timers
cdef public cython.dict _recent_packets

@cython.locals(now=double, debug=cython.bint)
cpdef datagram_received(self, cython.bytes bytes, cython.tuple addrs)

@cython.locals(msg=DNSIncoming)
@cython.locals(msg=DNSIncoming, recent_packets=cython.dict, recent=cython.tuple, was_present=cython.bint)
cpdef _process_datagram_at_time(self, bint debug, cython.uint data_len, double now, bytes data, cython.tuple addrs)

cdef _cancel_any_timers_for_addr(self, object addr)
Expand Down
43 changes: 29 additions & 14 deletions src/zeroconf/_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@
from ._protocol.incoming import DNSIncoming
from ._transport import _WrappedTransport, make_wrapped_transport
from ._utils.time import current_time_millis, millis_to_seconds
from .const import _DUPLICATE_PACKET_SUPPRESSION_INTERVAL, _MAX_MSG_ABSOLUTE
from .const import _DUPLICATE_PACKET_SUPPRESSION_INTERVAL, _MAX_MSG_ABSOLUTE, _RECENT_PACKETS_MAX

if TYPE_CHECKING:
from ._core import Zeroconf

_TC_DELAY_RANDOM_INTERVAL = (400, 500)

# Bounded recency window (`_RECENT_PACKETS_MAX`, imported from `.const`)
# so an alternating (A, B, A, B, ...) flood can't slip past single-slot
# dedup. State lives on the listener (one per interface) so a duplicate
# seen on one interface does not suppress a legitimate QU / unicast
# reply that arrives on a different interface.


_bytes = bytes
_str = str
Expand All @@ -59,12 +65,10 @@ class AsyncListener:
__slots__ = (
"_deferred",
"_query_handler",
"_recent_packets",
"_record_manager",
"_registry",
"_timers",
"data",
"last_message",
"last_time",
"sock_description",
"transport",
"zc",
Expand All @@ -75,13 +79,13 @@ def __init__(self, zc: Zeroconf) -> None:
self._registry = zc.registry
self._record_manager = zc.record_manager
self._query_handler = zc.query_handler
self.data: bytes | None = None
self.last_time: float = 0
self.last_message: DNSIncoming | None = None
self.transport: _WrappedTransport | None = None
self.sock_description: str | None = None
self._deferred: dict[str, list[DNSIncoming]] = {}
self._timers: dict[str, asyncio.TimerHandle] = {}
# data -> (arrival_time_ms, has_qu_question). Relies on dict
# insertion order so the oldest entry is evicted first.
self._recent_packets: dict[bytes, tuple[float, bool]] = {}
super().__init__()

def datagram_received(self, data: _bytes, addrs: tuple[str, int] | tuple[str, int, int, int]) -> None:
Expand Down Expand Up @@ -110,12 +114,20 @@ def _process_datagram_at_time(
data: _bytes,
addrs: tuple[str, int] | tuple[str, int, int, int],
) -> None:
recent_packets = self._recent_packets
recent = recent_packets.get(data)
if (
self.data == data
and (now - _DUPLICATE_PACKET_SUPPRESSION_INTERVAL) < self.last_time
and self.last_message is not None
and not self.last_message.has_qu_question()
recent is not None
and (now - _DUPLICATE_PACKET_SUPPRESSION_INTERVAL) < recent[0]
and not recent[1]
):
# Refresh LRU position so an actively-replayed payload is not
# evicted by _RECENT_PACKETS_MAX distinct neighbours arriving
# within the suppression interval. Preserve the original
# (now, qu) tuple so the suppression window stays bounded
# from the first observation rather than extending forever.
del recent_packets[data]
recent_packets[data] = recent
# Guard against duplicate packets
if debug:
log.debug(
Expand Down Expand Up @@ -145,9 +157,12 @@ def _process_datagram_at_time(
addr_port = (addr, port)

msg = DNSIncoming(data, addr_port, scope, now)
self.data = data
self.last_time = now
self.last_message = msg
# Move-to-end so a repeat payload refreshes its position and only
# cold entries are evicted when the window is full.
was_present = recent_packets.pop(data, None) is not None
if not was_present and len(recent_packets) >= _RECENT_PACKETS_MAX:
del recent_packets[next(iter(recent_packets))]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is hot, you need to cythonize with -A and look at the html to see how bad

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do — running REQUIRE_CYTHON=1 poetry install --only=main,dev then cython -a src/zeroconf/_listener.py and inspecting the generated .html for yellow lines on the hot path (the dedup check at the top of _process_datagram_at_time and the move-to-end block at the bottom). Will also look at whether recent = recent_packets.get(data) with cython.locals(recent=cython.tuple) produces a clean typed access for recent[0] / recent[1], or whether we need an explicit cdef tuple recent declaration in the .pxd to skip the generic-object indexing path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding to this — three specific spots to verify on the generated .html are worth more than the rest:

  1. The dedup check at L113-118. recent_packets.get(data) should compile to a direct PyDict_GetItem (the cdef'd _recent_packets should make the attribute access C-typed). The recent[0] / recent[1] indexing depends on whether recent=cython.tuple in @cython.locals is enough to get PyTuple_GET_ITEM — if not, you'll see yellow indexing through the generic object protocol and may need an explicit cdef tuple recent in the .pxd locals.

  2. The freshness compare. (now - _DUPLICATE_PACKET_SUPPRESSION_INTERVAL) < recent[0] — with the cdef on _DUPLICATE_PACKET_SUPPRESSION_INTERVAL removed from the .pxd, this now goes through PyLong_AsLong on every packet. Either re-cdef via the public + cdef alias pattern (so tests can still patch it) or accept the regression and capture the CodSpeed delta in the PR body so reviewers know the trade-off was deliberate.

  3. The eviction line. del recent_packets[next(iter(recent_packets))] allocates an iter each call when the window is full — worth checking if Cython can flatten that, or if it stays yellow. If yellow and CodSpeed regresses, a popitem on an OrderedDict or caching the head externally are options.

If any of those come out deep yellow on the hot path, paste the relevant block into the PR for a second look before landing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CodSpeed delta on the PR report (+57.04% improvement) is a strong signal that the dict-based approach beats the single-slot version net-net even with the move-to-end mutations on hit. But the cython -a audit is still worth getting on record before merge — three specific lines worth a close look in the generated .html:

  1. recent = recent_packets.get(data) — should be a direct PyDict_GetItem given self._recent_packets is cdef'd as cython.dict. Confirm no yellow.
  2. recent[0] / recent[1] in the dedup check — these depend on whether the @cython.locals(recent=cython.tuple) in the .pxd is sufficient to get PyTuple_GET_ITEM. If they show yellow, an explicit cdef tuple recent line in the .pxd will fix it.
  3. (now - _DUPLICATE_PACKET_SUPPRESSION_INTERVAL) < recent[0] — with the cdef'd unsigned int on the constant and double on now, this should compile to a clean C compare; verify no PyNumber_Subtract slow path.

For the move-to-end on hit (del recent_packets[data]; recent_packets[data] = recent), this is the per-duplicate hot line; verify both mutations stay on the PyDict_* C path. Worth pasting a screenshot of the relevant lines into the PR thread for future reference, since this is now load-bearing performance code.

recent_packets[data] = (now, msg.has_qu_question())
if msg.valid is True:
if debug:
log.debug(
Expand Down
1 change: 1 addition & 0 deletions src/zeroconf/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
_LISTENER_TIME = 200 # ms
_BROWSER_TIME = 10000 # ms
_DUPLICATE_PACKET_SUPPRESSION_INTERVAL = 1000 # ms
_RECENT_PACKETS_MAX = 16 # Bounded recency window for per-listener dedup
_DUPLICATE_QUESTION_INTERVAL = 999 # ms # Must be 1ms less than _DUPLICATE_PACKET_SUPPRESSION_INTERVAL
_CACHE_CLEANUP_INTERVAL = 10 # s
_LOADED_SYSTEM_TIMEOUT = 10 # s
Expand Down
17 changes: 9 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import pytest

from zeroconf import _core, const
from zeroconf import _core, _listener, const
from zeroconf._handlers import query_handler
from zeroconf._services import browser as service_browser
from zeroconf._services import info as service_info
Expand All @@ -35,13 +35,14 @@ def run_isolated():


@pytest.fixture
def disable_duplicate_packet_suppression():
"""Disable duplicate packet suppress.

Some tests run too slowly because of the duplicate
packet suppression.
"""
with patch.object(const, "_DUPLICATE_PACKET_SUPPRESSION_INTERVAL", 0):
def disable_duplicate_packet_suppression() -> Generator[None]:
"""Disable duplicate packet suppression."""
# _listener rebinds the interval at module scope, so const-only
# patching does not reach the hot path.
with (
patch.object(const, "_DUPLICATE_PACKET_SUPPRESSION_INTERVAL", 0),
patch.object(_listener, "_DUPLICATE_PACKET_SUPPRESSION_INTERVAL", 0),
):
yield


Expand Down
4 changes: 3 additions & 1 deletion tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,9 @@ async def test_async_unregister_all_services(quick_timing: None) -> None:


@pytest.mark.asyncio
async def test_async_zeroconf_service_types(quick_timing: None) -> None:
async def test_async_zeroconf_service_types(
quick_timing: None, disable_duplicate_packet_suppression: None
) -> None:
type_ = "_test-srvc-type._tcp.local."
name = "xxxyyy"
registration_name = f"{name}.{type_}"
Expand Down
10 changes: 5 additions & 5 deletions tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def test_name_conflicts(self):
zc.register_service(conflicting_info)
zc.close()

@pytest.mark.usefixtures("quick_timing")
@pytest.mark.usefixtures("quick_timing", "disable_duplicate_packet_suppression")
def test_register_and_lookup_type_by_uppercase_name(self):
# instantiate a zeroconf instance
zc = Zeroconf(interfaces=["127.0.0.1"])
Expand Down Expand Up @@ -1758,7 +1758,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
with patch.object(aiozc.zeroconf, "async_send") as send_mock:
send_mock.reset_mock()
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
protocol.last_time = 0 # manually reset the last time to avoid duplicate packet suppression
protocol._recent_packets.clear() # manually reset to avoid duplicate packet suppression
await asyncio.sleep(0.2)
calls = send_mock.mock_calls
assert len(calls) == 1
Expand All @@ -1769,7 +1769,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli

send_mock.reset_mock()
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
protocol.last_time = 0 # manually reset the last time to avoid duplicate packet suppression
protocol._recent_packets.clear() # manually reset to avoid duplicate packet suppression
await asyncio.sleep(1.2)
calls = send_mock.mock_calls
assert len(calls) == 1
Expand All @@ -1780,9 +1780,9 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli

send_mock.reset_mock()
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
protocol.last_time = 0 # manually reset the last time to avoid duplicate packet suppression
protocol._recent_packets.clear() # manually reset to avoid duplicate packet suppression
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
protocol.last_time = 0 # manually reset the last time to avoid duplicate packet suppression
protocol._recent_packets.clear() # manually reset to avoid duplicate packet suppression
# The minimum protected send_after is 1000ms + 20ms random; sleep
# well under that so coarse timers on slow runners cannot push the
# send into this window and flake the assertion.
Expand Down
126 changes: 112 additions & 14 deletions tests/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,16 @@ def handle_query_or_defer(
_handle_query_or_defer.assert_called_once()
_handle_query_or_defer.reset_mock()

# Now call with the different packet and handle_query_or_defer should fire
# Replay the first packet — the recency window remembers more than
# just the most recent payload, so this is a duplicate.
listener._process_datagram_at_time(
False,
len(packet_with_qm_question),
new_time,
packet_with_qm_question,
addrs,
)
_handle_query_or_defer.assert_called_once()
_handle_query_or_defer.assert_not_called()
_handle_query_or_defer.reset_mock()

# Now call with the different packet with qu question and handle_query_or_defer should fire
Expand All @@ -257,18 +258,8 @@ def handle_query_or_defer(

log.setLevel(logging.WARNING)

# Call with the QM packet again
listener._process_datagram_at_time(
False,
len(packet_with_qm_question),
new_time,
packet_with_qm_question,
addrs,
)
_handle_query_or_defer.assert_called_once()
_handle_query_or_defer.reset_mock()

# Now call with the same packet again and handle_query_or_defer should not fire
# Replay the QM packet with debug disabled — suppression must hold
# off the debug-log path too.
listener._process_datagram_at_time(
False,
len(packet_with_qm_question),
Expand All @@ -285,3 +276,110 @@ def handle_query_or_defer(
_handle_query_or_defer.reset_mock()

zc.close()


def test_guard_against_alternating_duplicate_packets() -> None:
"""Alternating two distinct payloads must not bypass duplicate suppression."""
zc = Zeroconf(interfaces=["127.0.0.1"])
zc.registry.async_add(
ServiceInfo(
"_http._tcp.local.",
"Test._http._tcp.local.",
server="Test._http._tcp.local.",
port=4,
)
)
zc.question_history = QuestionHistoryWithoutSuppression()

class SubListener(_listener.AsyncListener):
def handle_query_or_defer(
self,
msg: DNSIncoming,
addr: str,
port: int,
transport: _engine._WrappedTransport,
v6_flow_scope: tuple[()] | tuple[int, int] = (),
) -> None:
super().handle_query_or_defer(msg, addr, port, transport, v6_flow_scope)

listener = SubListener(zc)
listener.transport = MagicMock()

query_a = r.DNSOutgoing(const._FLAGS_QR_QUERY, multicast=True)
query_a.add_question(r.DNSQuestion("a._http._tcp.local.", const._TYPE_PTR, const._CLASS_IN))
packet_a = query_a.packets()[0]

query_b = r.DNSOutgoing(const._FLAGS_QR_QUERY, multicast=True)
query_b.add_question(r.DNSQuestion("b._http._tcp.local.", const._TYPE_PTR, const._CLASS_IN))
packet_b = query_b.packets()[0]

assert packet_a != packet_b

addrs = ("1.2.3.4", 43)

with patch.object(listener, "handle_query_or_defer") as _handle_query_or_defer:
now = current_time_millis()

# Prime both payloads.
listener._process_datagram_at_time(False, len(packet_a), now, packet_a, addrs)
listener._process_datagram_at_time(False, len(packet_b), now, packet_b, addrs)
assert _handle_query_or_defer.call_count == 2
_handle_query_or_defer.reset_mock()

for _ in range(4):
listener._process_datagram_at_time(False, len(packet_a), now, packet_a, addrs)
listener._process_datagram_at_time(False, len(packet_b), now, packet_b, addrs)
_handle_query_or_defer.assert_not_called()

zc.close()


def test_recent_packets_window_is_bounded() -> None:
"""Distinct payloads beyond the recency window evict oldest entries."""
zc = Zeroconf(interfaces=["127.0.0.1"])
zc.registry.async_add(
ServiceInfo(
"_http._tcp.local.",
"Test._http._tcp.local.",
server="Test._http._tcp.local.",
port=4,
)
)
zc.question_history = QuestionHistoryWithoutSuppression()

class SubListener(_listener.AsyncListener):
def handle_query_or_defer(
self,
msg: DNSIncoming,
addr: str,
port: int,
transport: _engine._WrappedTransport,
v6_flow_scope: tuple[()] | tuple[int, int] = (),
) -> None:
super().handle_query_or_defer(msg, addr, port, transport, v6_flow_scope)

listener = SubListener(zc)
listener.transport = MagicMock()

addrs = ("1.2.3.4", 43)
now = current_time_millis()

packets = []
for i in range(const._RECENT_PACKETS_MAX + 4):
query = r.DNSOutgoing(const._FLAGS_QR_QUERY, multicast=True)
query.add_question(r.DNSQuestion(f"n{i}._http._tcp.local.", const._TYPE_PTR, const._CLASS_IN))
packets.append(query.packets()[0])

with patch.object(listener, "handle_query_or_defer") as _handle_query_or_defer:
for packet in packets:
listener._process_datagram_at_time(False, len(packet), now, packet, addrs)
assert _handle_query_or_defer.call_count == len(packets)
_handle_query_or_defer.reset_mock()

# The oldest packets should have been evicted and now replay.
evicted = packets[: len(packets) - const._RECENT_PACKETS_MAX]
for packet in evicted:
listener._process_datagram_at_time(False, len(packet), now, packet, addrs)
assert _handle_query_or_defer.call_count == len(evicted)

zc.close()
Loading