Skip to content

Commit 541fd8b

Browse files
committed
fix: bound TC-deferral assembly window to first-arrival + max delay
A peer streaming truncated queries faster than the 400–500 ms random delay used to rearm `_timers[addr]` could keep extending the flush deadline indefinitely, so the per-addr deferred list never drained (RFC 6762 §18.5 expects the window to be a fixed reassembly budget starting at first arrival, not a sliding heartbeat). Track first-arrival in `_deferred_deadlines` and cap each scheduled fire time at `first_arrival + _TC_DELAY_RANDOM_INTERVAL[1]`. If the next packet would push the timer past that deadline, leave the existing TimerHandle in place so the queue flushes on schedule.
1 parent 31194a3 commit 541fd8b

3 files changed

Lines changed: 65 additions & 3 deletions

File tree

src/zeroconf/_listener.pxd

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ cdef class AsyncListener:
2929
cdef public object sock_description
3030
cdef public cython.dict _deferred
3131
cdef public cython.dict _timers
32+
cdef public cython.dict _deferred_deadlines
3233

3334
@cython.locals(now=double, debug=cython.bint)
3435
cpdef datagram_received(self, cython.bytes bytes, cython.tuple addrs)
@@ -38,7 +39,7 @@ cdef class AsyncListener:
3839

3940
cdef _cancel_any_timers_for_addr(self, object addr)
4041

41-
@cython.locals(incoming=DNSIncoming, deferred=list)
42+
@cython.locals(incoming=DNSIncoming, deferred=list, now=double, delay=double, deadline=object, fire_at=double)
4243
cpdef handle_query_or_defer(
4344
self,
4445
DNSIncoming msg,

src/zeroconf/_listener.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class AsyncListener:
5858

5959
__slots__ = (
6060
"_deferred",
61+
"_deferred_deadlines",
6162
"_query_handler",
6263
"_record_manager",
6364
"_registry",
@@ -82,6 +83,7 @@ def __init__(self, zc: Zeroconf) -> None:
8283
self.sock_description: str | None = None
8384
self._deferred: dict[str, list[DNSIncoming]] = {}
8485
self._timers: dict[str, asyncio.TimerHandle] = {}
86+
self._deferred_deadlines: dict[str, float] = {}
8587
super().__init__()
8688

8789
def datagram_received(self, data: _bytes, addrs: tuple[str, int] | tuple[str, int, int, int]) -> None:
@@ -203,12 +205,25 @@ def handle_query_or_defer(
203205
if incoming.data == msg.data:
204206
return
205207
deferred.append(msg)
206-
delay = millis_to_seconds(random.randint(*_TC_DELAY_RANDOM_INTERVAL)) # noqa: S311
207208
loop = self.zc.loop
208209
assert loop is not None
210+
now = loop.time()
211+
delay = millis_to_seconds(random.randint(*_TC_DELAY_RANDOM_INTERVAL)) # noqa: S311
212+
# Bound the assembly window to first_arrival + max delay so a peer
213+
# streaming TC packets cannot keep deferring the flush indefinitely.
214+
deadline = self._deferred_deadlines.get(addr)
215+
if deadline is None:
216+
deadline = now + millis_to_seconds(_TC_DELAY_RANDOM_INTERVAL[1])
217+
self._deferred_deadlines[addr] = deadline
218+
fire_at = now + delay
219+
if fire_at >= deadline:
220+
# Existing timer (if any) already fires at or before the deadline.
221+
if addr in self._timers:
222+
return
223+
fire_at = deadline
209224
self._cancel_any_timers_for_addr(addr)
210225
self._timers[addr] = loop.call_at(
211-
loop.time() + delay,
226+
fire_at,
212227
self._respond_query,
213228
None,
214229
addr,
@@ -232,6 +247,7 @@ def _respond_query(
232247
) -> None:
233248
"""Respond to a query and reassemble any truncated deferred packets."""
234249
self._cancel_any_timers_for_addr(addr)
250+
self._deferred_deadlines.pop(addr, None)
235251
packets = self._deferred.pop(addr, [])
236252
if msg:
237253
packets.append(msg)

tests/test_core.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,51 @@ def test_tc_bit_defers_last_response_missing():
743743
zc.close()
744744

745745

746+
def test_tc_bit_defer_window_is_bounded():
747+
"""TC-deferral assembly window must not slide past first_arrival + max delay."""
748+
zc = Zeroconf(interfaces=["127.0.0.1"])
749+
_wait_for_start(zc)
750+
type_ = "_boundeddefer._tcp.local."
751+
registration_name = f"knownname.{type_}"
752+
753+
info = r.ServiceInfo(
754+
type_,
755+
registration_name,
756+
80,
757+
0,
758+
0,
759+
{"path": "/~paulsm/"},
760+
"ash-2.local.",
761+
addresses=[socket.inet_aton("10.0.1.2")],
762+
)
763+
zc.registry.async_add(info)
764+
765+
protocol = zc.engine.protocols[0]
766+
now_ms = r.current_time_millis()
767+
_clear_cache(zc)
768+
source_ip = "203.0.113.99"
769+
770+
generated = r.DNSOutgoing(const._FLAGS_QR_QUERY)
771+
generated.add_question(r.DNSQuestion(type_, const._TYPE_PTR, const._CLASS_IN))
772+
for _ in range(300):
773+
generated.add_answer_at_time(info.dns_pointer(), now_ms)
774+
packets = generated.packets()
775+
assert len(packets) >= 3
776+
777+
# Pin the per-packet delay at its maximum so any subsequent reset would
778+
# land past the deadline established by the first packet.
779+
with patch("zeroconf._listener.random.randint", return_value=500):
780+
threadsafe_query(zc, protocol, r.DNSIncoming(packets[0]), source_ip, const._MDNS_PORT, Mock(), ())
781+
first_when = protocol._timers[source_ip].when()
782+
783+
for raw in packets[1:-1]:
784+
threadsafe_query(zc, protocol, r.DNSIncoming(raw), source_ip, const._MDNS_PORT, Mock(), ())
785+
assert protocol._timers[source_ip].when() <= first_when
786+
787+
zc.registry.async_remove(info)
788+
zc.close()
789+
790+
746791
@pytest.mark.asyncio
747792
async def test_open_close_twice_from_async() -> None:
748793
"""Test we can close twice from a coroutine when using Zeroconf.

0 commit comments

Comments
 (0)