Skip to content

Commit 931358a

Browse files
committed
fix: re-resolve store after eviction and rebuild stale-prefix heap
Address review feedback on #1718. **Orphaned-store bug (blocking).** ``_async_add`` was capturing ``store = self.cache.get(record.key)`` before calling ``_async_evict_oldest``. If the closest-to-expiration victim was the last record under ``record.key`` (e.g. the incoming record shares its key with the victim — A+AAAA pairs, PTR fan-outs, TXT/SRV pairs all hit this in real workloads), ``_remove_key`` would ``del self.cache[record.key]`` during eviction, leaving the captured ``store`` pointing at an orphaned dict. The subsequent ``store[record] = record`` then wrote into a dict no longer reachable through the cache; the new record was effectively lost, and a later ``async_expire`` of that record would ``KeyError`` from ``_remove_key``. Defer the bucket creation: look up ``store`` first (without creating), do the eviction-if-needed pass, then ``cache.get`` again to pick up any deletion eviction caused. The bucket is only created with ``self.cache[record.key] = {}`` after we know eviction has settled. **Stale-prefix scan (suggestion #1).** Under sustained TTL re-adds — e.g. ``async_mark_unique_records_older_than_1s_to_expire`` rewriting TTLs to 1 — ``_expire_heap`` accumulates stale ``(when, record)`` tuples that no longer match ``_expirations[record]``. The previous eviction body would pop those one at a time, doing O(stale_prefix * log n) work per ``_async_add``. Reuse the same ``len(heap) > _MIN_SCHEDULED_RECORD_EXPIRATION and len(heap) > len(expirations) * 2`` heuristic that ``async_expire`` already runs: when the heap is mostly stale, rebuild it once up front, then run the pop loop against only live entries. **Accounting invariant test.** ``_total_records`` is ``cdef public unsigned int``; an off-by-one in ``_async_add`` / ``_async_remove`` would silently underflow in the Cython wheel and pin the cap-check True forever (eviction storm on every add). Rather than defensively zero-clamp the decrement — which would hide the bug — the new ``test_cache_total_records_invariant_under_mixed_ops`` walks the counter through every code path that touches it (fresh inserts, re-adds, DNSService, DNSNsec, shared-key inserts that empty their bucket on removal, full-cap eviction loop, ``async_expire``, ``async_remove_records``) and asserts ``cache._total_records == sum(len(s) for s in cache.cache.values())`` after every step. Any future change that misses an increment or doubles a decrement fails loudly. Regression tests: - ``test_cache_eviction_victim_shares_key_with_new_record`` — reproduces the orphan; would have failed against c24fa39. - ``test_cache_eviction_rebuilds_heap_when_mostly_stale`` — fills to cap then re-adds with two distinct TTLs to push the heap to ~3x MAX, then triggers eviction and asserts the rebuild ran (heap and expirations match after the pass). - ``test_cache_total_records_invariant_under_mixed_ops`` — counter invariant across all touched code paths. The ``.pxd`` adds ``expire_heap_len`` as ``unsigned int`` to ``_async_evict_oldest``'s locals so the new threshold comparison stays a direct C int op in the Cython build.
1 parent 51fbc27 commit 931358a

3 files changed

Lines changed: 237 additions & 5 deletions

File tree

src/zeroconf/_cache.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ cdef class DNSCache:
6767
)
6868
cdef bint _async_add(self, DNSRecord record)
6969

70-
@cython.locals(record=DNSRecord, when_record=tuple)
70+
@cython.locals(record=DNSRecord, when_record=tuple, expire_heap_len="unsigned int")
7171
cdef void _async_evict_oldest(self)
7272

7373
@cython.locals(service_record=DNSService)

