Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,21 @@ If you don't know the name of the service you need to browse for, try:
from zeroconf import ZeroconfServiceTypes
print('\n'.join(ZeroconfServiceTypes.find()))

If an application-level connect to a discovered service fails and you
suspect the cached record is stale, ``reconfirm_record`` implements
the cache-flush-on-failure hint from RFC 6762 §10.4 — it re-queries
the record and flushes it from the cache if no response arrives
within ten seconds:

.. code-block:: python

cached = zeroconf.cache.get_by_details(name, type_, class_)
if cached is not None:
zeroconf.reconfirm_record(cached)

The async API exposes the same method as
``AsyncZeroconf.async_reconfirm_record(record)``.

See examples directory for more.

Changelog
Expand Down
33 changes: 32 additions & 1 deletion src/zeroconf/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from types import TracebackType

from ._cache import DNSCache
from ._dns import DNSQuestion, DNSQuestionType
from ._dns import DNSQuestion, DNSQuestionType, DNSRecord
from ._engine import AsyncEngine
from ._exceptions import NonUniqueNameException, NotRunningException
from ._handlers.multicast_outgoing_queue import MulticastOutgoingQueue
Expand Down Expand Up @@ -595,6 +595,37 @@ async def async_check_service(
i += 1
next_time += _CHECK_TIME

def async_reconfirm_record(self, record: DNSRecord) -> bool:
"""Hint that ``record`` may be stale (RFC 6762 §10.4).

Schedules a background reconfirmation: re-queries the record
and flushes it from the cache if no response is received within
ten seconds, even when the TTL has not yet expired.

Returns ``True`` if reconfirmation was scheduled; ``False`` if
the record is not in the cache or a reconfirmation is already
in flight for it.

This method is not threadsafe and must be called from the
event loop. Use :meth:`reconfirm_record` from other threads.
"""
return self.record_manager.async_reconfirm_record(record)

def reconfirm_record(self, record: DNSRecord) -> None:
"""Hint that ``record`` may be stale (RFC 6762 §10.4).

Threadsafe variant of :meth:`async_reconfirm_record` — schedules
a background reconfirmation on the event loop and returns
immediately. Caller does not learn whether the record was
already absent or a reconfirmation was already in flight; use
:meth:`async_reconfirm_record` from the loop if that signal
matters.
"""
if self.done:
return
assert self.loop is not None
self.loop.call_soon_threadsafe(self.record_manager.async_reconfirm_record, record)

def add_listener(
self,
listener: RecordUpdateListener,
Expand Down
3 changes: 3 additions & 0 deletions src/zeroconf/_handlers/record_manager.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ cdef class RecordManager:
cdef public object zc
cdef public DNSCache cache
cdef public cython.set listeners
cdef public cython.dict _reconfirm_tasks

cpdef void async_updates(self, object now, list records)

Expand All @@ -38,5 +39,7 @@ cdef class RecordManager:

cpdef void async_remove_listener(self, RecordUpdateListener listener)

cpdef bint async_reconfirm_record(self, DNSRecord record)

@cython.locals(question=DNSQuestion, record=DNSRecord)
cdef void _async_update_matching_records(self, RecordUpdateListener listener, cython.list questions)
100 changes: 97 additions & 3 deletions src/zeroconf/_handlers/record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,26 @@

from __future__ import annotations

import asyncio
from functools import partial
from typing import TYPE_CHECKING, cast

from .._cache import _UniqueRecordsType
from .._dns import DNSQuestion, DNSRecord
from .._logger import log
from .._protocol.incoming import DNSIncoming
from .._protocol.outgoing import DNSOutgoing
from .._record_update import RecordUpdate
from .._updates import RecordUpdateListener
from .._utils.time import current_time_millis
from ..const import _ADDRESS_RECORD_TYPES, _DNS_PTR_MIN_TTL, _TYPE_PTR
from .._utils.time import current_time_millis, millis_to_seconds
from ..const import (
_ADDRESS_RECORD_TYPES,
_DNS_PTR_MIN_TTL,
_FLAGS_QR_QUERY,
_RECONFIRM_QUERY_INTERVALS_MS,
_RECONFIRM_TIMEOUT_MS,
_TYPE_PTR,
)

if TYPE_CHECKING:
from .._core import Zeroconf
Expand All @@ -42,13 +52,17 @@
class RecordManager:
"""Process records into the cache and notify listeners."""

__slots__ = ("cache", "listeners", "zc")
__slots__ = ("_reconfirm_tasks", "cache", "listeners", "zc")

def __init__(self, zeroconf: Zeroconf) -> None:
"""Init the record manager."""
self.zc = zeroconf
self.cache = zeroconf.cache
self.listeners: set[RecordUpdateListener] = set()
# Active per-record reconfirmations. Keyed by the cache entry
# so that repeated calls for the same record while one is in
# flight are no-ops (RFC 6762 §10.4).
self._reconfirm_tasks: dict[DNSRecord, asyncio.Task] = {}

def async_updates(self, now: _float, records: list[RecordUpdate]) -> None:
"""Used to notify listeners of new information that has updated
Expand Down Expand Up @@ -219,3 +233,83 @@ def async_remove_listener(self, listener: RecordUpdateListener) -> None:
self.zc.async_notify_all()
except ValueError as e:
log.exception("Failed to remove listener: %r", e)

def async_reconfirm_record(self, record: DNSRecord) -> bool:
"""Schedule RFC 6762 §10.4 reconfirmation for ``record``."""
cached = self.cache.get(record)
if cached is None:
return False
if cached in self._reconfirm_tasks:
return False
loop = self.zc.loop
if loop is None:
return False
task = loop.create_task(self._async_reconfirm(cached))
self._reconfirm_tasks[cached] = task
task.add_done_callback(partial(self._reconfirm_done, cached))
return True

def _reconfirm_done(self, record: DNSRecord, _task: asyncio.Task) -> None:
"""Drop ``record`` from the active reconfirmation set."""
self._reconfirm_tasks.pop(record, None)

async def _async_reconfirm(self, record: DNSRecord) -> None:
"""Re-query ``record`` and flush from cache if not refreshed.

RFC 6762 §10.4: send two or more queries, then flush the
record if no response arrives within ten seconds.
"""
start = current_time_millis()
original_created = record.created
zc = self.zc
question = DNSQuestion(record.name, record.type, record.class_)

prev_delay_ms = 0
for delay_ms in _RECONFIRM_QUERY_INTERVALS_MS:
wait_ms = delay_ms - prev_delay_ms
if wait_ms > 0:
await asyncio.sleep(millis_to_seconds(wait_ms))
prev_delay_ms = delay_ms
if zc.done:
return
if self._record_refreshed_since(record, original_created):
return
out = DNSOutgoing(_FLAGS_QR_QUERY)
out.add_question(question)
zc.async_send(out)

remaining_ms = _RECONFIRM_TIMEOUT_MS - prev_delay_ms
if remaining_ms > 0:
await asyncio.sleep(millis_to_seconds(remaining_ms))
if zc.done:
return
if self._record_refreshed_since(record, original_created):
return

now = current_time_millis()
elapsed_secs = max(0, int((now - start) / 1000))
log.debug(
"Reconfirmation of %s timed out after %ds; flushing from cache",
record,
elapsed_secs,
)
cached = self.cache.get(record)
if cached is None:
return
# Mark expired so listeners interpret this as a goodbye when
# they re-check ``is_expired(now)`` from inside
# ``async_update_records``. Mirrors the goodbye path in
# ``async_updates_from_response``.
cached._set_created_ttl(now - 1000, 0)
update = RecordUpdate.__new__(RecordUpdate)
update._fast_init(cached, cached)
self.async_updates(now, [update])
self.cache.async_remove_records([cached])
self.async_updates_complete(True)

def _record_refreshed_since(self, record: DNSRecord, original_created: float) -> bool:
"""Return True if the cache holds a newer copy of ``record``."""
cached = self.cache.get(record)
if cached is None:
return True
return cached.created > original_created
15 changes: 14 additions & 1 deletion src/zeroconf/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from types import TracebackType # used in type hints

from ._core import Zeroconf
from ._dns import DNSQuestionType
from ._dns import DNSQuestionType, DNSRecord
from ._exceptions import NotRunningException
from ._services import ServiceListener
from ._services.browser import _ServiceBrowserBase
Expand Down Expand Up @@ -224,6 +224,19 @@ async def async_update_service(self, info: ServiceInfo) -> Awaitable:
"""
return await self.zeroconf.async_update_service(info)

def async_reconfirm_record(self, record: DNSRecord) -> bool:
"""Hint that ``record`` may be stale (RFC 6762 §10.4).

Schedules a background reconfirmation: re-queries the record
and flushes it from the cache if no response is received within
ten seconds.

Returns ``True`` if reconfirmation was scheduled; ``False`` if
the record is not in the cache or a reconfirmation is already
in flight for it.
"""
return self.zeroconf.async_reconfirm_record(record)

async def async_close(self) -> None:
"""Ends the background threads, and prevent this instance from
servicing further queries."""
Expand Down
8 changes: 8 additions & 0 deletions src/zeroconf/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@
# level of rate limit and safe guards so we use 1/4 of the recommended value
_DNS_PTR_MIN_TTL = 1125

# RFC 6762 §10.4 cache-flush-on-failure timing.
# Issue queries at t=0, t=1s, t=3s (intervals doubling per
# §5.2), then flush at t=10s if the record was not refreshed
# in the cache. Schedule is fixed by the RFC; do not tighten
# without an interop note.
_RECONFIRM_QUERY_INTERVALS_MS = (0, 1000, 3000)
_RECONFIRM_TIMEOUT_MS = 10000

# Upper bound on the number of records the DNSCache will hold before it
# starts evicting the closest-to-expiration entry to make room for new
# arrivals. Bounds the memory a malicious LAN peer can force the cache
Expand Down
19 changes: 18 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import pytest_asyncio

from zeroconf import Zeroconf, _core, const
from zeroconf._handlers import query_handler
from zeroconf._handlers import query_handler, record_manager
from zeroconf._services import browser as service_browser
from zeroconf._services import info as service_info
from zeroconf.asyncio import AsyncZeroconf
Expand Down Expand Up @@ -188,6 +188,23 @@ def quick_aggregation_timing() -> Generator[None]:
yield


@pytest.fixture
def quick_reconfirm_timing() -> Generator[None]:
"""Shrink RFC 6762 §10.4 reconfirm intervals for loopback tests.

The production values (queries at t=0/1s/3s, flush at t=10s) come
straight from the RFC and exist for real-network interop. Loopback
tests can compress them aggressively without losing what the test
pins. Opt in via fixture arg or
`@pytest.mark.usefixtures("quick_reconfirm_timing")`.
"""
with (
patch.object(record_manager, "_RECONFIRM_QUERY_INTERVALS_MS", (0, 10, 30)),
patch.object(record_manager, "_RECONFIRM_TIMEOUT_MS", 100),
):
yield


@pytest.fixture
def quick_request_timing() -> Generator[None]:
"""Shorten the initial-query delay used by AsyncServiceInfo.async_request.
Expand Down
Loading
Loading