Skip to content

Commit 56f1495

Browse files
committed
feat: add reconfirm_record per RFC 6762 §10.4
Add Zeroconf.reconfirm_record / async_reconfirm_record and the AsyncZeroconf wrapper so callers can hint that a cached record may be stale when an application-level connect fails. Re-queries the record at t=0/1s/3s (RFC 6762 §5.2 doubling intervals) and flushes from the cache at t=10s when no fresh response lands.
1 parent cb81e67 commit 56f1495

8 files changed

Lines changed: 331 additions & 6 deletions

File tree

README.rst

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,21 @@ If you don't know the name of the service you need to browse for, try:
141141
from zeroconf import ZeroconfServiceTypes
142142
print('\n'.join(ZeroconfServiceTypes.find()))
143143
144+
If an application-level connect to a discovered service fails and you
145+
suspect the cached record is stale, ``reconfirm_record`` implements
146+
the cache-flush-on-failure hint from RFC 6762 §10.4 — it re-queries
147+
the record and flushes it from the cache if no response arrives
148+
within ten seconds:
149+
150+
.. code-block:: python
151+
152+
cached = zeroconf.cache.get_by_details(name, type_, class_)
153+
if cached is not None:
154+
zeroconf.reconfirm_record(cached)
155+
156+
The async API exposes the same method as
157+
``AsyncZeroconf.async_reconfirm_record(record)``.
158+
144159
See examples directory for more.
145160

146161
Changelog

src/zeroconf/_core.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from types import TracebackType
3232