src/zeroconf/_cache.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,20 @@ def _async_add(self, record: _DNSRecord) -> bool:
9090
# replaces any existing records that are __eq__ to each other which
9191
# removes the risk that accessing the cache from the wrong
9292
# direction would return the old incorrect entry.
93-
if (store := self.cache.get(record.key)) is None:
94-
store = self.cache[record.key] = {}
95-
is_new = record not in store
93+
store = self.cache.get(record.key)
94+
is_new = store is None or record not in store
9695
# Bound total cache size; evict closest-to-expiration entry to
9796
# make room before inserting a new record. Prevents a LAN-local
9897
# flood of unique-name records from growing the cache without
9998
# bound (RFC 6762 §10 advisory caching, defense-in-depth).
10099
if is_new and self._total_records >= _MAX_CACHE_RECORDS:
101100
self._async_evict_oldest()
101+
# The victim may have been the last record under
102+
# ``record.key``, in which case ``_remove_key`` deleted
103+
# the bucket. Re-fetch before creating below.
104+
store = self.cache.get(record.key)
105+
if store is None:
106+
store = self.cache[record.key] = {}
102107
new = is_new and not isinstance(record, DNSNsec)
103108
if is_new:
104109
self._total_records += 1
@@ -120,10 +125,23 @@ def _async_evict_oldest(self) -> None:
120125
"""Drop the closest-to-expiration record to make room for a new one.
121126
122127
Skips stale heap entries (records re-added with a different TTL),
123-
which mirrors the staleness check in ``async_expire``.
128+
which mirrors the staleness check in ``async_expire``. If the
129+
heap is mostly stale (long stale prefix from sustained TTL
130+
re-adds), rebuild it once up front so the pop loop below
131+
doesn't do O(stale_prefix * log n) work on a single add. Same
132+
heuristic / threshold as ``async_expire``.
124133
125134
This function must be run in from event loop.
126135
"""
136+
expire_heap_len = len(self._expire_heap)
137+
if (
138+
expire_heap_len > _MIN_SCHEDULED_RECORD_EXPIRATION
139+
and expire_heap_len > len(self._expirations) * 2
140+
):
141+
self._expire_heap = [
142+
entry for entry in self._expire_heap if self._expirations.get(entry[1]) == entry[0]
143+
]
144+
heapify(self._expire_heap)
127145
while self._expire_heap:
128146
when_record = heappop(self._expire_heap)
129147
record = when_record[1]

tests/test_cache.py

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,128 @@ def test_cache_eviction_skips_stale_heap_entries() -> None:
567567
assert "stale-1.local." not in cache.cache
568568

569569

