Skip to content

Commit 9569654

Browse files
committed
fix: bound duplicate-packet dedup against alternating-payload floods
Replace the single-slot remembered "last packet" with a bounded recency window (16 most-recent payloads) keyed on the payload bytes, per listener (per-interface). An attacker alternating between two byte-distinct payloads previously slipped past dedup on every packet because each one differed from the immediately-preceding payload, forcing every datagram through DNSIncoming parse and the deferred queue. Per-listener state means a duplicate seen on one interface does not suppress a legitimate QU / unicast reply that arrives on another. Fix the existing disable_duplicate_packet_suppression fixture to also patch _listener — the previous patch only touched const, so the interval rebound at _listener module scope was never overridden; the fixture worked by accident when the alternating-bypass kicked in.
1 parent 0e201f7 commit 9569654

6 files changed

Lines changed: 151 additions & 44 deletions

File tree

src/zeroconf/_listener.pxd

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ cdef bint TYPE_CHECKING
1414

1515
cdef cython.uint _MAX_MSG_ABSOLUTE
1616
cdef cython.uint _DUPLICATE_PACKET_SUPPRESSION_INTERVAL
17+
cdef cython.uint _RECENT_PACKETS_MAX
1718

1819

1920
cdef class AsyncListener:
@@ -22,18 +23,16 @@ cdef class AsyncListener:
2223
cdef ServiceRegistry _registry
2324
cdef RecordManager _record_manager
2425
cdef QueryHandler _query_handler
25-
cdef public cython.bytes data
26-
cdef public double last_time
27-
cdef public DNSIncoming last_message
2826
cdef public object transport
2927
cdef public object sock_description
3028
cdef public cython.dict _deferred
3129
cdef public cython.dict _timers
30+
cdef public cython.dict _recent_packets
3231

3332
@cython.locals(now=double, debug=cython.bint)
3433
cpdef datagram_received(self, cython.bytes bytes, cython.tuple addrs)
3534

36-
@cython.locals(msg=DNSIncoming)
35+
@cython.locals(msg=DNSIncoming, recent_packets=cython.dict, recent=cython.tuple)
3736
cpdef _process_datagram_at_time(self, bint debug, cython.uint data_len, double now, bytes data, cython.tuple addrs)
3837

3938
cdef _cancel_any_timers_for_addr(self, object addr)

src/zeroconf/_listener.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@
3939

4040
_TC_DELAY_RANDOM_INTERVAL = (400, 500)
4141

42+
# Bounded recency window so an alternating (A, B, A, B, ...) flood can't
43+
# slip past single-slot dedup. State lives on the listener (one per
44+
# interface) so a duplicate seen on one interface does not suppress a
45+
# legitimate QU / unicast reply that arrives on a different interface.
46+
_RECENT_PACKETS_MAX = 16
47+
4248

