Commit 7c6be63

fix: bound _expire_heap growth from re-adds and DRY test helpers
Address remaining review feedback on #1718.

**Re-add heap growth (Copilot inline at L100, a real bug separate from the orphan).** ``_async_add``'s cap check is gated on ``is_new``, but a peer that just replays cached records with a different ``created``/TTL hits the ``is_new == False`` path: no cap check and no ``_total_records`` change. The changed ``when`` still makes ``_expirations.get(record) != when`` true, so a fresh ``(when, record)`` tuple is heappushed while the prior tuple stays behind as stale. ``_expire_heap`` therefore grows without bound between cleanups even though ``cache`` and ``_total_records`` stay flat. At ~10k pps for 10 s that's ~1 M stale heap entries × ~100 B ≈ 100 MB transient.

Factor the rebuild check (``len(heap) > _MIN_SCHEDULED_RECORD_EXPIRATION and len(heap) > 2 * len(expirations)`` → filter-and-heapify) into a new ``_maybe_rebuild_heap`` cdef helper. Call it from two sites:

- ``_async_add``, immediately after the conditional heappush, so a re-add flood is bounded at every step (amortized O(1) per push; the O(N) rebuild fires at most once per N stale pushes).
- ``async_expire``, after the pop loop, replacing the existing inline rebuild: same behavior, slightly more aggressive (uses post-pop heap length instead of pre-pop), no test regression.

The call was removed from ``_async_evict_oldest``: the eager rebuild in ``_async_add`` keeps the heap bounded before eviction is ever triggered, so the eviction-side call was dead code.

Cython codegen: the rebuild call from ``_async_add`` only fires when the conditional heappush already fired, so the hot path on duplicate re-adds (same ``when``) is unchanged. The check inside ``_maybe_rebuild_heap`` is two ``len()`` calls plus an integer compare, which is score-0 in ``cython -a``.
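The growth pattern and the bound can be reproduced with a toy model. The following is a minimal sketch, not the zeroconf implementation: ``TinyCache``, ``flood``, and the ``MIN_SCHEDULED`` value of 100 are assumptions made for illustration, and records are plain strings rather than ``DNSRecord`` objects.

```python
import heapq

# Assumed floor; stands in for the module's _MIN_SCHEDULED_RECORD_EXPIRATION.
MIN_SCHEDULED = 100


class TinyCache:
    """Toy model of the expiration bookkeeping (hypothetical, not DNSCache)."""

    def __init__(self, rebuild_on_add: bool) -> None:
        self.expirations: dict[str, float] = {}
        self.heap: list[tuple[float, str]] = []
        self.rebuild_on_add = rebuild_on_add
        self.rebuilds = 0

    def add(self, record: str, when: float) -> None:
        # A changed ``when`` pushes a fresh tuple; the old one stays as stale.
        if self.expirations.get(record) != when:
            heapq.heappush(self.heap, (when, record))
            self.expirations[record] = when
            if self.rebuild_on_add:
                self.maybe_rebuild_heap()

    def maybe_rebuild_heap(self) -> None:
        size = len(self.heap)
        if size > MIN_SCHEDULED and size > 2 * len(self.expirations):
            # Keep only tuples that still match the live expiration time.
            self.heap = [e for e in self.heap if self.expirations.get(e[1]) == e[0]]
            heapq.heapify(self.heap)
            self.rebuilds += 1


def flood(cache: TinyCache, records: int = 200, cycles: int = 10) -> int:
    """Add ``records`` entries, then replay them ``cycles`` times with new TTLs."""
    for i in range(records):
        cache.add(f"r{i}", 1000.0)
    for cycle in range(1, cycles + 1):
        for i in range(records):
            cache.add(f"r{i}", 1000.0 + cycle)
    return len(cache.heap)


unbounded = flood(TinyCache(rebuild_on_add=False))
bounded_cache = TinyCache(rebuild_on_add=True)
bounded = flood(bounded_cache)
print(unbounded, bounded, bounded_cache.rebuilds)
```

Without the rebuild, the heap holds every tuple ever pushed. With it, the heap never exceeds roughly twice the number of live records, and each O(N) rebuild is paid for by the N stale pushes that preceded it.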
**Test DRY-up.** Introduce a module-level ``_addr(name, idx, ...)`` helper that builds the boilerplate ``DNSAddress(name, _TYPE_A, _CLASS_IN, ttl, bytes((idx & 0xFF, (idx >> 8) & 0xFF, 0, 1)), created=...)`` so each new test reads top-down instead of being dominated by constructor noise. Test docstrings are collapsed to one sentence per ``CLAUDE.md``.

**New regression test:** ``test_cache_re_add_flood_does_not_grow_heap_unbounded`` adds 200 records below the cap, then replays them through 10 cycles of varying TTLs (2000 stale pushes total) and asserts the heap stays near the rebuild threshold. The previous code would have grown it to ~11x the record count; the helper keeps it within ``2 * expirations``.

**Removed test:** ``test_cache_eviction_rebuilds_heap_when_mostly_stale``. Its premise (build up stale entries, then trigger the eviction-side rebuild) is no longer reachable now that ``_async_add`` rebuilds proactively. The same code path is covered by the re-add flood test.

**Existing tests updated:** ``test_cache_heap_multi_name_cleanup`` and ``test_cache_heap_pops_order`` previously asserted the pre-cleanup heap size (``min_records_to_cleanup + 5``). With the proactive rebuild firing inside ``_async_add``, the heap is cleaned up earlier. The post-cleanup invariant (``1 + 5`` after ``async_expire``) still holds, but the intermediate snapshot is no longer accurate, so the stale intermediate assertion was replaced with a comment.
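The address bytes used by the helper can be illustrated in isolation. This is a small sketch that reproduces only the ``bytes(...)`` expression quoted above; ``addr_bytes`` is a hypothetical name, and the real ``_addr`` helper wraps the result in a ``DNSAddress``, which is not reproduced here.

```python
import ipaddress


def addr_bytes(idx: int) -> bytes:
    """Pack a test index into a 4-byte IPv4 address (low byte first)."""
    return bytes((idx & 0xFF, (idx >> 8) & 0xFF, 0, 1))


# Render a few indices as dotted quads to show how the index is spread
# across the first two octets.
samples = {idx: str(ipaddress.IPv4Address(addr_bytes(idx))) for idx in (0, 1, 255, 256, 258)}

# The 200 records used by the flood test all get distinct addresses.
unique = len({addr_bytes(i) for i in range(200)})
print(samples, unique)
```

Because the index fills two octets, any index below 65536 maps to a distinct address, which keeps the test records from colliding in the cache.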
1 parent 931358a commit 7c6be63

3 files changed

Lines changed: 99 additions & 255 deletions

src/zeroconf/_cache.pxd

Lines changed: 4 additions & 1 deletion
@@ -67,9 +67,12 @@ cdef class DNSCache:
     )
     cdef bint _async_add(self, DNSRecord record)

-    @cython.locals(record=DNSRecord, when_record=tuple, expire_heap_len="unsigned int")
+    @cython.locals(record=DNSRecord, when_record=tuple)
     cdef void _async_evict_oldest(self)

+    @cython.locals(expire_heap_len="unsigned int")
+    cdef void _maybe_rebuild_heap(self)
+
     @cython.locals(service_record=DNSService)
     cdef void _async_remove(self, DNSRecord record)

src/zeroconf/_cache.py

Lines changed: 31 additions & 41 deletions
@@ -110,9 +110,14 @@ def _async_add(self, record: _DNSRecord) -> bool:
         store[record] = record
         when = record.created + (record.ttl * 1000)
         if self._expirations.get(record) != when:
-            # Avoid adding duplicates to the heap
             heappush(self._expire_heap, (when, record))
             self._expirations[record] = when
+            # Re-adds of an existing record with a new TTL push a fresh
+            # entry but leave the prior tuple behind as stale, so a peer
+            # that just replays cached records can grow ``_expire_heap``
+            # without ever tripping the cap. Rebuild when stale entries
+            # dominate.
+            self._maybe_rebuild_heap()

         if isinstance(record, DNSService):
             service_record = record
@@ -122,16 +127,28 @@ def _async_add(self, record: _DNSRecord) -> bool:
         return new

     def _async_evict_oldest(self) -> None:
-        """Drop the closest-to-expiration record to make room for a new one.
-
-        Skips stale heap entries (records re-added with a different TTL),
-        which mirrors the staleness check in ``async_expire``. If the
-        heap is mostly stale (long stale prefix from sustained TTL
-        re-adds), rebuild it once up front so the pop loop below
-        doesn't do O(stale_prefix * log n) work on a single add. Same
-        heuristic / threshold as ``async_expire``.
+        """Drop the closest-to-expiration record to make room for a new one."""
+        while self._expire_heap:
+            when_record = heappop(self._expire_heap)
+            record = when_record[1]
+            if self._expirations.get(record) != when_record[0]:
+                continue
+            self._async_remove(record)
+            return

-        This function must be run in from event loop.
+    def _maybe_rebuild_heap(self) -> None:
+        """Rebuild ``_expire_heap`` if stale entries dominate.
+
+        Re-adds of an existing record with a new TTL append a fresh
+        ``(when, record)`` and leave the prior tuple behind as stale;
+        eviction's stale-skip loop and ``async_expire`` already absorb
+        these, but unchecked accumulation lets a peer that just replays
+        cached records grow the heap arbitrarily between cleanups.
+        Same threshold as the long-standing rebuild in ``async_expire``:
+        only fire when stale entries outweigh live ones (heap > 2 x
+        expirations), and only above a minimum floor so a small cache
+        isn't rebuilt for nothing. Amortized cost is O(1) per push;
+        the O(N) rebuild fires at most once per N stale pushes.
         """
         expire_heap_len = len(self._expire_heap)
         if (
@@ -142,13 +159,6 @@ def _async_evict_oldest(self) -> None:
                 entry for entry in self._expire_heap if self._expirations.get(entry[1]) == entry[0]
             ]
             heapify(self._expire_heap)
-        while self._expire_heap:
-            when_record = heappop(self._expire_heap)
-            record = when_record[1]
-            if self._expirations.get(record) != when_record[0]:
-                continue
-            self._async_remove(record)
-            return

     def async_add_records(self, entries: Iterable[DNSRecord]) -> bool:
         """Add multiple records.
@@ -190,43 +200,23 @@ def async_expire(self, now: _float) -> list[DNSRecord]:

         :param now: The current time in milliseconds.
         """
-        if not (expire_heap_len := len(self._expire_heap)):
+        if not self._expire_heap:
             return []

         expired: list[DNSRecord] = []
-        # Find any expired records and add them to the to-delete list
         while self._expire_heap:
             when_record = self._expire_heap[0]
             when = when_record[0]
             if when > now:
                 break
             heappop(self._expire_heap)
-            # Check if the record hasn't been re-added to the heap
-            # with a different expiration time as it will be removed
-            # later when it reaches the top of the heap and its
-            # expiration time is met.
+            # Skip entries left behind by a TTL re-add; the live tuple is
+            # later in the heap and will be removed when it reaches the top.
             record = when_record[1]
             if self._expirations.get(record) == when:
                 expired.append(record)

-        # If the expiration heap grows larger than the number expirations
-        # times two, we clean it up to avoid keeping expired entries in
-        # the heap and consuming memory. We guard this with a minimum
-        # threshold to avoid cleaning up the heap too often when there are
-        # only a few scheduled expirations.
-        if (
-            expire_heap_len > _MIN_SCHEDULED_RECORD_EXPIRATION
-            and expire_heap_len > len(self._expirations) * 2
-        ):
-            # Remove any expired entries from the expiration heap
-            # that do not match the expiration time in the expirations
-            # as it means the record has been re-added to the heap
-            # with a different expiration time.
-            self._expire_heap = [
-                entry for entry in self._expire_heap if self._expirations.get(entry[1]) == entry[0]
-            ]
-            heapify(self._expire_heap)
-
+        self._maybe_rebuild_heap()
         self.async_remove_records(expired)
         return expired

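The stale-skip behaviour that ``async_expire`` relies on is a lazy-deletion pattern over a heap. Below is a minimal standalone sketch of that pattern, assuming string records and a free function named ``expire`` (both hypothetical); unlike the real method, it does not remove the expired records from the cache afterwards.

```python
import heapq


def expire(
    heap: list[tuple[float, str]],
    expirations: dict[str, float],
    now: float,
) -> list[str]:
    """Pop every due entry and report only those that are still live."""
    expired: list[str] = []
    while heap and heap[0][0] <= now:
        when, record = heapq.heappop(heap)
        # A mismatch means the record was re-added with a different TTL,
        # so this tuple is stale and its live counterpart is still queued.
        if expirations.get(record) == when:
            expired.append(record)
    return expired


expirations = {"a": 30.0, "b": 20.0}
heap = [(10.0, "a"), (20.0, "b"), (30.0, "a")]  # (10.0, "a") is stale
heapq.heapify(heap)

first = expire(heap, expirations, now=25.0)
print(first, heap)
```

The stale tuple for ``"a"`` is discarded without expiring the record, while its live tuple stays in the heap until its own deadline passes.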