3333
from ._cache import DNSCache
34-
from ._dns import DNSQuestion, DNSQuestionType
34+
from ._dns import DNSQuestion, DNSQuestionType, DNSRecord
3535
from ._engine import AsyncEngine
3636
from ._exceptions import NonUniqueNameException, NotRunningException
3737
from ._handlers.multicast_outgoing_queue import MulticastOutgoingQueue
@@ -595,6 +595,37 @@ async def async_check_service(
595595
i += 1
596596
next_time += _CHECK_TIME
597597

598+
def async_reconfirm_record(self, record: DNSRecord) -> bool:
599+
"""Hint that ``record`` may be stale (RFC 6762 §10.4).
600+
601+
Schedules a background reconfirmation: re-queries the record
602+
and flushes it from the cache if no response is received within
603+
ten seconds, even when the TTL has not yet expired.
604+
605+
Returns ``True`` if reconfirmation was scheduled; ``False`` if
606+
the record is not in the cache or a reconfirmation is already
607+
in flight for it.
608+
609+
This method is not threadsafe and must be called from the
610+
event loop. Use :meth:`reconfirm_record` from other threads.
611+
"""
612+
return self.record_manager.async_reconfirm_record(record)
613+
614+
def reconfirm_record(self, record: DNSRecord) -> None:
615+
"""Hint that ``record`` may be stale (RFC 6762 §10.4).
616+
617+
Threadsafe variant of :meth:`async_reconfirm_record` — schedules
618+
a background reconfirmation on the event loop and returns
619+
immediately. Caller does not learn whether the record was
620+
already absent or a reconfirmation was already in flight; use
621+
:meth:`async_reconfirm_record` from the loop if that signal
622+
matters.
623+
"""
624+
if self.done:
625+
return
626+
assert self.loop is not None
627+
self.loop.call_soon_threadsafe(self.record_manager.async_reconfirm_record, record)
628+
598629
def add_listener(
599630
self,
600631
listener: RecordUpdateListener,

src/zeroconf/_handlers/record_manager.pxd

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ cdef class RecordManager:
2020
cdef public object zc
2121
cdef public DNSCache cache
2222
cdef public cython.set listeners
23+
cdef public cython.dict _reconfirm_tasks
2324

2425
cpdef void async_updates(self, object now, list records)
2526

@@ -38,5 +39,7 @@ cdef class RecordManager:
3839

3940
cpdef void async_remove_listener(self, RecordUpdateListener listener)
4041

42+
cpdef bint async_reconfirm_record(self, DNSRecord record)
43+
4144
@cython.locals(question=DNSQuestion, record=DNSRecord)
4245
cdef void _async_update_matching_records(self, RecordUpdateListener listener, cython.list questions)

src/zeroconf/_handlers/record_manager.py

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,26 @@
2222

2323
from __future__ import annotations
2424

25+
import asyncio
26+
from functools import partial
2527
from typing import TYPE_CHECKING, cast
2628

2729
from .._cache import _UniqueRecordsType
2830
from .._dns import DNSQuestion, DNSRecord
2931
from .._logger import log
3032
from .._protocol.incoming import DNSIncoming
33+
from .._protocol.outgoing import DNSOutgoing
3134
from .._record_update import RecordUpdate
3235
from .._updates import RecordUpdateListener
33-
from .._utils.time import current_time_millis
34-
from ..const import _ADDRESS_RECORD_TYPES, _DNS_PTR_MIN_TTL, _TYPE_PTR
36+
from .._utils.time import current_time_millis, millis_to_seconds
37+
from ..const import (
38+
_ADDRESS_RECORD_TYPES,
39+
_DNS_PTR_MIN_TTL,
40+
_FLAGS_QR_QUERY,
41+
_RECONFIRM_QUERY_INTERVALS_MS,
42+
_RECONFIRM_TIMEOUT_MS,
43+
_TYPE_PTR,
44+
)
3545

3646
if TYPE_CHECKING:
3747
from .._core import Zeroconf
@@ -42,13 +52,17 @@
4252
class RecordManager:
4353
"""Process records into the cache and notify listeners."""
4454

45-
__slots__ = ("cache", "listeners", "zc")
55+
__slots__ = ("_reconfirm_tasks", "cache", "listeners", "zc")
4656

4757
def __init__(self, zeroconf: Zeroconf) -> None:
4858
"""Init the record manager."""
4959
self.zc = zeroconf
5060
self.cache = zeroconf.cache
5161
self.listeners: set[RecordUpdateListener] = set()
62+
# Active per-record reconfirmations. Keyed by the cache entry
63+
# so that repeated calls for the same record while one is in
64+
# flight are no-ops (RFC 6762 §10.4).
65+
self._reconfirm_tasks: dict[DNSRecord, asyncio.Task] = {}
5266

5367
def async_updates(self, now: _float, records: list[RecordUpdate]) -> None:
5468
"""Used to notify listeners of new information that has updated
@@ -219,3 +233,83 @@ def async_remove_listener(self, listener: RecordUpdateListener) -> None:
219233
self.zc.async_notify_all()
220234
except ValueError as e:
221235
log.exception("Failed to remove listener: %r", e)
236+
237+
def async_reconfirm_record(self, record: DNSRecord) -> bool:
238+
"""Schedule RFC 6762 §10.4 reconfirmation for ``record``."""
239+
cached = self.cache.get(record)
240+
if cached is None:
241+
return False
242+
if cached in self._reconfirm_tasks:
243+
return False
244+
loop = self.zc.loop
245+
if loop is None:
246+
return False
247+
task = loop.create_task(self._async_reconfirm(cached))
248+
self._reconfirm_tasks[cached] = task
249+
task.add_done_callback(partial(self._reconfirm_done, cached))
250+
return True
251+
252+
def _reconfirm_done(self, record: DNSRecord, _task: asyncio.Task) -> None:
253+
"""Drop ``record`` from the active reconfirmation set."""
254+
self._reconfirm_tasks.pop(record, None)
255+
256+
async def _async_reconfirm(self, record: DNSRecord) -> None:
257+
"""Re-query ``record`` and flush from cache if not refreshed.
258+
259+
RFC 6762 §10.4: send two or more queries, then flush the
260+
record if no response arrives within ten seconds.
261+
"""
262+
start = current_time_millis()
263+
original_created = record.created
264+
zc = self.zc
265+
question = DNSQuestion(record.name, record.type, record.class_)
266+
267+
prev_delay_ms = 0
268+
for delay_ms in _RECONFIRM_QUERY_INTERVALS_MS:
269+
wait_ms = delay_ms - prev_delay_ms
270+
if wait_ms > 0:
271+
await asyncio.sleep(millis_to_seconds(wait_ms))
272+
prev_delay_ms = delay_ms
273+
if zc.done:
274+
return
275+
if self._record_refreshed_since(record, original_created):
276+
return
277+
out = DNSOutgoing(_FLAGS_QR_QUERY)
278+
out.add_question(question)
279+
zc.async_send(out)
280+
281+
remaining_ms = _RECONFIRM_TIMEOUT_MS - prev_delay_ms
282+
if remaining_ms > 0:
283+
await asyncio.sleep(millis_to_seconds(remaining_ms))
284+
if zc.done:
285+
return
286+
if self._record_refreshed_since(record, original_created):
287+
return
288+
289+
now = current_time_millis()
290+
elapsed_secs = max(0, int((now - start) / 1000))
291+
log.debug(
292+
"Reconfirmation of %s timed out after %ds; flushing from cache",
293+
record,
294+
elapsed_secs,
295+
)
296+
cached = self.cache.get(record)
297+
if cached is None:
298+
return
299+
# Mark expired so listeners interpret this as a goodbye when
300+
# they re-check ``is_expired(now)`` from inside
301+
# ``async_update_records``. Mirrors the goodbye path in
302+
# ``async_updates_from_response``.
303+
cached._set_created_ttl(now - 1000, 0)
304+
update = RecordUpdate.__new__(RecordUpdate)
305+
update._fast_init(cached, cached)
306+
self.async_updates(now, [update])
307+
self.cache.async_remove_records([cached])
308+
self.async_updates_complete(True)
309+
310+
def _record_refreshed_since(self, record: DNSRecord, original_created: float) -> bool:
311+
"""Return True if the cache holds a newer copy of ``record``."""
312+
cached = self.cache.get(record)
313+
if cached is None:
314+
return True
315+
return cached.created > original_created

src/zeroconf/asyncio.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from types import TracebackType # used in type hints
2929

3030
from ._core import Zeroconf
31-
from ._dns import DNSQuestionType
31+
from ._dns import DNSQuestionType, DNSRecord
3232
from ._exceptions import NotRunningException
3333
from ._services import ServiceListener
3434
from ._services.browser import _ServiceBrowserBase
@@ -224,6 +224,19 @@ async def async_update_service(self, info: ServiceInfo) -> Awaitable:
224224
"""
225225
return await self.zeroconf.async_update_service(info)
226226

227+
def async_reconfirm_record(self, record: DNSRecord) -> bool:
228+
"""Hint that ``record`` may be stale (RFC 6762 §10.4).
229+
230+
Schedules a background reconfirmation: re-queries the record
231+
and flushes it from the cache if no response is received within
232+
ten seconds.
233+
234+
Returns ``True`` if reconfirmation was scheduled; ``False`` if
235+
the record is not in the cache or a reconfirmation is already
236+
in flight for it.
237+
"""
238+
return self.zeroconf.async_reconfirm_record(record)
239+
227240
async def async_close(self) -> None:
228241
"""Ends the background threads, and prevent this instance from
229242
servicing further queries."""

src/zeroconf/const.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@
6565
# level of rate limit and safe guards so we use 1/4 of the recommended value
6666
_DNS_PTR_MIN_TTL = 1125
6767

68+
# RFC 6762 §10.4 cache-flush-on-failure timing.
69+
# Issue queries at t=0, t=1s, t=3s (intervals doubling per
70+
# §5.2), then flush at t=10s if the record was not refreshed
71+
# in the cache. Schedule is fixed by the RFC; do not tighten
72+
# without an interop note.
73+
_RECONFIRM_QUERY_INTERVALS_MS = (0, 1000, 3000)
74+
_RECONFIRM_TIMEOUT_MS = 10000
75+
6876
# Upper bound on the number of records the DNSCache will hold before it
6977
# starts evicting the closest-to-expiration entry to make room for new
7078
# arrivals. Bounds the memory a malicious LAN peer can force the cache

tests/conftest.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import pytest_asyncio
1111

1212
from zeroconf import Zeroconf, _core, const
13-
from zeroconf._handlers import query_handler
13+
from zeroconf._handlers import query_handler, record_manager
1414
from zeroconf._services import browser as service_browser
1515
from zeroconf._services import info as service_info
1616
from zeroconf.asyncio import AsyncZeroconf
@@ -188,6 +188,23 @@ def quick_aggregation_timing() -> Generator[None]:
188188
yield
189189

190190

191+
@pytest.fixture
192+
def quick_reconfirm_timing() -> Generator[None]:
193+
"""Shrink RFC 6762 §10.4 reconfirm intervals for loopback tests.
194+
195+
The production values (queries at t=0/1s/3s, flush at t=10s) come
196+
straight from the RFC and exist for real-network interop. Loopback
197+
tests can compress them aggressively without losing what the test
198+
pins. Opt in via fixture arg or
199+
`@pytest.mark.usefixtures("quick_reconfirm_timing")`.
200+
"""
201+
with (
202+
patch.object(record_manager, "_RECONFIRM_QUERY_INTERVALS_MS", (0, 10, 30)),
203+
patch.object(record_manager, "_RECONFIRM_TIMEOUT_MS", 100),
204+
):
205+
yield
206+
207+
191208
@pytest.fixture
192209
def quick_request_timing() -> Generator[None]:
193210
"""Shorten the initial-query delay used by AsyncServiceInfo.async_request.

0 commit comments

Comments
 (0)