4349
_bytes = bytes
4450
_str = str
@@ -59,12 +65,10 @@ class AsyncListener:
5965
__slots__ = (
6066
"_deferred",
6167
"_query_handler",
68+
"_recent_packets",
6269
"_record_manager",
6370
"_registry",
6471
"_timers",
65-
"data",
66-
"last_message",
67-
"last_time",
6872
"sock_description",
6973
"transport",
7074
"zc",
@@ -75,13 +79,13 @@ def __init__(self, zc: Zeroconf) -> None:
7579
self._registry = zc.registry
7680
self._record_manager = zc.record_manager
7781
self._query_handler = zc.query_handler
78-
self.data: bytes | None = None
79-
self.last_time: float = 0
80-
self.last_message: DNSIncoming | None = None
8182
self.transport: _WrappedTransport | None = None
8283
self.sock_description: str | None = None
8384
self._deferred: dict[str, list[DNSIncoming]] = {}
8485
self._timers: dict[str, asyncio.TimerHandle] = {}
86+
# data -> (arrival_time_ms, has_qu_question). Relies on dict
87+
# insertion order so the oldest entry is evicted first.
88+
self._recent_packets: dict[bytes, tuple[float, bool]] = {}
8589
super().__init__()
8690

8791
def datagram_received(self, data: _bytes, addrs: tuple[str, int] | tuple[str, int, int, int]) -> None:
@@ -110,11 +114,12 @@ def _process_datagram_at_time(
110114
data: _bytes,
111115
addrs: tuple[str, int] | tuple[str, int, int, int],
112116
) -> None:
117+
recent_packets = self._recent_packets
118+
recent = recent_packets.get(data)
113119
if (
114-
self.data == data
115-
and (now - _DUPLICATE_PACKET_SUPPRESSION_INTERVAL) < self.last_time
116-
and self.last_message is not None
117-
and not self.last_message.has_qu_question()
120+
recent is not None
121+
and (now - _DUPLICATE_PACKET_SUPPRESSION_INTERVAL) < recent[0]
122+
and not recent[1]
118123
):
119124
# Guard against duplicate packets
120125
if debug:
@@ -145,9 +150,13 @@ def _process_datagram_at_time(
145150
addr_port = (addr, port)
146151

147152
msg = DNSIncoming(data, addr_port, scope, now)
148-
self.data = data
149-
self.last_time = now
150-
self.last_message = msg
153+
# Move-to-end so a repeat payload refreshes its position and only
154+
# cold entries are evicted when the window is full.
155+
if data in recent_packets:
156+
del recent_packets[data]
157+
elif len(recent_packets) >= _RECENT_PACKETS_MAX:
158+
del recent_packets[next(iter(recent_packets))]
159+
recent_packets[data] = (now, msg.has_qu_question())
151160
if msg.valid is True:
152161
if debug:
153162
log.debug(

tests/conftest.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
import pytest
1010

11-
from zeroconf import _core, const
11+
from zeroconf import _core, _listener, const
1212
from zeroconf._handlers import query_handler
1313
from zeroconf._services import browser as service_browser
1414
from zeroconf._services import info as service_info
@@ -36,12 +36,13 @@ def run_isolated():
3636

3737
@pytest.fixture
3838
def disable_duplicate_packet_suppression():
39-
"""Disable duplicate packet suppress.
40-
41-
Some tests run too slowly because of the duplicate
42-
packet suppression.
43-
"""
44-
with patch.object(const, "_DUPLICATE_PACKET_SUPPRESSION_INTERVAL", 0):
39+
"""Disable duplicate packet suppression."""
40+
# _listener rebinds the interval at module scope, so const-only
41+
# patching does not reach the hot path.
42+
with (
43+
patch.object(const, "_DUPLICATE_PACKET_SUPPRESSION_INTERVAL", 0),
44+
patch.object(_listener, "_DUPLICATE_PACKET_SUPPRESSION_INTERVAL", 0),
45+
):
4546
yield
4647

4748

tests/test_asyncio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -901,7 +901,7 @@ async def test_async_unregister_all_services(quick_timing: None) -> None:
901901

902902

903903
@pytest.mark.asyncio
904-
async def test_async_zeroconf_service_types(quick_timing: None) -> None:
904+
async def test_async_zeroconf_service_types(quick_timing: None, disable_duplicate_packet_suppression) -> None:
905905
type_ = "_test-srvc-type._tcp.local."
906906
name = "xxxyyy"
907907
registration_name = f"{name}.{type_}"

tests/test_handlers.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ def test_name_conflicts(self):
190190
zc.register_service(conflicting_info)
191191
zc.close()
192192

193-
@pytest.mark.usefixtures("quick_timing")
193+
@pytest.mark.usefixtures("quick_timing", "disable_duplicate_packet_suppression")
194194
def test_register_and_lookup_type_by_uppercase_name(self):
195195
# instantiate a zeroconf instance
196196
zc = Zeroconf(interfaces=["127.0.0.1"])
@@ -1758,7 +1758,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
17581758
with patch.object(aiozc.zeroconf, "async_send") as send_mock:
17591759
send_mock.reset_mock()
17601760
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
1761-
protocol.last_time = 0 # manually reset the last time to avoid duplicate packet suppression
1761+
protocol._recent_packets.clear() # manually reset to avoid duplicate packet suppression
17621762
await asyncio.sleep(0.2)
17631763
calls = send_mock.mock_calls
17641764
assert len(calls) == 1
@@ -1769,7 +1769,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
17691769

17701770
send_mock.reset_mock()
17711771
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
1772-
protocol.last_time = 0 # manually reset the last time to avoid duplicate packet suppression
1772+
protocol._recent_packets.clear() # manually reset to avoid duplicate packet suppression
17731773
await asyncio.sleep(1.2)
17741774
calls = send_mock.mock_calls
17751775
assert len(calls) == 1
@@ -1780,9 +1780,9 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
17801780

17811781
send_mock.reset_mock()
17821782
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
1783-
protocol.last_time = 0 # manually reset the last time to avoid duplicate packet suppression
1783+
protocol._recent_packets.clear() # manually reset to avoid duplicate packet suppression
17841784
protocol.datagram_received(query2.packets()[0], ("127.0.0.1", const._MDNS_PORT))
1785-
protocol.last_time = 0 # manually reset the last time to avoid duplicate packet suppression
1785+
protocol._recent_packets.clear() # manually reset to avoid duplicate packet suppression
17861786
# The minimum protected send_after is 1000ms + 20ms random; sleep
17871787
# well under that so coarse timers on slow runners cannot push the
17881788
# send into this window and flake the assertion.

tests/test_listener.py

Lines changed: 112 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -222,15 +222,16 @@ def handle_query_or_defer(
222222
_handle_query_or_defer.assert_called_once()
223223
_handle_query_or_defer.reset_mock()
224224

225-
# Now call with the different packet and handle_query_or_defer should fire
225+
# Replay the first packet — the recency window remembers more than
226+
# just the most recent payload, so this is a duplicate.
226227
listener._process_datagram_at_time(
227228
False,
228229
len(packet_with_qm_question),
229230
new_time,
230231
packet_with_qm_question,
231232
addrs,
232233
)
233-
_handle_query_or_defer.assert_called_once()
234+
_handle_query_or_defer.assert_not_called()
234235
_handle_query_or_defer.reset_mock()
235236

236237
# Now call with the different packet with qu question and handle_query_or_defer should fire
@@ -257,18 +258,8 @@ def handle_query_or_defer(
257258

258259
log.setLevel(logging.WARNING)
259260

260-
# Call with the QM packet again
261-
listener._process_datagram_at_time(
262-
False,
263-
len(packet_with_qm_question),
264-
new_time,
265-
packet_with_qm_question,
266-
addrs,
267-
)
268-
_handle_query_or_defer.assert_called_once()
269-
_handle_query_or_defer.reset_mock()
270-
271-
# Now call with the same packet again and handle_query_or_defer should not fire
261+
# Replay the QM packet with debug disabled — suppression must hold
262+
# off the debug-log path too.
272263
listener._process_datagram_at_time(
273264
False,
274265
len(packet_with_qm_question),
@@ -285,3 +276,110 @@ def handle_query_or_defer(
285276
_handle_query_or_defer.reset_mock()
286277

287278
zc.close()
279+
280+
281+
def test_guard_against_alternating_duplicate_packets():
282+
"""Alternating two distinct payloads must not bypass duplicate suppression."""
283+
zc = Zeroconf(interfaces=["127.0.0.1"])
284+
zc.registry.async_add(
285+
ServiceInfo(
286+
"_http._tcp.local.",
287+
"Test._http._tcp.local.",
288+
server="Test._http._tcp.local.",
289+
port=4,
290+
)
291+
)
292+
zc.question_history = QuestionHistoryWithoutSuppression()
293+
294+
class SubListener(_listener.AsyncListener):
295+
def handle_query_or_defer(
296+
self,
297+
msg: DNSIncoming,
298+
addr: str,
299+
port: int,
300+
transport: _engine._WrappedTransport,
301+
v6_flow_scope: tuple[()] | tuple[int, int] = (),
302+
) -> None:
303+
super().handle_query_or_defer(msg, addr, port, transport, v6_flow_scope)
304+
305+
listener = SubListener(zc)
306+
listener.transport = MagicMock()
307+
308+
query_a = r.DNSOutgoing(const._FLAGS_QR_QUERY, multicast=True)
309+
query_a.add_question(r.DNSQuestion("a._http._tcp.local.", const._TYPE_PTR, const._CLASS_IN))
310+
packet_a = query_a.packets()[0]
311+
312+
query_b = r.DNSOutgoing(const._FLAGS_QR_QUERY, multicast=True)
313+
query_b.add_question(r.DNSQuestion("b._http._tcp.local.", const._TYPE_PTR, const._CLASS_IN))
314+
packet_b = query_b.packets()[0]
315+
316+
assert packet_a != packet_b
317+
318+
addrs = ("1.2.3.4", 43)
319+
320+
with patch.object(listener, "handle_query_or_defer") as _handle_query_or_defer:
321+
now = current_time_millis()
322+
323+
# Prime both payloads.
324+
listener._process_datagram_at_time(False, len(packet_a), now, packet_a, addrs)
325+
listener._process_datagram_at_time(False, len(packet_b), now, packet_b, addrs)
326+
assert _handle_query_or_defer.call_count == 2
327+
_handle_query_or_defer.reset_mock()
328+
329+
for _ in range(4):
330+
listener._process_datagram_at_time(False, len(packet_a), now, packet_a, addrs)
331+
listener._process_datagram_at_time(False, len(packet_b), now, packet_b, addrs)
332+
_handle_query_or_defer.assert_not_called()
333+
334+
zc.close()
335+
336+
337+
def test_recent_packets_window_is_bounded():
338+
"""Distinct payloads beyond the recency window evict oldest entries."""
339+
zc = Zeroconf(interfaces=["127.0.0.1"])
340+
zc.registry.async_add(
341+
ServiceInfo(
342+
"_http._tcp.local.",
343+
"Test._http._tcp.local.",
344+
server="Test._http._tcp.local.",
345+
port=4,
346+
)
347+
)
348+
zc.question_history = QuestionHistoryWithoutSuppression()
349+
350+
class SubListener(_listener.AsyncListener):
351+
def handle_query_or_defer(
352+
self,
353+
msg: DNSIncoming,
354+
addr: str,
355+
port: int,
356+
transport: _engine._WrappedTransport,
357+
v6_flow_scope: tuple[()] | tuple[int, int] = (),
358+
) -> None:
359+
super().handle_query_or_defer(msg, addr, port, transport, v6_flow_scope)
360+
361+
listener = SubListener(zc)
362+
listener.transport = MagicMock()
363+
364+
addrs = ("1.2.3.4", 43)
365+
now = current_time_millis()
366+
367+
packets = []
368+
for i in range(_listener._RECENT_PACKETS_MAX + 4):
369+
query = r.DNSOutgoing(const._FLAGS_QR_QUERY, multicast=True)
370+
query.add_question(r.DNSQuestion(f"n{i}._http._tcp.local.", const._TYPE_PTR, const._CLASS_IN))
371+
packets.append(query.packets()[0])
372+
373+
with patch.object(listener, "handle_query_or_defer") as _handle_query_or_defer:
374+
for packet in packets:
375+
listener._process_datagram_at_time(False, len(packet), now, packet, addrs)
376+
assert _handle_query_or_defer.call_count == len(packets)
377+
_handle_query_or_defer.reset_mock()
378+
379+
# The oldest packets should have been evicted and now replay.
380+
evicted = packets[: len(packets) - _listener._RECENT_PACKETS_MAX]
381+
for packet in evicted:
382+
listener._process_datagram_at_time(False, len(packet), now, packet, addrs)
383+
assert _handle_query_or_defer.call_count == len(evicted)
384+
385+
zc.close()

0 commit comments

Comments
 (0)