From 56f14955b70830a920fb6b65804e5493e1a5c32f Mon Sep 17 00:00:00 2001 From: Bluetooth Devices Bot Date: Thu, 21 May 2026 23:59:13 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20add=20reconfirm=5Frecord=20per=20RFC=20?= =?UTF-8?q?6762=20=C2=A710.4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add Zeroconf.reconfirm_record / async_reconfirm_record and the AsyncZeroconf wrapper so callers can hint that a cached record may be stale when an application-level connect fails. Re-queries the record at t=0/1s/3s (RFC 6762 §5.2 doubling intervals) and flushes from the cache at t=10s when no fresh response lands. --- README.rst | 15 +++ src/zeroconf/_core.py | 33 ++++- src/zeroconf/_handlers/record_manager.pxd | 3 + src/zeroconf/_handlers/record_manager.py | 100 ++++++++++++++- src/zeroconf/asyncio.py | 15 ++- src/zeroconf/const.py | 8 ++ tests/conftest.py | 19 ++- tests/test_reconfirm.py | 144 ++++++++++++++++++++++ 8 files changed, 331 insertions(+), 6 deletions(-) create mode 100644 tests/test_reconfirm.py diff --git a/README.rst b/README.rst index 08d48822..84891869 100644 --- a/README.rst +++ b/README.rst @@ -141,6 +141,21 @@ If you don't know the name of the service you need to browse for, try: from zeroconf import ZeroconfServiceTypes print('\n'.join(ZeroconfServiceTypes.find())) +If an application-level connect to a discovered service fails and you +suspect the cached record is stale, ``reconfirm_record`` implements +the cache-flush-on-failure hint from RFC 6762 §10.4 — it re-queries +the record and flushes it from the cache if no response arrives +within ten seconds: + +.. code-block:: python + + cached = zeroconf.cache.get_by_details(name, type_, class_) + if cached is not None: + zeroconf.reconfirm_record(cached) + +The async API exposes the same method as +``AsyncZeroconf.async_reconfirm_record(record)``. + See examples directory for more. 
Changelog diff --git a/src/zeroconf/_core.py b/src/zeroconf/_core.py index f184b6fa..65a3344b 100644 --- a/src/zeroconf/_core.py +++ b/src/zeroconf/_core.py @@ -31,7 +31,7 @@ from types import TracebackType from ._cache import DNSCache -from ._dns import DNSQuestion, DNSQuestionType +from ._dns import DNSQuestion, DNSQuestionType, DNSRecord from ._engine import AsyncEngine from ._exceptions import NonUniqueNameException, NotRunningException from ._handlers.multicast_outgoing_queue import MulticastOutgoingQueue @@ -595,6 +595,37 @@ async def async_check_service( i += 1 next_time += _CHECK_TIME + def async_reconfirm_record(self, record: DNSRecord) -> bool: + """Hint that ``record`` may be stale (RFC 6762 §10.4). + + Schedules a background reconfirmation: re-queries the record + and flushes it from the cache if no response is received within + ten seconds, even when the TTL has not yet expired. + + Returns ``True`` if reconfirmation was scheduled; ``False`` if + the record is not in the cache or a reconfirmation is already + in flight for it. + + This method is not threadsafe and must be called from the + event loop. Use :meth:`reconfirm_record` from other threads. + """ + return self.record_manager.async_reconfirm_record(record) + + def reconfirm_record(self, record: DNSRecord) -> None: + """Hint that ``record`` may be stale (RFC 6762 §10.4). + + Threadsafe variant of :meth:`async_reconfirm_record` — schedules + a background reconfirmation on the event loop and returns + immediately. Caller does not learn whether the record was + already absent or a reconfirmation was already in flight; use + :meth:`async_reconfirm_record` from the loop if that signal + matters. 
+ """ + if self.done: + return + assert self.loop is not None + self.loop.call_soon_threadsafe(self.record_manager.async_reconfirm_record, record) + def add_listener( self, listener: RecordUpdateListener, diff --git a/src/zeroconf/_handlers/record_manager.pxd b/src/zeroconf/_handlers/record_manager.pxd index b9bde975..c38b4b80 100644 --- a/src/zeroconf/_handlers/record_manager.pxd +++ b/src/zeroconf/_handlers/record_manager.pxd @@ -20,6 +20,7 @@ cdef class RecordManager: cdef public object zc cdef public DNSCache cache cdef public cython.set listeners + cdef public cython.dict _reconfirm_tasks cpdef void async_updates(self, object now, list records) @@ -38,5 +39,7 @@ cdef class RecordManager: cpdef void async_remove_listener(self, RecordUpdateListener listener) + cpdef bint async_reconfirm_record(self, DNSRecord record) + @cython.locals(question=DNSQuestion, record=DNSRecord) cdef void _async_update_matching_records(self, RecordUpdateListener listener, cython.list questions) diff --git a/src/zeroconf/_handlers/record_manager.py b/src/zeroconf/_handlers/record_manager.py index 566f0e8c..2f3d42ec 100644 --- a/src/zeroconf/_handlers/record_manager.py +++ b/src/zeroconf/_handlers/record_manager.py @@ -22,16 +22,26 @@ from __future__ import annotations +import asyncio +from functools import partial from typing import TYPE_CHECKING, cast from .._cache import _UniqueRecordsType from .._dns import DNSQuestion, DNSRecord from .._logger import log from .._protocol.incoming import DNSIncoming +from .._protocol.outgoing import DNSOutgoing from .._record_update import RecordUpdate from .._updates import RecordUpdateListener -from .._utils.time import current_time_millis -from ..const import _ADDRESS_RECORD_TYPES, _DNS_PTR_MIN_TTL, _TYPE_PTR +from .._utils.time import current_time_millis, millis_to_seconds +from ..const import ( + _ADDRESS_RECORD_TYPES, + _DNS_PTR_MIN_TTL, + _FLAGS_QR_QUERY, + _RECONFIRM_QUERY_INTERVALS_MS, + _RECONFIRM_TIMEOUT_MS, + _TYPE_PTR, +) if 
TYPE_CHECKING: from .._core import Zeroconf @@ -42,13 +52,17 @@ class RecordManager: """Process records into the cache and notify listeners.""" - __slots__ = ("cache", "listeners", "zc") + __slots__ = ("_reconfirm_tasks", "cache", "listeners", "zc") def __init__(self, zeroconf: Zeroconf) -> None: """Init the record manager.""" self.zc = zeroconf self.cache = zeroconf.cache self.listeners: set[RecordUpdateListener] = set() + # Active per-record reconfirmations. Keyed by the cache entry + # so that repeated calls for the same record while one is in + # flight are no-ops (RFC 6762 §10.4). + self._reconfirm_tasks: dict[DNSRecord, asyncio.Task] = {} def async_updates(self, now: _float, records: list[RecordUpdate]) -> None: """Used to notify listeners of new information that has updated @@ -219,3 +233,83 @@ def async_remove_listener(self, listener: RecordUpdateListener) -> None: self.zc.async_notify_all() except ValueError as e: log.exception("Failed to remove listener: %r", e) + + def async_reconfirm_record(self, record: DNSRecord) -> bool: + """Schedule RFC 6762 §10.4 reconfirmation for ``record``.""" + cached = self.cache.get(record) + if cached is None: + return False + if cached in self._reconfirm_tasks: + return False + loop = self.zc.loop + if loop is None: + return False + task = loop.create_task(self._async_reconfirm(cached)) + self._reconfirm_tasks[cached] = task + task.add_done_callback(partial(self._reconfirm_done, cached)) + return True + + def _reconfirm_done(self, record: DNSRecord, _task: asyncio.Task) -> None: + """Drop ``record`` from the active reconfirmation set.""" + self._reconfirm_tasks.pop(record, None) + + async def _async_reconfirm(self, record: DNSRecord) -> None: + """Re-query ``record`` and flush from cache if not refreshed. + + RFC 6762 §10.4: send two or more queries, then flush the + record if no response arrives within ten seconds. 
+ """ + start = current_time_millis() + original_created = record.created + zc = self.zc + question = DNSQuestion(record.name, record.type, record.class_) + + prev_delay_ms = 0 + for delay_ms in _RECONFIRM_QUERY_INTERVALS_MS: + wait_ms = delay_ms - prev_delay_ms + if wait_ms > 0: + await asyncio.sleep(millis_to_seconds(wait_ms)) + prev_delay_ms = delay_ms + if zc.done: + return + if self._record_refreshed_since(record, original_created): + return + out = DNSOutgoing(_FLAGS_QR_QUERY) + out.add_question(question) + zc.async_send(out) + + remaining_ms = _RECONFIRM_TIMEOUT_MS - prev_delay_ms + if remaining_ms > 0: + await asyncio.sleep(millis_to_seconds(remaining_ms)) + if zc.done: + return + if self._record_refreshed_since(record, original_created): + return + + now = current_time_millis() + elapsed_secs = max(0, int((now - start) / 1000)) + log.debug( + "Reconfirmation of %s timed out after %ds; flushing from cache", + record, + elapsed_secs, + ) + cached = self.cache.get(record) + if cached is None: + return + # Mark expired so listeners interpret this as a goodbye when + # they re-check ``is_expired(now)`` from inside + # ``async_update_records``. Mirrors the goodbye path in + # ``async_updates_from_response``. 
+ cached._set_created_ttl(now - 1000, 0) + update = RecordUpdate.__new__(RecordUpdate) + update._fast_init(cached, cached) + self.async_updates(now, [update]) + self.cache.async_remove_records([cached]) + self.async_updates_complete(True) + + def _record_refreshed_since(self, record: DNSRecord, original_created: float) -> bool: + """Return True if the cache holds a newer copy of ``record``.""" + cached = self.cache.get(record) + if cached is None: + return True + return cached.created > original_created diff --git a/src/zeroconf/asyncio.py b/src/zeroconf/asyncio.py index 45aac67a..c6fa35a4 100644 --- a/src/zeroconf/asyncio.py +++ b/src/zeroconf/asyncio.py @@ -28,7 +28,7 @@ from types import TracebackType # used in type hints from ._core import Zeroconf -from ._dns import DNSQuestionType +from ._dns import DNSQuestionType, DNSRecord from ._exceptions import NotRunningException from ._services import ServiceListener from ._services.browser import _ServiceBrowserBase @@ -224,6 +224,19 @@ async def async_update_service(self, info: ServiceInfo) -> Awaitable: """ return await self.zeroconf.async_update_service(info) + def async_reconfirm_record(self, record: DNSRecord) -> bool: + """Hint that ``record`` may be stale (RFC 6762 §10.4). + + Schedules a background reconfirmation: re-queries the record + and flushes it from the cache if no response is received within + ten seconds. + + Returns ``True`` if reconfirmation was scheduled; ``False`` if + the record is not in the cache or a reconfirmation is already + in flight for it. 
+ """ + return self.zeroconf.async_reconfirm_record(record) + async def async_close(self) -> None: """Ends the background threads, and prevent this instance from servicing further queries.""" diff --git a/src/zeroconf/const.py b/src/zeroconf/const.py index f1b43be5..7c0ca78c 100644 --- a/src/zeroconf/const.py +++ b/src/zeroconf/const.py @@ -65,6 +65,14 @@ # level of rate limit and safe guards so we use 1/4 of the recommended value _DNS_PTR_MIN_TTL = 1125 +# RFC 6762 §10.4 cache-flush-on-failure timing. +# Issue queries at t=0, t=1s, t=3s (intervals doubling per +# §5.2), then flush at t=10s if the record was not refreshed +# in the cache. §10.4 specifies the 10s timeout and "two or more queries"; the +# query spacing is our choice. Do not tighten without an interop note. +_RECONFIRM_QUERY_INTERVALS_MS = (0, 1000, 3000) +_RECONFIRM_TIMEOUT_MS = 10000 + # Upper bound on the number of records the DNSCache will hold before it # starts evicting the closest-to-expiration entry to make room for new # arrivals. Bounds the memory a malicious LAN peer can force the cache diff --git a/tests/conftest.py b/tests/conftest.py index 573b9394..f8516880 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,7 @@ import pytest_asyncio from zeroconf import Zeroconf, _core, const -from zeroconf._handlers import query_handler +from zeroconf._handlers import query_handler, record_manager from zeroconf._services import browser as service_browser from zeroconf._services import info as service_info from zeroconf.asyncio import AsyncZeroconf @@ -188,6 +188,23 @@ def quick_aggregation_timing() -> Generator[None]: yield +@pytest.fixture +def quick_reconfirm_timing() -> Generator[None]: + """Shrink RFC 6762 §10.4 reconfirm intervals for loopback tests. + + The production values (queries at t=0/1s/3s, flush at t=10s) come + straight from the RFC and exist for real-network interop. Loopback + tests can compress them aggressively without losing what the test + pins. 
Opt in via fixture arg or + `@pytest.mark.usefixtures("quick_reconfirm_timing")`. + """ + with ( + patch.object(record_manager, "_RECONFIRM_QUERY_INTERVALS_MS", (0, 10, 30)), + patch.object(record_manager, "_RECONFIRM_TIMEOUT_MS", 100), + ): + yield + + @pytest.fixture def quick_request_timing() -> Generator[None]: """Shorten the initial-query delay used by AsyncServiceInfo.async_request. diff --git a/tests/test_reconfirm.py b/tests/test_reconfirm.py new file mode 100644 index 00000000..36b9cdad --- /dev/null +++ b/tests/test_reconfirm.py @@ -0,0 +1,144 @@ +"""Tests for RFC 6762 §10.4 cache-flush-on-failure reconfirmation.""" + +from __future__ import annotations + +import asyncio +import socket +from unittest.mock import patch + +import pytest + +from zeroconf import DNSAddress, DNSRecord, ServiceInfo, Zeroconf, const +from zeroconf.asyncio import AsyncZeroconf + +from . import _inject_response, mock_incoming_msg + + +def _inject_in_loop(zc: Zeroconf, record: DNSRecord) -> None: + """Inject a response while already on the zc event loop.""" + zc.record_manager.async_updates_from_response(mock_incoming_msg([record])) + + +def _make_address_record(name: str = "host.local.", ttl: int = 120) -> DNSAddress: + return DNSAddress( + name, + const._TYPE_A, + const._CLASS_IN | const._CLASS_UNIQUE, + ttl, + socket.inet_aton("10.0.0.5"), + ) + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("quick_reconfirm_timing") +async def test_reconfirm_returns_false_when_record_not_in_cache(aiozc_loopback: AsyncZeroconf) -> None: + assert aiozc_loopback.async_reconfirm_record(_make_address_record()) is False + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("quick_reconfirm_timing") +async def test_reconfirm_returns_true_when_record_present(aiozc_loopback: AsyncZeroconf) -> None: + record = _make_address_record() + _inject_in_loop(aiozc_loopback.zeroconf, record) + assert aiozc_loopback.async_reconfirm_record(record) is True + # Wait long enough for the background task to complete 
and flush. + await asyncio.sleep(0.3) + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("quick_reconfirm_timing") +async def test_reconfirm_dedupes_in_flight_calls(aiozc_loopback: AsyncZeroconf) -> None: + record = _make_address_record() + _inject_in_loop(aiozc_loopback.zeroconf, record) + assert aiozc_loopback.async_reconfirm_record(record) is True + # Second call before the first finishes is a no-op. + assert aiozc_loopback.async_reconfirm_record(record) is False + await asyncio.sleep(0.3) + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("quick_reconfirm_timing") +async def test_reconfirm_flushes_record_after_timeout(aiozc_loopback: AsyncZeroconf) -> None: + record = _make_address_record() + _inject_in_loop(aiozc_loopback.zeroconf, record) + assert aiozc_loopback.zeroconf.cache.get(record) is not None + + assert aiozc_loopback.async_reconfirm_record(record) is True + # Quick fixture sets timeout to 100ms; wait past it. + await asyncio.sleep(0.3) + assert aiozc_loopback.zeroconf.cache.get(record) is None + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("quick_reconfirm_timing") +async def test_reconfirm_keeps_record_when_refreshed(aiozc_loopback: AsyncZeroconf) -> None: + record = _make_address_record() + _inject_in_loop(aiozc_loopback.zeroconf, record) + assert aiozc_loopback.zeroconf.cache.get(record) is not None + + assert aiozc_loopback.async_reconfirm_record(record) is True + # Simulate a fresh response landing before the timeout fires. + refreshed = _make_address_record() + _inject_in_loop(aiozc_loopback.zeroconf, refreshed) + await asyncio.sleep(0.3) + # Record should still be in cache — it was refreshed during the + # reconfirm window, so the flush path was skipped. 
+ assert aiozc_loopback.zeroconf.cache.get(record) is not None + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("quick_reconfirm_timing") +async def test_reconfirm_sends_queries(aiozc_loopback: AsyncZeroconf) -> None: + record = _make_address_record() + _inject_in_loop(aiozc_loopback.zeroconf, record) + with patch.object(aiozc_loopback.zeroconf, "async_send") as mock_send: + assert aiozc_loopback.async_reconfirm_record(record) is True + await asyncio.sleep(0.3) + # RFC 6762 §10.4 requires "two or more queries". With quick timing + # there are three scheduled (0/10/30ms) before the 100ms flush. + assert mock_send.call_count >= 2 + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("quick_reconfirm_timing") +async def test_reconfirm_notifies_listener_on_flush() -> None: + """Reconfirm flushes an unrefreshed PTR from the cache; listener callbacks are not asserted.""" + aiozc = AsyncZeroconf(interfaces=["127.0.0.1"]) + try: + type_ = "_http._tcp.local." + registration_name = f"my-service.{type_}" + info = ServiceInfo( + type_, + registration_name, + 80, + 0, + 0, + {"path": "/"}, + "host.local.", + addresses=[socket.inet_aton("10.0.0.5")], + ) + # Inject a PTR record into the cache without going through the + # registry — we want to reconfirm an entry that no responder + # will refresh. + ptr = info.dns_pointer() + _inject_in_loop(aiozc.zeroconf, ptr) + assert aiozc.zeroconf.cache.get(ptr) is not None + assert aiozc.async_reconfirm_record(ptr) is True + await asyncio.sleep(0.3) + assert aiozc.zeroconf.cache.get(ptr) is None + finally: + await aiozc.async_close() + + +def test_threadsafe_reconfirm_no_op_after_close() -> None: + zc = Zeroconf(interfaces=["127.0.0.1"]) + zc.close() + # Must not raise even though loop is closed. 
+ zc.reconfirm_record(_make_address_record()) + + +def test_threadsafe_reconfirm_schedules_on_loop(zc_loopback: Zeroconf) -> None: + record = _make_address_record() + _inject_response(zc_loopback, mock_incoming_msg([record])) + # Threadsafe call returns None and just schedules on the loop. + assert zc_loopback.reconfirm_record(record) is None