570+
def test_cache_eviction_victim_shares_key_with_new_record() -> None:
571+
"""Eviction must not orphan the new record when it collides on ``record.key``.
572+
573+
If the closest-to-expiration record is the *last* one stored under
574+
``record.key`` and the incoming record uses the same key,
575+
``_remove_key`` deletes ``self.cache[record.key]`` during eviction.
576+
A previous capture of ``store = self.cache.get(record.key)`` would
577+
then write the new record into an orphaned dict not reachable via
578+
the cache. Pin that the new record is reachable.
579+
"""
580+
cache = r.DNSCache()
581+
now = r.current_time_millis()
582+
# Fill the cache to one shy of the cap with unique-name records, each
583+
# with a later created time than the shared-key victim below.
584+
cache.async_add_records(
585+
r.DNSAddress(
586+
f"filler-{i}.local.",
587+
const._TYPE_A,
588+
const._CLASS_IN,
589+
120,
590+
bytes((i & 0xFF, (i >> 8) & 0xFF, 0, 1)),
591+
created=now + 1000 + i,
592+
)
593+
for i in range(const._MAX_CACHE_RECORDS - 1)
594+
)
595+
596+
# Insert a record at "shared.local." with the earliest expiration —
597+
# this is the one closest_to_expiration eviction will pick.
598+
shared_key = "shared.local."
599+
old_shared = r.DNSAddress(
600+
shared_key,
601+
const._TYPE_A,
602+
const._CLASS_IN,
603+
120,
604+
b"\x01\x02\x03\x04",
605+
created=now,
606+
)
607+
cache.async_add_records([old_shared])
608+
assert cache._total_records == const._MAX_CACHE_RECORDS
609+
assert shared_key in cache.cache
610+
611+
# Add a new record with the SAME key. Eviction will remove old_shared,
612+
# which empties cache[shared_key], so _remove_key deletes that bucket.
613+
# If _async_add captured the store before eviction it will now be
614+
# writing into an orphaned dict.
615+
new_shared = r.DNSAddress(
616+
shared_key,
617+
const._TYPE_A,
618+
const._CLASS_IN,
619+
120,
620+
b"\x05\x06\x07\x08",
621+
created=now + 999,
622+
)
623+
cache.async_add_records([new_shared])
624+
625+
# The new record must be reachable via the cache.
626+
assert shared_key in cache.cache, "new record orphaned: cache bucket missing"
627+
assert new_shared in cache.cache[shared_key]
628+
assert cache.async_get_unique(new_shared) == new_shared
629+
# And the counter accounting must still match observable state.
630+
total = sum(len(store) for store in cache.cache.values())
631+
assert total == cache._total_records
632+
633+
634+
def test_cache_eviction_rebuilds_heap_when_mostly_stale() -> None:
635+
"""Eviction rebuilds ``_expire_heap`` up front when stale entries dominate.
636+
637+
Without the rebuild, a single ``_async_add`` at cap could pop a long
638+
stale prefix one entry at a time — O(stale_prefix * log n). The
639+
rebuild is the same heuristic ``async_expire`` already uses: when
640+
``len(heap) > 2 * len(expirations)``, the heap is reconstructed
641+
from only the valid entries in one O(n) pass before the pop loop.
642+
"""
643+
cache = r.DNSCache()
644+
now = r.current_time_millis()
645+
base_kwargs = {"type_": const._TYPE_A, "class_": const._CLASS_IN}
646+
647+
cache.async_add_records(
648+
r.DNSAddress(
649+
f"r-{i}.local.",
650+
ttl=120,
651+
address=bytes((i & 0xFF, (i >> 8) & 0xFF, 0, 1)),
652+
created=now + i,
653+
**base_kwargs,
654+
)
655+
for i in range(const._MAX_CACHE_RECORDS)
656+
)
657+
# Two re-adds with distinct TTLs leave two stale entries per record
658+
# in ``_expire_heap`` while ``_expirations`` stays at MAX. Heap reaches
659+
# ~3x MAX, which trips the ``len(heap) > 2 * len(expirations)``
660+
# threshold on the next eviction.
661+
for ttl in (7200, 14400):
662+
cache.async_add_records(
663+
r.DNSAddress(
664+
f"r-{i}.local.",
665+
ttl=ttl,
666+
address=bytes((i & 0xFF, (i >> 8) & 0xFF, 0, 1)),
667+
created=now + i,
668+
**base_kwargs,
669+
)
670+
for i in range(const._MAX_CACHE_RECORDS)
671+
)
672+
assert len(cache._expire_heap) > 2 * len(cache._expirations)
673+
674+
# One more insert pushes us over the cap; eviction fires and must
675+
# rebuild before scanning.
676+
cache.async_add_records(
677+
[
678+
r.DNSAddress(
679+
"trigger.local.",
680+
ttl=120,
681+
address=b"\xff\xff\xff\xff",
682+
created=now + const._MAX_CACHE_RECORDS,
683+
**base_kwargs,
684+
)
685+
]
686+
)
687+
# Rebuild dropped the stale entries; heap and expirations now match.
688+
assert len(cache._expire_heap) == len(cache._expirations) == const._MAX_CACHE_RECORDS
689+
assert cache._total_records == const._MAX_CACHE_RECORDS
690+
691+
570692
def test_cache_eviction_decrements_total_records() -> None:
571693
"""Natural removal (goodbyes, expirations) must keep ``_total_records`` in sync."""
572694
cache = r.DNSCache()
@@ -592,3 +714,95 @@ def test_cache_eviction_decrements_total_records() -> None:
592714
cache.async_expire(now + (200 * 1000))
593715
assert cache._total_records == 0
594716
assert not cache.cache
717+
718+
719+
def test_cache_total_records_invariant_under_mixed_ops() -> None:
720+
"""``_total_records`` must equal ``sum(len(store) for store in cache.values())``.
721+
722+
Walks the counter through every code path that touches it — new inserts,
723+
re-adds of an existing record (no increment), DNSService (extra
724+
service_cache write), DNSNsec (stored but not counted as "new" by the
725+
flag), shared-key inserts that empty their bucket on removal, full-cap
726+
eviction, async_expire, manual async_remove_records — and asserts the
727+
invariant after every step. Any future change that misses an increment
728+
or doubles a decrement will trip this immediately; ``_total_records`` is
729+
``cdef unsigned int`` in the Cython build, so silent underflow would
730+
otherwise propagate as a permanent eviction-on-every-add storm.
731+
"""
732+
cache = r.DNSCache()
733+
now = r.current_time_millis()
734+
735+
def actual() -> int:
736+
return sum(len(store) for store in cache.cache.values())
737+
738+
# Fresh inserts: counter follows insert count exactly.
739+
addrs = [
740+
r.DNSAddress(
741+
f"mix-{i}.local.", const._TYPE_A, const._CLASS_IN, 120, bytes((i, 0, 0, 1)), created=now + i
742+
)
743+
for i in range(20)
744+
]
745+
cache.async_add_records(addrs)
746+
assert cache._total_records == actual() == 20
747+
748+
# Re-add of an identical record: no increment (already present).
749+
cache.async_add_records([addrs[0]])
750+
assert cache._total_records == actual() == 20
751+
752+
# DNSService writes service_cache too — counter must still match cache size.
753+
svc = r.DNSService("svc.local.", const._TYPE_SRV, const._CLASS_IN, 120, 0, 0, 80, "host.local.")
754+
cache.async_add_records([svc])
755+
assert cache._total_records == actual() == 21
756+
cache.async_remove_records([svc])
757+
assert cache._total_records == actual() == 20
758+
759+
# DNSNsec is stored but excluded from the "new" return value; the counter
760+
# still tracks it because it occupies a bucket slot.
761+
nsec = r.DNSNsec(
762+
"nsec.local.",
763+
const._TYPE_NSEC,
764+
const._CLASS_IN,
765+
120,
766+
"nsec.local.",
767+
[const._TYPE_A],
768+
)
769+
cache.async_add_records([nsec])
770+
assert cache._total_records == actual() == 21
771+
cache.async_remove_records([nsec])
772+
assert cache._total_records == actual() == 20
773+
774+
# Shared-key insert/remove: emptying a bucket removes the cache key, but
775+
# the counter must still decrement only by the number of records gone.
776+
shared_a = r.DNSAddress(
777+
"shared.local.", const._TYPE_A, const._CLASS_IN, 120, b"\x01\x01\x01\x01", created=now
778+
)
779+
shared_b = r.DNSAddress(
780+
"shared.local.", const._TYPE_A, const._CLASS_IN, 120, b"\x02\x02\x02\x02", created=now
781+
)
782+
cache.async_add_records([shared_a, shared_b])
783+
assert cache._total_records == actual() == 22
784+
cache.async_remove_records([shared_a, shared_b])
785+
assert cache._total_records == actual() == 20
786+
assert "shared.local." not in cache.cache
787+
788+
# async_expire path: counter must follow the records dropped.
789+
cache.async_expire(now + (200 * 1000))
790+
assert cache._total_records == actual() == 0
791+
assert not cache.cache
792+
793+
# Full-cap eviction loop: counter never grows past the cap, never drifts.
794+
cap_records = [
795+
r.DNSAddress(
796+
f"cap-{i}.local.",
797+
const._TYPE_A,
798+
const._CLASS_IN,
799+
120,
800+
bytes((i & 0xFF, (i >> 8) & 0xFF, 0, 1)),
801+
created=now + i,
802+
)
803+
for i in range(const._MAX_CACHE_RECORDS + 50)
804+
]
805+
for rec in cap_records:
806+
cache.async_add_records([rec])
807+
assert cache._total_records == actual()
808+
assert cache._total_records == const._MAX_CACHE_RECORDS

0 commit comments

Comments
 (0)