diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 41fadc7b..dd566a29 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -150,16 +150,43 @@ jobs: run: poetry run pytest --no-cov -vvvvv --codspeed tests/benchmarks mode: instrumentation - release: + # Dry run on PRs and non-master pushes. No environment, no publish + # permissions, no OIDC, so PR runs carry no release blast radius. + release-dry-run: needs: - test - lint - if: ${{ github.repository_owner }} == "python-zeroconf" + if: github.ref_name != 'master' && github.repository_owner == 'python-zeroconf' + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v4 + with: + fetch-depth: 0 + ref: ${{ github.ref }} + + - name: Create local branch name + env: + BRANCH: ${{ github.head_ref || github.ref_name }} + run: git switch -C "$BRANCH" + + - name: Test release + uses: python-semantic-release/python-semantic-release@350c48fcb3ffcdfd2e0a235206bc2ecea6b69df0 # v10.5.3 + with: + no_operation_mode: true + # Real release, only on master. The release environment and write/OIDC + # permissions are scoped to this job so they never apply to PR runs. + release: + needs: + - test + - lint + if: github.ref_name == 'master' && github.repository_owner == 'python-zeroconf' runs-on: ubuntu-latest environment: release concurrency: - group: release-${{ github.head_ref || github.ref }} + group: release-${{ github.ref }} cancel-in-progress: false permissions: id-token: write @@ -175,20 +202,13 @@ jobs: ref: ${{ github.ref }} - name: Create local branch name - run: git switch -C ${{ github.head_ref || github.ref_name }} - - # Do a dry run of PSR - - name: Test release - uses: python-semantic-release/python-semantic-release@350c48fcb3ffcdfd2e0a235206bc2ecea6b69df0 # v10.5.3 - if: github.ref_name != 'master' - with: - no_operation_mode: true + env: + BRANCH: ${{ github.ref_name }} + run: git switch -C "$BRANCH" - # On main branch: actual PSR + upload to PyPI & GitHub - name: Release uses: python-semantic-release/python-semantic-release@350c48fcb3ffcdfd2e0a235206bc2ecea6b69df0 # v10.5.3 id: release - if: github.ref_name == 'master' with: github_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/CHANGELOG.md b/CHANGELOG.md index ddbf53d4..4d43ebbf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ +## v0.150.0 (2026-06-22) + +### Features + +- Add async_update_interfaces to rescan network interfaces at runtime + ([#1797](https://github.com/python-zeroconf/python-zeroconf/pull/1797), + [`471feb4`](https://github.com/python-zeroconf/python-zeroconf/commit/471feb490dc315578ca124411c00745028a1a736)) + + ## v0.149.17 (2026-06-21) ### Performance Improvements diff --git a/poetry.lock b/poetry.lock index 9c5d8432..a7b74aad 100644 --- a/poetry.lock +++ b/poetry.lock @@ -597,14 +597,14 @@ windows-terminal = ["colorama (>=0.4.6)"] [[package]] name = "pytest" -version = "9.1.0" +version = "9.1.1" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.10" groups = ["dev"] files = [ - {file = "pytest-9.1.0-py3-none-any.whl", hash = "sha256:8ebb0e7888bdf2bdfc602ec51f8f62d50200af37356c74e503c79a94f5c81f32"}, - {file = "pytest-9.1.0.tar.gz", hash = "sha256:41dd9148c08072446394cefd3d79701701335a9f4cae69ba92e39f6c7f5c061c"}, + {file = "pytest-9.1.1-py3-none-any.whl", hash = "sha256:37a86b45efb9a47a61a36449063e8e18d0cab3161329fc099eb21783169c4f0c"}, + {file = "pytest-9.1.1.tar.gz", hash = "sha256:1088fbde8f2b49d95a549a195707afa7a76a3ce9bcadc26b6d71f0ffda5fe313"}, ] [package.dependencies] @@ -1045,4 +1045,4 @@ zstd = ["backports-zstd (>=1.0.0) ; python_version < \"3.14\""] [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "b6eec4df18bd228809dd7f1005fe97b26f3b7ddbdef1abb1a31f7be8b002c1ab" +content-hash = "5678177abdca2822cd6367bc523d89b5ffe794809d1dbd3556e63dc0b08f7119" diff --git a/pyproject.toml b/pyproject.toml index 9c05ede8..3dfa987e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [project] name = "zeroconf" -version = "0.149.17" +version = "0.150.0" license = "LGPL-2.1-or-later" description = "A pure python implementation of multicast DNS service discovery" readme = "README.rst" @@ -84,7 +84,7 @@ python = "^3.10" ifaddr = ">=0.1.7" [tool.poetry.group.dev.dependencies] -pytest = ">=9.1.0,<10.0" +pytest = ">=9.1.1,<10.0" pytest-cov = ">=4,<8" pytest-asyncio = ">=1.4.0,<1.5.0" cython = "^3.2.5" diff --git a/src/zeroconf/__init__.py b/src/zeroconf/__init__.py index f993ae95..c82c2f79 100644 --- a/src/zeroconf/__init__.py +++ b/src/zeroconf/__init__.py @@ -88,7 +88,7 @@ __author__ = "Paul Scott-Murphy, William McBrine" __maintainer__ = "Jakub Stasiak " -__version__ = "0.149.17" +__version__ = "0.150.0" __license__ = "LGPL" diff --git a/src/zeroconf/_core.py b/src/zeroconf/_core.py index f184b6fa..209e1dec 100644 --- a/src/zeroconf/_core.py +++ b/src/zeroconf/_core.py @@ -199,6 +199,12 @@ def __init__( self.unicast = unicast self._use_asyncio = use_asyncio + # Retained so async_update_interfaces can re-run create_sockets / + # normalize_interface_choice against the live interface set later. + # Copy a mutable list so later caller mutation can't change it. + self._interfaces = list(interfaces) if isinstance(interfaces, list) else interfaces + self._ip_version = ip_version + self._apple_p2p = apple_p2p listen_socket, respond_sockets = create_sockets(interfaces, unicast, ip_version, apple_p2p=apple_p2p) log.debug("Listen socket %s, respond sockets %s", listen_socket, respond_sockets) @@ -216,6 +222,9 @@ def __init__( self.record_manager = RecordManager(self) self._notify_futures: set[asyncio.Future] = set() + # Serializes async_update_interfaces so overlapping calls (a bursty + # adapter-change source) don't diff against a stale sender snapshot. + self._interface_update_lock = asyncio.Lock() self.loop: asyncio.AbstractEventLoop | None = None self._loop_thread: threading.Thread | None = None @@ -406,6 +415,91 @@ async def async_update_service(self, info: ServiceInfo) -> Awaitable: self.registry.async_update(info) return asyncio.ensure_future(self._async_broadcast_service(info, _REGISTER_TIME, None)) + def update_interfaces( + self, + interfaces: InterfacesType | None = None, + ip_version: IPVersion | None = None, + apple_p2p: bool | None = None, + ) -> None: + """Rescan network interfaces and reconcile the sockets in use. + + While it is not expected during normal operation, + this function may raise EventLoopBlocked if the underlying + call to `async_update_interfaces` cannot be completed. Raises + RuntimeError if apple_p2p is set on a non-Apple platform. + """ + assert self.loop is not None + # Unlike register/update, the re-announce is awaited inline (to log + # per-service failures), so the budget must cover the full announce + # window ((_REGISTER_BROADCASTS - 1) * _REGISTER_TIME) plus the reconcile + # and wait-for-start overhead; double the register budget for headroom. + run_coro_with_timeout( + self.async_update_interfaces(interfaces, ip_version, apple_p2p), + self.loop, + _REGISTER_TIME * _REGISTER_BROADCASTS * 2, + ) + + async def async_update_interfaces( + self, + interfaces: InterfacesType | None = None, + ip_version: IPVersion | None = None, + apple_p2p: bool | None = None, + ) -> None: + """Rescan network interfaces and reconcile the sockets in use. + + Adds sockets for interfaces that appeared, drops sockets for + interfaces that disappeared, and re-announces existing + registrations when a new sender appeared. ``interfaces``, + ``ip_version`` and ``apple_p2p`` each default to the value passed at + construction; pass a new value to switch it. When the resulting + interface set is unchanged this is a no-op (no sockets touched, + nothing re-announced). The listen socket is rebuilt if the new set + needs a different address family; unicast mode is fixed at + construction. Concurrent calls are serialized. Bringing up interfaces + is best-effort: a requested interface that fails to bind, or fails to + re-join after a rebuild, is logged rather than raised, and likewise a + registration that fails to re-announce is logged so one failure cannot + block the others. Raises RuntimeError if apple_p2p is set on a non-Apple + platform (input validation, matching the constructor). + """ + # Resolve against the retained config but only commit it after the + # engine reconcile succeeds, so a failed reconcile leaves the stored + # config unchanged rather than recording a set that never fully bound + # (a mid-reconcile failure may still have changed some sockets). + interfaces = self._interfaces if interfaces is None else interfaces + ip_version = self._ip_version if ip_version is None else ip_version + apple_p2p = self._apple_p2p if apple_p2p is None else apple_p2p + if apple_p2p and sys.platform != "darwin": + raise RuntimeError("Option `apple_p2p` is not supported on non-Apple platforms.") + await self.async_wait_for_start() + # Only the reconcile mutates the sender set, so hold the lock for that + # alone; the multi-second re-announce runs unlocked so a bursty + # adapter-change source isn't blocked behind it. + async with self._interface_update_lock: + added = await self.engine.async_update_interfaces(interfaces, ip_version, apple_p2p) + # Copy a mutable list so later caller mutation can't change the + # retained configuration. + self._interfaces = list(interfaces) if isinstance(interfaces, list) else interfaces + self._ip_version = ip_version + self._apple_p2p = apple_p2p + if not added: + return + # Re-announce every registration; one broadcast failing must not mask + # the rest, so collect exceptions and log them individually, naming the + # service so a partial failure is actionable. + infos = self.registry.async_get_service_infos() + results = await asyncio.gather( + *[self._async_broadcast_service(info, _REGISTER_TIME, None) for info in infos], + return_exceptions=True, + ) + for info, result in zip(infos, results, strict=True): + if isinstance(result, Exception): + log.warning("Error re-announcing %s after interface update: %s", info.name, result) + elif isinstance(result, BaseException): + # gather(return_exceptions=True) also captures BaseExceptions + # such as CancelledError; don't swallow a cancellation/interrupt. + raise result + async def async_get_service_info( self, type_: str, diff --git a/src/zeroconf/_engine.py b/src/zeroconf/_engine.py index 0e1c01a1..c68c1521 100644 --- a/src/zeroconf/_engine.py +++ b/src/zeroconf/_engine.py @@ -28,8 +28,18 @@ import threading from typing import TYPE_CHECKING, cast +from ._logger import log from ._record_update import RecordUpdate from ._utils.asyncio import get_running_loop, run_coro_with_timeout +from ._utils.net import ( + InterfacesType, + IPVersion, + add_interface, + add_multicast_member, + drop_multicast_member, + new_listen_socket, + normalize_interface_choice, +) from ._utils.time import current_time_millis from .const import _CACHE_CLEANUP_INTERVAL @@ -38,17 +48,62 @@ from ._listener import AsyncListener -from ._transport import _WrappedTransport, make_wrapped_transport +from ._transport import _strip_zone, _WrappedTransport, make_wrapped_transport _CLOSE_TIMEOUT = 3000 # ms +def _interface_key(interface: str | tuple[tuple[str, int, int], int]) -> tuple[str, int]: + """Return the (address, scope_id) an interface choice maps to, for diffing. + + Must produce the same key shape as ``_WrappedTransport.interface_key`` so + the desired set (from ``normalize_interface_choice``) and the current set + (from the bound senders) diff against each other. + """ + if isinstance(interface, tuple): + return (_strip_zone(interface[0][0]), interface[0][2]) + return (interface, 0) + + +def _listen_socket_supports( + listen_socket: socket.socket, interface: str | tuple[tuple[str, int, int], int] +) -> bool: + """Whether the fixed-family listen socket can join this interface's group.""" + if isinstance(interface, tuple): + # An IPv6 interface can only be joined on an AF_INET6 socket. + return listen_socket.family == socket.AF_INET6 + if listen_socket.family != socket.AF_INET6: + # An IPv4 interface on an AF_INET socket. + return True + # An IPv4 interface on an AF_INET6 socket: only when it is dual-stack. + supported = True + try: + supported = not listen_socket.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY) + except OSError as exc: + # Reading IPV6_V6ONLY essentially never fails on a valid AF_INET6 + # socket. Assume dual-stack rather than abort the rescan; returning + # False instead could loop rebuilds if the rebuilt socket's read also + # fails. Log at warning, not debug, because if the socket really were + # v6-only this skips a needed rebuild and leaves an added IPv4 family + # unreceivable, which is worth surfacing. + log.warning("Unable to read IPV6_V6ONLY, assuming dual-stack: %s", exc) + return supported + + +def _without_transport( + wrappers: list[_WrappedTransport], transport: asyncio.DatagramTransport +) -> list[_WrappedTransport]: + """Return the wrappers whose underlying transport is not ``transport``.""" + return [wrapped for wrapped in wrappers if wrapped.transport is not transport] + + class AsyncEngine: """An engine wraps sockets in the event loop.""" __slots__ = ( "_cleanup_timer", "_listen_socket", + "_listen_transport", "_respond_sockets", "_setup_task", "loop", @@ -72,6 +127,7 @@ def __init__( self.senders: list[_WrappedTransport] = [] self.running_future: asyncio.Future[bool | None] | None = None self._listen_socket = listen_socket + self._listen_transport: _WrappedTransport | None = None self._respond_sockets = respond_sockets self._cleanup_timer: asyncio.TimerHandle | None = None self._setup_task: asyncio.Task[None] | None = None @@ -98,8 +154,6 @@ async def _async_setup(self, loop_thread_ready: threading.Event | None) -> None: async def _async_create_endpoints(self) -> None: """Create endpoints to send and receive.""" - assert self.loop is not None - loop = self.loop reader_sockets = [] sender_sockets = [] if self._listen_socket: @@ -110,22 +164,244 @@ async def _async_create_endpoints(self) -> None: sender_sockets.append(s) for s in reader_sockets: - transport, protocol = await loop.create_datagram_endpoint( # type: ignore[type-var] - lambda: AsyncListener(self.zc), # type: ignore[arg-type, return-value] - sock=s, - ) - # Register the wrapped transport before releasing the engine's - # handle so a concurrent shutdown always sees ``s`` in exactly - # one place; do not add an ``await`` between these two steps. - self.protocols.append(cast(AsyncListener, protocol)) - self.readers.append(make_wrapped_transport(cast(asyncio.DatagramTransport, transport))) - if s in sender_sockets: - self.senders.append(make_wrapped_transport(cast(asyncio.DatagramTransport, transport))) + reader = await self._async_wrap_socket(s, s in sender_sockets) + # _async_wrap_socket registers the transport with no await between + # creating and registering it, and the pending-handle cleanup below + # adds no await either, so a concurrent shutdown always sees ``s`` + # in exactly one place. if s is self._listen_socket: + # Keep a handle to the shared listen socket so interface + # rescans can add/drop multicast memberships on it. + self._listen_transport = reader self._listen_socket = None if s in self._respond_sockets: self._respond_sockets.remove(s) + async def _async_wrap_socket(self, sock: socket.socket, is_sender: bool) -> _WrappedTransport: + """Adopt a socket into a transport, register it, and return the reader wrapper.""" + assert self.loop is not None + transport, protocol = await self.loop.create_datagram_endpoint( # type: ignore[type-var] + lambda: AsyncListener(self.zc), # type: ignore[arg-type, return-value] + sock=sock, + ) + datagram_transport = cast(asyncio.DatagramTransport, transport) + reader = make_wrapped_transport(datagram_transport) + # No ``await`` between wrapping and registering so a concurrent + # shutdown always sees the transport in exactly one place. + self.protocols.append(cast(AsyncListener, protocol)) + self.readers.append(reader) + if is_sender: + self.senders.append(make_wrapped_transport(datagram_transport)) + return reader + + async def async_update_interfaces( + self, + interfaces: InterfacesType, + ip_version: IPVersion, + apple_p2p: bool, + ) -> bool: + """Reconcile sender/reader sockets to the live interface set. + + Adds a per-interface responder socket for each interface that + appeared and tears down the socket for each interface that + disappeared, diffing on the bound address. A Default single-family + instance's dual-use listen/responder socket is converted to a pure + listener when moving to an explicit set; otherwise the shared listen + socket is left intact. Returns whether any responder socket was + added, so the caller can skip re-announcing when nothing appeared. + """ + assert self.loop is not None + try: + normalized = normalize_interface_choice(interfaces, ip_version) + except RuntimeError as exc: + # An All/Default instance can transiently resolve to zero addresses + # during adapter churn, where normalize_interface_choice raises + # instead of returning an empty set. Treat that as a logged no-op so + # a momentary down state doesn't crash a caller's adapter-change + # handler (best-effort contract); the next rescan reconciles. + log.warning("Skipping interface update; no interfaces available: %s", exc) + return False + desired = {_interface_key(interface): interface for interface in normalized} + current = {wrapped.interface_key: wrapped for wrapped in self.senders} + listen_transport = self._listen_transport + listen_socket = listen_transport.sock if listen_transport is not None else None + + # The listen socket's family is fixed at construction, so a desired + # interface of another family (e.g. an IPv6 interface added to an IPv4 + # instance) needs a fresh listen socket before senders are reconciled, + # otherwise the current senders would be torn down with no replacements + # bound. + needs_rebuild = listen_socket is not None and any( + not _listen_socket_supports(listen_socket, interface) for interface in desired.values() + ) + + # A Default single-family instance shares the listen socket as its only + # sender (the dual-use socket). Moving it to an explicit interface set + # abandons that optimization: rebuild it as a pure listener (its existing + # group memberships would otherwise collide with the new per-interface + # joins), which also stops it responding so it can't double-announce on + # the overlapping interface. Drop it from the diff's view so the desired + # interfaces are added fresh; the actual sender removal is done by the + # rebuild once it succeeds (so a failed rebuild leaves senders intact). + # The no-arg refresh of a Default instance leaves desired == {its + # interface} and so neither converts nor rebuilds. + if listen_transport is not None and any( + wrapped.transport is listen_transport.transport for wrapped in self.senders + ): + listen_key = listen_transport.interface_key + if any(key != listen_key for key in desired): + current.pop(listen_key, None) + needs_rebuild = True + + if needs_rebuild: + try: + await self._async_rebuild_listen_socket(apple_p2p, desired, current) + except (OSError, RuntimeError) as exc: + # A fresh wildcard bind / endpoint creation can transiently fail + # during adapter churn. The rebuild raises before tearing down + # the old listen socket, so state is unchanged; log and no-op + # rather than crash the caller's handler (best-effort contract). + log.warning("Skipping interface update; listen socket rebuild failed: %s", exc) + return False + listen_transport = self._listen_transport + listen_socket = listen_transport.sock if listen_transport is not None else None + + for bind_address, wrapped in current.items(): + if bind_address in desired: + continue + if listen_transport is not None and wrapped.transport is listen_transport.transport: + # The shared listen / dual-use socket is not a per-interface + # sender; leaving the group or closing it would break receive. + continue + # After a rebuild, listen_socket is the new socket, which this gone + # interface never joined (its membership died with the old socket); + # the leave is then a benign no-op that drop_multicast_member swallows. + self._async_close_sender(wrapped, listen_socket) + + added = False + for bind_address, interface in desired.items(): + if bind_address in current: + continue + if await self._async_add_interface(interface, listen_socket, apple_p2p): + added = True + return added + + async def _async_add_interface( + self, + interface: str | tuple[tuple[str, int, int], int], + listen_socket: socket.socket | None, + apple_p2p: bool, + ) -> bool: + """Join the multicast group and adopt a responder socket for one interface. + + Returns whether a responder socket was actually added. + """ + # Join the group and create the responder via the same primitive + # construction uses, so setup and rescan stay in lockstep. These are + # user-initiated reconciles, so a requested interface that fails to + # come up is surfaced once at warning (deduped per interface so the + # polling monitor doesn't spam) rather than only at debug. + respond_socket = add_interface(listen_socket, interface, apple_p2p=apple_p2p, unicast=self.zc.unicast) + if respond_socket is None: + self.zc.log_warning_once(f"Interface {interface!r} not added") + return False + try: + await self._async_wrap_socket(respond_socket, is_sender=True) + except Exception as exc: + # Roll back the socket + group join on any failure so nothing is left + # dangling. + respond_socket.close() + if listen_socket is not None: + drop_multicast_member(listen_socket, interface) + if not isinstance(exc, OSError): + # Only an expected socket-level failure is best-effort; a real + # bug (e.g. TypeError) must propagate rather than be downgraded + # to a one-time "interface not added" warning. + raise + # Log and skip rather than abort the whole reconcile so the other + # interfaces still come up (best-effort bring-up). + self.zc.log_warning_once(f"Interface {interface!r} not added: {exc}") + return False + return True + + def _async_remove_transport(self, transport: asyncio.DatagramTransport) -> None: + """Drop a transport's protocol/reader/sender wrappers, cancelling its timers.""" + kept_protocols = [] + for protocol in self.protocols: + if protocol.transport is not None and protocol.transport.transport is transport: + # Cancel any pending TC-reassembly timers so one can't fire a + # response against the transport we're about to close. + protocol.cancel_pending_timers() + else: + kept_protocols.append(protocol) + self.protocols = kept_protocols + self.readers = _without_transport(self.readers, transport) + self.senders = _without_transport(self.senders, transport) + + def _async_close_sender(self, wrapped: _WrappedTransport, listen_socket: socket.socket | None) -> None: + """Drop a per-interface sender's wrappers/protocol and close its transport.""" + transport = wrapped.transport + self._async_remove_transport(transport) + try: + if listen_socket is not None: + drop_multicast_member(listen_socket, wrapped.multicast_interface) + finally: + # Release the socket even if a non-benign leave (e.g. EPERM) raises. + transport.close() + + async def _async_rebuild_listen_socket( + self, + apple_p2p: bool, + desired: dict[tuple[str, int], str | tuple[tuple[str, int, int], int]], + current: dict[tuple[str, int], _WrappedTransport], + ) -> None: + """Replace the listen socket with one whose family covers the desired set. + + The listen socket's family is otherwise fixed at construction; this + lets an instance start receiving a newly added address family, and is + also used to convert a Default dual-use socket to a pure listener. The + replacement family is derived from the desired set (not the + requested ip_version, which an explicit list can contradict) so it + always covers every desired interface and never needs an immediate + re-rebuild. Interfaces that are staying are re-joined on the new socket, + and the old socket is closed (releasing its memberships). + """ + has_v6 = any(isinstance(interface, tuple) for interface in desired.values()) + has_v4 = any(not isinstance(interface, tuple) for interface in desired.values()) + if has_v4 and has_v6: + family_version = IPVersion.All + elif has_v6: + family_version = IPVersion.V6Only + else: + family_version = IPVersion.V4Only + new_listen = new_listen_socket(family_version, apple_p2p) + if new_listen is None: + raise RuntimeError("Failed to create a listen socket for the new interface family") + try: + for bind_address, interface in desired.items(): + # A staying interface that can't re-join on the new socket keeps + # its sender but receives only via the shared socket it never + # joined; surface that degraded state like _async_add_interface. + if bind_address in current and not add_multicast_member(new_listen, interface): + self.zc.log_warning_once( + f"Interface {interface!r} could not re-join the multicast group " + "on the rebuilt listen socket" + ) + new_reader = await self._async_wrap_socket(new_listen, is_sender=False) + except Exception: + # Endpoint creation failed; close the unadopted socket (and its + # joins) rather than leak it, mirroring _async_add_interface. + new_listen.close() + raise + # A rebuild is only entered with a live listen socket, so the old + # transport is always present. + old_listen_transport = self._listen_transport + assert old_listen_transport is not None + self._listen_transport = new_reader + old_transport = old_listen_transport.transport + self._async_remove_transport(old_transport) + old_transport.close() + def _async_cache_cleanup(self) -> None: """Periodic cache cleanup.""" now = current_time_millis() diff --git a/src/zeroconf/_listener.pxd b/src/zeroconf/_listener.pxd index 260ba091..b5001a25 100644 --- a/src/zeroconf/_listener.pxd +++ b/src/zeroconf/_listener.pxd @@ -43,6 +43,8 @@ cdef class AsyncListener: cdef _cancel_any_timers_for_addr(self, object addr) + cdef _drop_deferred(self, object addr) + cdef _evict_oldest_deferred(self) @cython.locals(deadline=object, fire_at=double) diff --git a/src/zeroconf/_listener.py b/src/zeroconf/_listener.py index 7be2a828..f50aba6f 100644 --- a/src/zeroconf/_listener.py +++ b/src/zeroconf/_listener.py @@ -309,6 +309,23 @@ def _cancel_any_timers_for_addr(self, addr: _str) -> None: if addr in self._timers: self._timers.pop(addr).cancel() + def _drop_deferred(self, addr: _str) -> None: + """Cancel an address's timer and discard its reassembly state.""" + self._cancel_any_timers_for_addr(addr) + self._deferred_deadlines.pop(addr, None) + self._deferred.pop(addr, None) + + def cancel_pending_timers(self) -> None: + """Cancel all pending TC-reassembly timers and drop deferred state. + + Called when this listener's transport is removed so a timer cannot + fire a response against an already-closed transport. Every timer's + addr also has a deferred entry, so dropping each deferred addr + cancels its timer too. + """ + for addr in list(self._deferred): + self._drop_deferred(addr) + def _evict_oldest_deferred(self) -> None: """Discard the oldest deferred addr's reassembly state. @@ -319,10 +336,7 @@ def _evict_oldest_deferred(self) -> None: order) rather than LRU so an active flooder cannot pin its slots by re-sending into the same addr. """ - oldest_addr = next(iter(self._deferred)) - self._cancel_any_timers_for_addr(oldest_addr) - self._deferred_deadlines.pop(oldest_addr, None) - del self._deferred[oldest_addr] + self._drop_deferred(next(iter(self._deferred))) def _respond_query( self, diff --git a/src/zeroconf/_transport.py b/src/zeroconf/_transport.py index c8d7699b..b67ca73e 100644 --- a/src/zeroconf/_transport.py +++ b/src/zeroconf/_transport.py @@ -23,7 +23,18 @@ from __future__ import annotations import asyncio +import logging import socket +import sys +from typing import cast + +from ._logger import log + + +def _strip_zone(address: str) -> str: + """Drop a ``%zone`` suffix from an IPv6 address string.""" + percent = address.find("%") + return address[:percent] if percent != -1 else address class _WrappedTransport: @@ -32,6 +43,7 @@ class _WrappedTransport: __slots__ = ( "fileno", "is_ipv6", + "multicast_index", "sock", "sock_name", "transport", @@ -44,25 +56,74 @@ def __init__( sock: socket.socket, fileno: int, sock_name: tuple, + multicast_index: int = 0, ) -> None: """Initialize the wrapped transport. - These attributes are used when sending packets. + ``multicast_index`` is the IPV6_MULTICAST_IF interface index the + sender joined the group with, carried so a group leave uses the same + index the join did (the bound socket's scope_id is 0 for global IPv6). """ self.transport = transport self.is_ipv6 = is_ipv6 self.sock = sock self.fileno = fileno self.sock_name = sock_name + self.multicast_index = multicast_index + + @property + def interface_key(self) -> tuple[str, int]: + """The bound (address, scope_id) identifying this sender's interface. + + Used to diff senders against the desired interface set. The scope_id + keeps link-local IPv6 addresses that repeat across interfaces (same + address, different zone) from colliding to one key. + """ + sock_name = self.sock_name + if self.is_ipv6: + scope_id = cast(int, sock_name[3]) if len(sock_name) > 3 else 0 + return (_strip_zone(sock_name[0]), scope_id) + return (cast(str, sock_name[0]), 0) + + @property + def multicast_interface(self) -> str | tuple[tuple[str, int, int], int]: + """The interface value a group leave takes for this transport. + + For IPv6 this carries ``multicast_index`` (the index the join used), + not the bound scope_id, so leave and join stay symmetric. + """ + address, _scope_id = self.interface_key + if self.is_ipv6: + return ((address, 0, 0), self.multicast_index) + return address def make_wrapped_transport(transport: asyncio.DatagramTransport) -> _WrappedTransport: """Make a wrapped transport.""" sock: socket.socket = transport.get_extra_info("socket") + is_ipv6 = sock.family == socket.AF_INET6 + multicast_index = 0 + if is_ipv6: + # IPV6_MULTICAST_IF holds the interface index new_respond_socket + # joined the group with; capture it so a later group leave uses the + # same index. This is on the startup/connection path, and the index + # only selects the interface for a future (benign) group leave, so a + # read failure (Windows rejects it with WSAEINVAL; other platforms + # shouldn't) keeps the default index 0 rather than aborting setup. + try: + multicast_index = sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF) + except OSError as exc: + # Windows rejects this read (WSAEINVAL) for every v6 socket, so it is + # expected and benign there; keep it at debug. Elsewhere a failure is + # unexpected and means a later group leave uses the wrong index (a + # benign no-op that leaks the membership), so surface it at warning. + level = logging.DEBUG if sys.platform == "win32" else logging.WARNING + log.log(level, "Unable to read IPV6_MULTICAST_IF, using default index 0: %s", exc) return _WrappedTransport( transport=transport, - is_ipv6=sock.family == socket.AF_INET6, + is_ipv6=is_ipv6, sock=sock, fileno=sock.fileno(), sock_name=sock.getsockname(), + multicast_index=multicast_index, ) diff --git a/src/zeroconf/_utils/net.py b/src/zeroconf/_utils/net.py index 01c5b040..abd8434f 100644 --- a/src/zeroconf/_utils/net.py +++ b/src/zeroconf/_utils/net.py @@ -401,6 +401,43 @@ def add_multicast_member( return True +def drop_multicast_member( + listen_socket: socket.socket, + interface: str | tuple[tuple[str, int, int], int], +) -> bool: + """Leave the mDNS multicast group on an interface; inverse of add_multicast_member.""" + # This is based on assumptions in normalize_interface_choice + is_v6 = isinstance(interface, tuple) + log.debug("Dropping %r (socket %d) from multicast group", interface, listen_socket.fileno()) + try: + if is_v6: + try: + mdns_addr6_bytes = socket.inet_pton(socket.AF_INET6, _MDNS_ADDR6) + except OSError: + log.info( + "Unable to translate IPv6 address when dropping %s from multicast group, " + "this can happen if IPv6 is disabled on the system", + interface, + ) + return False + iface_bin = struct.pack("@I", cast(int, interface[1])) + listen_socket.setsockopt(_IPPROTO_IPV6, socket.IPV6_LEAVE_GROUP, mdns_addr6_bytes + iface_bin) + else: + _value = socket.inet_aton(_MDNS_ADDR) + socket.inet_aton(cast(str, interface)) + listen_socket.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, _value) + except OSError as e: + # The kernel drops memberships automatically when an interface + # disappears, so a stale leave is expected to fail benignly. + benign = {errno.EADDRNOTAVAIL, errno.EINVAL, errno.ENODEV, errno.ENOPROTOOPT} + if sys.platform == "win32": + # No WSAEINVAL definition in typeshed + benign.add(cast(Any, errno).WSAEINVAL) # pylint: disable=no-member + if get_errno(e) in benign: + return False + raise + return True + + def new_respond_socket( interface: str | tuple[tuple[str, int, int], int], apple_p2p: bool = False, @@ -439,16 +476,41 @@ def new_respond_socket( return respond_socket +def new_listen_socket( + ip_version: IPVersion = IPVersion.V4Only, apple_p2p: bool = False +) -> socket.socket | None: + """Create the shared wildcard socket used to receive multicast queries.""" + return new_socket(bind_addr=("",), ip_version=ip_version, apple_p2p=apple_p2p) + + +def add_interface( + listen_socket: socket.socket | None, + interface: str | tuple[tuple[str, int, int], int], + apple_p2p: bool = False, + unicast: bool = False, +) -> socket.socket | None: + """Join an interface's multicast group and create its responder socket. + + Returns the responder socket, or None if the interface can't be brought + up. A group membership joined here is rolled back if the responder socket + cannot be created, so a failure leaves no dangling membership. Shared by + construction (``create_sockets``) and the runtime rescan. + """ + if listen_socket is not None and not add_multicast_member(listen_socket, interface): + return None + respond_socket = new_respond_socket(interface, apple_p2p=apple_p2p, unicast=unicast) + if respond_socket is None and listen_socket is not None: + drop_multicast_member(listen_socket, interface) + return respond_socket + + def create_sockets( interfaces: InterfacesType = InterfaceChoice.All, unicast: bool = False, ip_version: IPVersion = IPVersion.V4Only, apple_p2p: bool = False, ) -> tuple[socket.socket | None, list[socket.socket]]: - if unicast: - listen_socket = None - else: - listen_socket = new_socket(bind_addr=("",), ip_version=ip_version, apple_p2p=apple_p2p) + listen_socket = None if unicast else new_listen_socket(ip_version, apple_p2p) normalized_interfaces = normalize_interface_choice(interfaces, ip_version) @@ -462,14 +524,8 @@ def create_sockets( return listen_socket, [cast(socket.socket, listen_socket)] respond_sockets = [] - for interface in normalized_interfaces: - # Only create response socket if unicast or becoming multicast member was successful - if not unicast and not add_multicast_member(cast(socket.socket, listen_socket), interface): - continue - - respond_socket = new_respond_socket(interface, apple_p2p=apple_p2p, unicast=unicast) - + respond_socket = add_interface(listen_socket, interface, apple_p2p=apple_p2p, unicast=unicast) if respond_socket is not None: respond_sockets.append(respond_socket) diff --git a/src/zeroconf/asyncio.py b/src/zeroconf/asyncio.py index 45aac67a..2061243d 100644 --- a/src/zeroconf/asyncio.py +++ b/src/zeroconf/asyncio.py @@ -224,6 +224,22 @@ async def async_update_service(self, info: ServiceInfo) -> Awaitable: """ return await self.zeroconf.async_update_service(info) + async def async_update_interfaces( + self, + interfaces: InterfacesType | None = None, + ip_version: IPVersion | None = None, + apple_p2p: bool | None = None, + ) -> None: + """Rescan network interfaces and reconcile the sockets in use. + + Adds sockets for interfaces that appeared, drops sockets for + interfaces that disappeared, and re-announces existing + registrations on the resulting senders. ``interfaces``, + ``ip_version`` and ``apple_p2p`` each default to the construction-time + value. Raises RuntimeError if apple_p2p is set on a non-Apple platform. + """ + await self.zeroconf.async_update_interfaces(interfaces, ip_version, apple_p2p) + async def async_close(self) -> None: """Ends the background threads, and prevent this instance from servicing further queries.""" diff --git a/tests/test_interface_update.py b/tests/test_interface_update.py new file mode 100644 index 00000000..380f98b9 --- /dev/null +++ b/tests/test_interface_update.py @@ -0,0 +1,974 @@ +"""Unit tests for runtime interface rescanning (async_update_interfaces).""" + +from __future__ import annotations + +import asyncio +import logging +import socket +from typing import cast +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from zeroconf import InterfaceChoice, IPVersion, ServiceInfo, Zeroconf, _engine, _listener +from zeroconf._engine import _interface_key, _listen_socket_supports +from zeroconf._transport import _strip_zone, _WrappedTransport, make_wrapped_transport +from zeroconf.asyncio import AsyncZeroconf + + +def _make_wrapped( + sock_name: tuple, + is_ipv6: bool = False, + transport: object | None = None, + multicast_index: int = 0, +) -> _WrappedTransport: + """Build a _WrappedTransport with mocked socket/transport for diff tests.""" + return _WrappedTransport( + transport=cast("asyncio.DatagramTransport", transport or Mock()), + is_ipv6=is_ipv6, + sock=cast("socket.socket", Mock()), + fileno=0, + sock_name=sock_name, + multicast_index=multicast_index, + ) + + +def test_strip_zone() -> None: + assert _strip_zone("fe80::1%eth0") == "fe80::1" + assert _strip_zone("192.168.1.5") == "192.168.1.5" + + +def test_interface_key() -> None: + assert _interface_key("192.168.1.5") == ("192.168.1.5", 0) + assert _interface_key((("fe80::1%eth0", 0, 7), 2)) == ("fe80::1", 7) + # The same link-local address on two interfaces must not collapse to one key. + assert _interface_key((("fe80::1", 0, 2), 2)) != _interface_key((("fe80::1", 0, 3), 3)) + + +def test_wrapped_interface_key() -> None: + assert _make_wrapped(("192.168.1.5", 5353)).interface_key == ("192.168.1.5", 0) + assert _make_wrapped(("fe80::1%eth0", 5353, 0, 7), True).interface_key == ("fe80::1", 7) + # A short sock_name (no scope_id) falls back to interface index 0. + assert _make_wrapped(("fe80::1", 5353), True).interface_key == ("fe80::1", 0) + + +def test_wrapped_multicast_interface() -> None: + assert _make_wrapped(("192.168.1.5", 5353)).multicast_interface == "192.168.1.5" + # IPv6 leave carries the join index (IPV6_MULTICAST_IF), not the bound + # scope_id (here sock_name scope_id 5 differs from multicast_index 9). + wrapped = _make_wrapped(("fe80::1", 5353, 0, 5), is_ipv6=True, multicast_index=9) + assert wrapped.multicast_interface == (("fe80::1", 0, 0), 9) + + +def test_make_wrapped_transport_reads_v6_multicast_index() -> None: + """make_wrapped_transport reads IPV6_MULTICAST_IF as the v6 join index.""" + sock = Mock() + sock.family = socket.AF_INET6 + sock.fileno.return_value = 0 + sock.getsockname.return_value = ("fe80::1", 5353, 0, 0) + sock.getsockopt.return_value = 5 + transport = Mock() + transport.get_extra_info.return_value = sock + wrapped = make_wrapped_transport(transport) + assert wrapped.is_ipv6 is True + assert wrapped.multicast_index == 5 + sock.getsockopt.assert_called_once_with(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF) + + +def test_make_wrapped_transport_unreadable_multicast_index() -> None: + """A socket that rejects reading IPV6_MULTICAST_IF falls back to index 0. + + This runs on the startup/connection path, so a read failure must not abort + setup; it degrades to the default index (only a future group leave cares). + """ + sock = Mock() + sock.family = socket.AF_INET6 + sock.fileno.return_value = 0 + sock.getsockname.return_value = ("fe80::1", 5353, 0, 0) + sock.getsockopt.side_effect = OSError + transport = Mock() + transport.get_extra_info.return_value = sock + assert make_wrapped_transport(transport).multicast_index == 0 + + +def test_listen_socket_supports_family() -> None: + """A desired interface is only supported by a listen socket of a compatible family.""" + v4_sock = Mock() + v4_sock.family = socket.AF_INET + v6_sock = Mock() + v6_sock.family = socket.AF_INET6 + v6_interface = (("fe80::1", 0, 0), 1) + + assert _listen_socket_supports(v4_sock, "1.2.3.4") is True + assert _listen_socket_supports(v4_sock, v6_interface) is False + assert _listen_socket_supports(v6_sock, v6_interface) is True + # IPv4 on an AF_INET6 socket depends on whether it is dual-stack. + v6_sock.getsockopt.return_value = 0 # IPV6_V6ONLY off -> dual-stack + assert _listen_socket_supports(v6_sock, "1.2.3.4") is True + v6_sock.getsockopt.return_value = 1 # IPV6_V6ONLY on -> v6-only + assert _listen_socket_supports(v6_sock, "1.2.3.4") is False + # An unreadable option degrades to "assume dual-stack" rather than aborting + # the rescan; at worst it skips a rebuild the next reconcile re-evaluates. + v6_sock.getsockopt.side_effect = OSError + assert _listen_socket_supports(v6_sock, "1.2.3.4") is True + + +@pytest.mark.asyncio +async def test_update_interfaces_noop(aiozc_loopback: AsyncZeroconf) -> None: + """Re-scanning the same interface set leaves the engine lists unchanged.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + engine = zc.engine + before = (len(engine.senders), len(engine.readers), len(engine.protocols)) + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + assert (len(engine.senders), len(engine.readers), len(engine.protocols)) == before + + +@pytest.mark.asyncio +async def test_update_interfaces_defaults_to_stored_choice(aiozc_loopback: AsyncZeroconf) -> None: + """Calling without an argument reuses the interface choice from construction.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + engine = zc.engine + before = (len(engine.senders), len(engine.readers), len(engine.protocols)) + await aiozc_loopback.async_update_interfaces() + assert (len(engine.senders), len(engine.readers), len(engine.protocols)) == before + + +@pytest.mark.asyncio +async def test_update_interfaces_accepts_ip_version_and_apple_p2p(aiozc_loopback: AsyncZeroconf) -> None: + """ip_version and apple_p2p overrides are stored for the rescan.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + await aiozc_loopback.async_update_interfaces(["127.0.0.1"], ip_version=IPVersion.V4Only, apple_p2p=False) + assert zc._ip_version is IPVersion.V4Only + assert zc._apple_p2p is False + + +@pytest.mark.asyncio +async def test_update_interfaces_removes_and_readds(aiozc_loopback: AsyncZeroconf) -> None: + """A gone interface drops its sender; a returning interface re-adds it.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + engine = zc.engine + listen_reader_count = len(engine.readers) - len(engine.senders) + + await aiozc_loopback.async_update_interfaces([]) + await asyncio.sleep(0) + assert engine.senders == [] + # The shared listen socket is never torn down. + assert len(engine.readers) == listen_reader_count + + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + await asyncio.sleep(0) + assert len(engine.senders) == 1 + + +@pytest.mark.asyncio +async def test_update_interfaces_keeps_unchanged_sender_untouched(aiozc_loopback: AsyncZeroconf) -> None: + """An unchanged interface keeps its exact transport; only the gone interface is torn down.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + engine = zc.engine + kept = engine.senders[0] + kept_transport = kept.transport + + # Inject a sender for an interface that is absent from the new set. + gone_transport = Mock() + gone = _make_wrapped(("10.0.0.5", 5353), transport=gone_transport) + engine.senders.append(gone) + + with patch.object(_engine, "drop_multicast_member"): + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + await asyncio.sleep(0) + + # The unchanged 127.0.0.1 sender is the same object, never recreated. + assert engine.senders == [kept] + assert engine.senders[0].transport is kept_transport + # The gone interface's transport was closed exactly once. + gone_transport.close.assert_called_once() + + +@pytest.mark.asyncio +async def test_update_interfaces_cancels_removed_listener_timers(aiozc_loopback: AsyncZeroconf) -> None: + """Removing an interface cancels its listener's pending TC-reassembly timers.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + engine = zc.engine + sender = engine.senders[0] + protocol = next( + p for p in engine.protocols if p.transport is not None and p.transport.transport is sender.transport + ) + timer = Mock() + protocol._timers["1.2.3.4"] = timer + protocol._deferred["1.2.3.4"] = [] + protocol._deferred_deadlines["1.2.3.4"] = 0.0 + + await aiozc_loopback.async_update_interfaces([]) + await asyncio.sleep(0) + + timer.cancel.assert_called_once() + assert protocol._timers == {} + assert protocol._deferred == {} + + +@pytest.mark.asyncio +async def test_close_sender_keeps_protocol_without_transport(aiozc_loopback: AsyncZeroconf) -> None: + """A protocol that never bound a transport is left in place when a sender is closed.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + sender = engine.senders[0] + orphan = _listener.AsyncListener(aiozc_loopback.zeroconf) + assert orphan.transport is None + engine.protocols.append(orphan) + + with patch.object(_engine, "drop_multicast_member"): + engine._async_close_sender(sender, None) + + assert orphan in engine.protocols + + +@pytest.mark.asyncio +async def test_update_interfaces_reconciles_mixed_set(aiozc_loopback: AsyncZeroconf) -> None: + """One rescan keeps unchanged, drops gone, adds new across v4 and link-local v6. + + Drives the engine diff directly over a controlled sender set (no real + sockets) so the (address, scope_id) keying is exercised end to end, + including two interfaces sharing fe80::1 distinguished only by scope. + """ + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + + keep_v4 = _make_wrapped(("192.168.1.5", 5353)) + drop_v4 = _make_wrapped(("10.0.0.9", 5353)) + keep_v6 = _make_wrapped(("fe80::1", 5353, 0, 2), is_ipv6=True, multicast_index=2) + drop_v6 = _make_wrapped(("fe80::1", 5353, 0, 3), is_ipv6=True, multicast_index=3) + engine.senders = [keep_v4, drop_v4, keep_v6, drop_v6] + + # Keep 192.168.1.5 and fe80::1%2; drop 10.0.0.9 and fe80::1%3; add 192.168.1.9. + desired = ["192.168.1.5", "192.168.1.9", (("fe80::1", 0, 2), 2)] + added_sockets: list = [] + + def _fake_wrap(sock: object, is_sender: bool) -> _WrappedTransport: + added_sockets.append(sock) + wrapped = _make_wrapped(("added", 0)) + engine.senders.append(wrapped) + return wrapped + + with ( + patch.object(_engine, "normalize_interface_choice", return_value=desired), + # This test exercises the diff over a contrived sender set, not the + # listen-socket rebuild, so treat every family as supported. + patch.object(_engine, "_listen_socket_supports", return_value=True), + patch.object(_engine, "add_interface", return_value=Mock()), + patch.object(_engine, "drop_multicast_member") as mock_drop, + patch.object(_engine.AsyncEngine, "_async_wrap_socket", new=AsyncMock(side_effect=_fake_wrap)), + ): + added = await engine.async_update_interfaces(["unused"], IPVersion.All, False) + + assert added is True + # Unchanged senders are the same objects; gone ones are removed. + assert keep_v4 in engine.senders + assert keep_v6 in engine.senders + assert drop_v4 not in engine.senders + assert drop_v6 not in engine.senders + # Exactly one brand-new interface was added. + assert len(added_sockets) == 1 + # Each gone interface left its group with its own representation; the + # scope-3 v6 is dropped while the scope-2 v6 with the same address is kept. + dropped = {call.args[1] for call in mock_drop.call_args_list} + assert dropped == {"10.0.0.9", (("fe80::1", 0, 0), 3)} + + +@pytest.mark.asyncio +async def test_update_interfaces_reannounces_services_on_add(aiozc_loopback: AsyncZeroconf) -> None: + """Existing registrations are re-announced when a new sender appears.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + info = ServiceInfo( + "_test._tcp.local.", + "Test._test._tcp.local.", + addresses=[b"\x7f\x00\x00\x01"], + port=80, + server="test.local.", + ) + await aiozc_loopback.async_register_service(info) + # Drop the sender so the next rescan genuinely adds one back. + await aiozc_loopback.async_update_interfaces([]) + await asyncio.sleep(0) + + with patch.object(zc, "_async_broadcast_service", new_callable=AsyncMock) as mock_broadcast: + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + await asyncio.sleep(0) + + assert mock_broadcast.call_count == 1 + assert mock_broadcast.call_args.args[0] is info + + +@pytest.mark.asyncio +async def test_update_interfaces_noop_does_not_reannounce(aiozc_loopback: AsyncZeroconf) -> None: + """An unchanged interface set neither touches sockets nor re-announces.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + engine = zc.engine + info = ServiceInfo( + "_test._tcp.local.", + "Test._test._tcp.local.", + addresses=[b"\x7f\x00\x00\x01"], + port=80, + server="test.local.", + ) + await aiozc_loopback.async_register_service(info) + before = (len(engine.senders), len(engine.readers), len(engine.protocols)) + + with patch.object(zc, "_async_broadcast_service", new_callable=AsyncMock) as mock_broadcast: + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + await asyncio.sleep(0) + + mock_broadcast.assert_not_called() + assert (len(engine.senders), len(engine.readers), len(engine.protocols)) == before + + +@pytest.mark.asyncio +async def test_update_interfaces_logs_reannounce_errors( + aiozc_loopback: AsyncZeroconf, caplog: pytest.LogCaptureFixture +) -> None: + """A re-announce failure is logged and does not propagate out of the rescan.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + info = ServiceInfo( + "_test._tcp.local.", + "Test._test._tcp.local.", + addresses=[b"\x7f\x00\x00\x01"], + port=80, + server="test.local.", + ) + await aiozc_loopback.async_register_service(info) + await aiozc_loopback.async_update_interfaces([]) + await asyncio.sleep(0) + + with ( + patch.object(zc, "_async_broadcast_service", new_callable=AsyncMock, side_effect=ValueError("boom")), + caplog.at_level(logging.WARNING), + ): + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + await asyncio.sleep(0) + + # The failing service is named so a partial failure is actionable. + assert "Error re-announcing Test._test._tcp.local. after interface update" in caplog.text + + +@pytest.mark.asyncio +async def test_update_interfaces_reannounces_all_services_one_failing( + aiozc_loopback: AsyncZeroconf, caplog: pytest.LogCaptureFixture +) -> None: + """Every registration is re-announced on add; one failing does not stop the rest.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + infos = [ + ServiceInfo( + "_test._tcp.local.", + f"T{n}._test._tcp.local.", + addresses=[b"\x7f\x00\x00\x01"], + port=80 + n, + server=f"t{n}.local.", + ) + for n in range(2) + ] + for info in infos: + await aiozc_loopback.async_register_service(info) + await aiozc_loopback.async_update_interfaces([]) + await asyncio.sleep(0) + + async def broadcast(info: ServiceInfo, *args: object) -> None: + if info is infos[0]: + raise ValueError("boom") + + with ( + patch.object(zc, "_async_broadcast_service", new_callable=AsyncMock, side_effect=broadcast) as mock, + caplog.at_level(logging.WARNING), + ): + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + await asyncio.sleep(0) + + # Both services were attempted (the gather fans out over all registrations) + # and the second still ran despite the first raising. + announced = {call.args[0] for call in mock.call_args_list} + assert announced == set(infos) + # Only the failing service is named in the warning. + assert f"Error re-announcing {infos[0].name} after interface update" in caplog.text + assert infos[1].name not in caplog.text + + +@pytest.mark.asyncio +async def test_update_interfaces_reannounce_cancellation_propagates(aiozc_loopback: AsyncZeroconf) -> None: + """A cancelled re-announce propagates rather than being silently dropped as a non-Exception.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + info = ServiceInfo( + "_test._tcp.local.", + "Test._test._tcp.local.", + addresses=[b"\x7f\x00\x00\x01"], + port=80, + server="test.local.", + ) + await aiozc_loopback.async_register_service(info) + await aiozc_loopback.async_update_interfaces([]) + await asyncio.sleep(0) + + with ( + patch.object( + zc, "_async_broadcast_service", new_callable=AsyncMock, side_effect=asyncio.CancelledError + ), + pytest.raises(asyncio.CancelledError), + ): + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + + +@pytest.mark.asyncio +async def test_update_interfaces_ip_change_in_one_rescan(aiozc_loopback: AsyncZeroconf) -> None: + """An interface whose address changes is removed and re-added in a single rescan.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + old_transport = Mock() + old = _make_wrapped(("10.0.0.5", 5353), transport=old_transport) + engine.senders = [old] + + async def fake_wrap(sock: object, is_sender: bool) -> _WrappedTransport: + wrapped = _make_wrapped(("10.0.0.9", 5353), transport=Mock()) + (engine.senders if is_sender else engine.readers).append(wrapped) + return wrapped + + with ( + patch.object(_engine, "normalize_interface_choice", return_value=["10.0.0.9"]), + patch.object(_engine, "_listen_socket_supports", return_value=True), + patch.object(_engine, "add_interface", return_value=Mock()), + patch.object(_engine, "drop_multicast_member") as mock_drop, + patch.object(_engine.AsyncEngine, "_async_wrap_socket", new=AsyncMock(side_effect=fake_wrap)), + ): + added = await engine.async_update_interfaces(["unused"], IPVersion.All, False) + + assert added is True + # The old address left its group and was closed; exactly the new one remains. + assert {call.args[1] for call in mock_drop.call_args_list} == {"10.0.0.5"} + old_transport.close.assert_called_once() + assert old not in engine.senders + assert len(engine.senders) == 1 + assert engine.senders[0].interface_key == ("10.0.0.9", 0) + + +@pytest.mark.asyncio +async def test_update_interfaces_transient_empty_set_is_noop( + aiozc_loopback: AsyncZeroconf, caplog: pytest.LogCaptureFixture +) -> None: + """An All instance that transiently resolves to zero interfaces logs and no-ops.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + engine = zc.engine + before = (len(engine.senders), len(engine.readers), len(engine.protocols)) + + # normalize_interface_choice raises for an All instance with no addresses + # (adapter churn); the rescan must not crash the caller's handler. + with ( + patch.object( + _engine, + "normalize_interface_choice", + side_effect=RuntimeError("No interfaces to listen on"), + ), + caplog.at_level(logging.WARNING), + ): + added = await engine.async_update_interfaces(InterfaceChoice.All, IPVersion.All, False) + + assert added is False + assert "Skipping interface update; no interfaces available" in caplog.text + # Current sockets are left intact rather than torn down. + assert (len(engine.senders), len(engine.readers), len(engine.protocols)) == before + + +@pytest.mark.asyncio +async def test_update_interfaces_add_failure_adds_no_sender(aiozc_loopback: AsyncZeroconf) -> None: + """An interface that fails to come up adds no responder socket.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + engine = zc.engine + await aiozc_loopback.async_update_interfaces([]) + await asyncio.sleep(0) + + with patch.object(_engine, "add_interface", return_value=None): + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + assert engine.senders == [] + + +@pytest.mark.asyncio +async def test_update_interfaces_rolls_back_membership_on_wrap_failure( + aiozc_loopback: AsyncZeroconf, +) -> None: + """Endpoint creation failing rolls the interface back and is skipped, not raised.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + await aiozc_loopback.async_update_interfaces([]) + await asyncio.sleep(0) + assert zc._interfaces == [] + + fake_socket = Mock() + with ( + patch.object(_engine, "add_interface", return_value=fake_socket), + patch.object(_engine, "drop_multicast_member") as mock_drop, + patch.object(_engine.AsyncEngine, "_async_wrap_socket", new=AsyncMock(side_effect=OSError("boom"))), + ): + # Best-effort: the failure is logged and skipped, not propagated. + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + + # The just-joined membership was dropped, the socket closed, no sender added. + mock_drop.assert_called_once() + fake_socket.close.assert_called_once() + assert zc.engine.senders == [] + + +@pytest.mark.asyncio +async def test_add_interface_rollback_without_listen_socket(aiozc_loopback: AsyncZeroconf) -> None: + """A wrap failure with no listen socket (unicast) closes the socket and drops no membership.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + + fake_socket = Mock() + with ( + patch.object(_engine, "add_interface", return_value=fake_socket), + patch.object(_engine, "drop_multicast_member") as mock_drop, + patch.object(_engine.AsyncEngine, "_async_wrap_socket", new=AsyncMock(side_effect=OSError("boom"))), + ): + added = await engine._async_add_interface("127.0.0.1", None, False) + + assert added is False + fake_socket.close.assert_called_once() + mock_drop.assert_not_called() + + +@pytest.mark.asyncio +async def test_add_interface_propagates_non_oserror(aiozc_loopback: AsyncZeroconf) -> None: + """A non-OSError (a real bug) propagates after rollback, not downgraded to a warning.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + + fake_socket = Mock() + listen_socket = Mock() + with ( + patch.object(_engine, "add_interface", return_value=fake_socket), + patch.object(_engine, "drop_multicast_member") as mock_drop, + patch.object(_engine.AsyncEngine, "_async_wrap_socket", new=AsyncMock(side_effect=TypeError("bug"))), + pytest.raises(TypeError), + ): + await engine._async_add_interface("127.0.0.1", listen_socket, False) + + # Rolled back (socket closed, membership dropped) even though it propagates. + fake_socket.close.assert_called_once() + mock_drop.assert_called_once() + + +@pytest.mark.asyncio +async def test_update_interfaces_keeps_dual_use_listen_socket(aiozc_loopback: AsyncZeroconf) -> None: + """A dual-use sender (the listen socket itself) is never torn down on rescan.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + listen = engine._listen_transport + assert listen is not None + # Simulate a Default single-family instance: the listen socket is the sole sender. + engine.senders = [listen] + await engine.async_update_interfaces([], IPVersion.V4Only, False) + assert engine.senders == [listen] + + +@pytest.mark.asyncio +async def test_update_interfaces_default_to_explicit_reconciles(aiozc_loopback: AsyncZeroconf) -> None: + """Moving a dual-use instance to an explicit set demotes its socket and rebuilds clean.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + listen = engine._listen_transport + assert listen is not None + old_underlying = listen.transport + # Simulate a Default single-family instance: the listen socket is the sole sender. + engine.senders = [listen] + new_listen_sock = Mock() + new_listen_sock.family = socket.AF_INET + + async def fake_wrap(sock: object, is_sender: bool) -> _WrappedTransport: + wrapped = _make_wrapped(("wrapped", 0), transport=Mock()) + (engine.senders if is_sender else engine.readers).append(wrapped) + return wrapped + + with ( + patch.object(_engine, "normalize_interface_choice", return_value=["192.168.1.5"]), + patch.object(_engine, "new_listen_socket", return_value=new_listen_sock) as mock_new_listen, + patch.object(_engine, "add_interface", return_value=Mock()), + patch.object(_engine, "drop_multicast_member"), + patch.object(_engine.AsyncEngine, "_async_wrap_socket", new=AsyncMock(side_effect=fake_wrap)), + ): + added = await engine.async_update_interfaces(["unused"], IPVersion.V4Only, False) + + # The dual-use socket is rebuilt as a pure listener (demoted and closed), + # a fresh listener replaces it, and the explicit interface gains a responder. + assert added is True + mock_new_listen.assert_called_once() + assert engine._listen_transport is not listen + assert listen not in engine.senders + assert listen not in engine.readers + assert old_underlying.is_closing() + # One brand-new responder (for 192.168.1.5) is the only sender now. + assert len(engine.senders) == 1 + assert engine.senders[0] is not listen + + +@pytest.mark.asyncio +async def test_update_interfaces_default_to_explicit_real(aiozc_loopback: AsyncZeroconf) -> None: + """A real dual-use socket with an overlapping membership reconciles without EADDRINUSE.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + listen = engine._listen_transport + assert listen is not None + assert listen.sock.family == socket.AF_INET + old_underlying = listen.transport + # Simulate a Default dual-use instance whose listen socket already joined + # the loopback group, so a naive demote-and-rejoin would hit EADDRINUSE. + _engine.add_multicast_member(listen.sock, "127.0.0.1") + engine.senders = [listen] + + with patch.object(_engine, "normalize_interface_choice", return_value=["127.0.0.1"]): + added = await engine.async_update_interfaces(["unused"], IPVersion.V4Only, False) + + assert added is True + new_listen = engine._listen_transport + assert new_listen is not None + assert new_listen is not listen + assert new_listen.sock.family == socket.AF_INET + assert old_underlying.is_closing() + # The overlapping interface got a real responder on the fresh listen socket. + assert len(engine.senders) == 1 + assert engine.senders[0] is not listen + + +@pytest.mark.asyncio +async def test_update_interfaces_does_not_rebuild_when_family_supported( + aiozc_loopback: AsyncZeroconf, +) -> None: + """Same-family rescans (and All/dual-stack) never rebuild the listen socket.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + listen = zc.engine._listen_transport + with patch.object(_engine, "new_listen_socket") as mock_new_listen: + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + await aiozc_loopback.async_update_interfaces([]) + await asyncio.sleep(0) + await aiozc_loopback.async_update_interfaces(["127.0.0.1"]) + await asyncio.sleep(0) + mock_new_listen.assert_not_called() + assert zc.engine._listen_transport is listen + + +@pytest.mark.asyncio +async def test_update_interfaces_rebuild_rejoins_kept_interfaces(aiozc_loopback: AsyncZeroconf) -> None: + """On a family-change rebuild, interfaces that stay are re-joined on the new listen socket.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + + # Keep the existing IPv4 interface and add an IPv6 one (which the IPv4 + # listen socket can't join, forcing a rebuild). + v6 = (("fe80::1", 0, 0), 1) + new_listen_sock = Mock() + new_listen_sock.family = socket.AF_INET6 + + async def fake_wrap(sock: object, is_sender: bool) -> _WrappedTransport: + wrapped = _make_wrapped(("wrapped", 0), transport=Mock()) + (engine.senders if is_sender else engine.readers).append(wrapped) + return wrapped + + with ( + patch.object(_engine, "normalize_interface_choice", return_value=["127.0.0.1", v6]), + patch.object(_engine, "new_listen_socket", return_value=new_listen_sock), + patch.object(_engine, "add_multicast_member", return_value=True) as mock_add, + patch.object(_engine, "add_interface", return_value=Mock()), + patch.object(_engine, "drop_multicast_member"), + patch.object(_engine.AsyncEngine, "_async_wrap_socket", new=AsyncMock(side_effect=fake_wrap)), + ): + added = await engine.async_update_interfaces(["unused"], IPVersion.All, False) + + assert added is True + # The kept IPv4 interface was re-joined on the new listen socket. + assert any( + call.args[0] is new_listen_sock and call.args[1] == "127.0.0.1" for call in mock_add.call_args_list + ) + + +@pytest.mark.asyncio +async def test_update_interfaces_rebuild_rejoin_failure_warns( + aiozc_loopback: AsyncZeroconf, caplog: pytest.LogCaptureFixture +) -> None: + """A staying interface that fails to re-join on the rebuilt listen socket warns.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + v6 = (("fe80::1", 0, 0), 1) + new_listen_sock = Mock() + new_listen_sock.family = socket.AF_INET6 + + async def fake_wrap(sock: object, is_sender: bool) -> _WrappedTransport: + wrapped = _make_wrapped(("wrapped", 0), transport=Mock()) + (engine.senders if is_sender else engine.readers).append(wrapped) + return wrapped + + with ( + patch.object(_engine, "normalize_interface_choice", return_value=["127.0.0.1", v6]), + patch.object(_engine, "new_listen_socket", return_value=new_listen_sock), + patch.object(_engine, "add_multicast_member", return_value=False), + patch.object(_engine, "add_interface", return_value=Mock()), + patch.object(_engine, "drop_multicast_member"), + patch.object(_engine.AsyncEngine, "_async_wrap_socket", new=AsyncMock(side_effect=fake_wrap)), + caplog.at_level(logging.WARNING), + ): + await engine.async_update_interfaces(["unused"], IPVersion.All, False) + + assert "could not re-join the multicast group on the rebuilt listen socket" in caplog.text + + +@pytest.mark.asyncio +async def test_update_interfaces_rebuild_failure_is_noop( + aiozc_loopback: AsyncZeroconf, caplog: pytest.LogCaptureFixture +) -> None: + """If the replacement listen socket can't be created, the rescan logs and no-ops.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + old_listen = engine._listen_transport + before = (list(engine.senders), list(engine.readers)) + with ( + patch.object(_engine, "normalize_interface_choice", return_value=[(("fe80::1", 0, 0), 1)]), + patch.object(_engine, "new_listen_socket", return_value=None), + caplog.at_level(logging.WARNING), + ): + added = await engine.async_update_interfaces(["unused"], IPVersion.V6Only, False) + + # Best-effort: no raise into the caller, state left untouched. + assert added is False + assert "listen socket rebuild failed" in caplog.text + assert engine._listen_transport is old_listen + assert (list(engine.senders), list(engine.readers)) == before + + +@pytest.mark.asyncio +async def test_update_interfaces_default_rebuild_failure_keeps_dual_use( + aiozc_loopback: AsyncZeroconf, +) -> None: + """A failed rebuild during a dual-use conversion leaves the dual-use sender intact.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + listen = engine._listen_transport + assert listen is not None + # Simulate a Default single-family instance: the listen socket is the sole sender. + engine.senders = [listen] + with ( + patch.object(_engine, "normalize_interface_choice", return_value=["192.168.1.5"]), + patch.object(_engine, "new_listen_socket", return_value=None), + ): + added = await engine.async_update_interfaces(["unused"], IPVersion.V4Only, False) + + # The dual-use socket was not demoted before the (failed) rebuild, so the + # instance still responds on its interface rather than going silent. + assert added is False + assert engine.senders == [listen] + assert engine._listen_transport is listen + + +@pytest.mark.asyncio +async def test_update_interfaces_rebuild_closes_socket_on_wrap_failure( + aiozc_loopback: AsyncZeroconf, +) -> None: + """If wrapping the new listen socket fails, it is closed rather than leaked.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + old_listen = engine._listen_transport + new_listen_sock = Mock() + new_listen_sock.family = socket.AF_INET6 + + with ( + patch.object(_engine, "normalize_interface_choice", return_value=[(("fe80::1", 0, 0), 1)]), + patch.object(_engine, "new_listen_socket", return_value=new_listen_sock), + patch.object(_engine, "add_multicast_member", return_value=True), + patch.object(_engine.AsyncEngine, "_async_wrap_socket", new=AsyncMock(side_effect=OSError("boom"))), + ): + added = await engine.async_update_interfaces(["unused"], IPVersion.V6Only, False) + + # Best-effort no-op: the unadopted socket was closed (not leaked) and the + # old listen socket is untouched. + assert added is False + new_listen_sock.close.assert_called_once() + assert engine._listen_transport is old_listen + + +@pytest.mark.asyncio +async def test_update_interfaces_rebuild_family_matches_desired_set( + aiozc_loopback: AsyncZeroconf, +) -> None: + """The rebuilt listen socket's family is derived from the desired set, not ip_version.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + new_listen_sock = Mock() + new_listen_sock.family = socket.AF_INET + + async def fake_wrap(sock: object, is_sender: bool) -> _WrappedTransport: + wrapped = _make_wrapped(("wrapped", 0), transport=Mock()) + (engine.senders if is_sender else engine.readers).append(wrapped) + return wrapped + + with ( + patch.object(_engine, "normalize_interface_choice", return_value=["192.168.1.5"]), + patch.object(_engine, "_listen_socket_supports", return_value=False), # force a rebuild + patch.object(_engine, "new_listen_socket", return_value=new_listen_sock) as mock_new_listen, + patch.object(_engine, "add_multicast_member", return_value=True), + patch.object(_engine, "add_interface", return_value=Mock()), + patch.object(_engine, "drop_multicast_member"), + patch.object(_engine.AsyncEngine, "_async_wrap_socket", new=AsyncMock(side_effect=fake_wrap)), + ): + # ip_version says V6Only, but the desired set is all IPv4, so the + # rebuilt socket is IPv4 (covers the set; no immediate re-rebuild). + await engine.async_update_interfaces(["unused"], IPVersion.V6Only, False) + + mock_new_listen.assert_called_once() + assert mock_new_listen.call_args.args[0] is IPVersion.V4Only + + +@pytest.mark.asyncio +async def test_update_interfaces_rebuilds_real_listen_socket(aiozc_loopback: AsyncZeroconf) -> None: + """End to end: a family change builds a real dual-stack listen socket and closes the old one.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + old_listen = engine._listen_transport + assert old_listen is not None + assert old_listen.sock.family == socket.AF_INET # V4Only loopback instance + old_underlying = old_listen.transport + + v6 = (("fe80::1", 0, 0), 1) + # Real new_listen_socket + _async_wrap_socket run; only membership joins and + # the (unbindable) v6 responder are stubbed so no real multicast is exercised. + with ( + patch.object(_engine, "normalize_interface_choice", return_value=["127.0.0.1", v6]), + patch.object(_engine, "add_multicast_member", return_value=True), + patch.object(_engine, "add_interface", return_value=None), + ): + await engine.async_update_interfaces(["unused"], IPVersion.All, False) + + new_listen = engine._listen_transport + assert new_listen is not None + assert new_listen is not old_listen + assert new_listen.sock.family == socket.AF_INET6 # rebuilt to a dual-stack socket + # The old listen socket was closed and removed; no duplicate remains. + assert old_underlying.is_closing() + assert old_listen not in engine.readers + assert sum(1 for r in engine.readers if r is new_listen) == 1 + + +@pytest.mark.asyncio +async def test_close_sender_closes_transport_when_drop_raises(aiozc_loopback: AsyncZeroconf) -> None: + """A non-benign group-leave error still releases the transport.""" + engine = aiozc_loopback.zeroconf.engine + await aiozc_loopback.zeroconf.async_wait_for_start() + gone_transport = Mock() + gone = _make_wrapped(("10.0.0.5", 5353), transport=gone_transport) + listen_socket = Mock() + + with ( + patch.object(_engine, "drop_multicast_member", side_effect=OSError("EPERM")), + pytest.raises(OSError), + ): + engine._async_close_sender(gone, listen_socket) + + gone_transport.close.assert_called_once() + + +@pytest.mark.asyncio +async def test_update_interfaces_apple_p2p_non_darwin_raises(aiozc_loopback: AsyncZeroconf) -> None: + """apple_p2p=True on a non-Apple platform raises, matching __init__.""" + await aiozc_loopback.zeroconf.async_wait_for_start() + with ( + patch("zeroconf._core.sys.platform", "linux"), + pytest.raises(RuntimeError, match="apple_p2p"), + ): + await aiozc_loopback.async_update_interfaces(apple_p2p=True) + + +@pytest.mark.asyncio +async def test_update_interfaces_copies_interface_list(aiozc_loopback: AsyncZeroconf) -> None: + """A mutable interfaces list is copied so later mutation doesn't change retained config.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + ifaces = ["127.0.0.1"] + await aiozc_loopback.async_update_interfaces(ifaces) + ifaces.append("10.0.0.1") + assert zc._interfaces == ["127.0.0.1"] + + +@pytest.mark.asyncio +async def test_update_interfaces_unicast_has_no_listen_socket() -> None: + """In unicast mode there is no listen socket, so membership ops are skipped.""" + aiozc = AsyncZeroconf(interfaces=["127.0.0.1"], unicast=True) + try: + zc = aiozc.zeroconf + await zc.async_wait_for_start() + engine = zc.engine + assert engine._listen_transport is None + await aiozc.async_update_interfaces([]) + await asyncio.sleep(0) + assert engine.senders == [] + # A None responder socket has no membership to roll back without a listen socket. + with ( + patch.object(_engine, "add_interface", return_value=None), + patch.object(_engine, "drop_multicast_member") as mock_drop, + ): + await aiozc.async_update_interfaces(["127.0.0.1"]) + assert engine.senders == [] + mock_drop.assert_not_called() + await aiozc.async_update_interfaces(["127.0.0.1"]) + await asyncio.sleep(0) + assert len(engine.senders) == 1 + finally: + await aiozc.async_close() + + +@pytest.mark.asyncio +async def test_update_interfaces_serializes_concurrent_calls(aiozc_loopback: AsyncZeroconf) -> None: + """Overlapping rescans are serialized so an interface is not added twice.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + engine = zc.engine + await aiozc_loopback.async_update_interfaces([]) + await asyncio.sleep(0) + assert engine.senders == [] + + await asyncio.gather( + aiozc_loopback.async_update_interfaces(["127.0.0.1"]), + aiozc_loopback.async_update_interfaces(["127.0.0.1"]), + ) + await asyncio.sleep(0) + assert len(engine.senders) == 1 + + +@pytest.mark.asyncio +async def test_update_interfaces_keeps_config_on_reconcile_failure(aiozc_loopback: AsyncZeroconf) -> None: + """A failed engine reconcile leaves the retained interface config unchanged.""" + zc = aiozc_loopback.zeroconf + await zc.async_wait_for_start() + original_interfaces = zc._interfaces + original_ip_version = zc._ip_version + + with ( + patch.object(_engine.AsyncEngine, "async_update_interfaces", new=AsyncMock(side_effect=OSError)), + pytest.raises(OSError), + ): + await aiozc_loopback.async_update_interfaces(["10.0.0.1"], ip_version=IPVersion.All) + + assert zc._interfaces == original_interfaces + assert zc._ip_version == original_ip_version + + +def test_sync_update_interfaces(zc_loopback: Zeroconf) -> None: + """The sync wrapper drives a rescan through the loop without changing a stable set.""" + engine = zc_loopback.engine + sender_count = len(engine.senders) + zc_loopback.update_interfaces(["127.0.0.1"]) + assert len(engine.senders) == sender_count diff --git a/tests/utils/test_net.py b/tests/utils/test_net.py index 311e95e6..b04a774d 100644 --- a/tests/utils/test_net.py +++ b/tests/utils/test_net.py @@ -4,6 +4,7 @@ import errno import socket +import struct import sys import unittest import warnings @@ -287,6 +288,57 @@ def test_add_multicast_member(caplog: pytest.LogCaptureFixture) -> None: assert "net.ipv4.igmp_max_memberships" not in caplog.text +def test_drop_multicast_member() -> None: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + interface = "127.0.0.1" + + # No error should return True + with patch("socket.socket.setsockopt"): + assert netutils.drop_multicast_member(sock, interface) is True + + # IPv6 leave should return True + with patch("socket.socket.setsockopt"): + assert netutils.drop_multicast_member(sock, (("ff02::fb", 0, 0), 1)) is True # type: ignore[arg-type] + + # Benign errnos when the interface is already gone should return False + for benign in (errno.EADDRNOTAVAIL, errno.EINVAL, errno.ENODEV, errno.ENOPROTOOPT): + with patch("socket.socket.setsockopt", side_effect=OSError(benign, None)): + assert netutils.drop_multicast_member(sock, interface) is False + + # EPERM should always raise + with ( + pytest.raises(OSError), + patch("socket.socket.setsockopt", side_effect=OSError(errno.EPERM, None)), + ): + netutils.drop_multicast_member(sock, interface) + + # No IPv6 support should return False for IPv6 + with patch("socket.inet_pton", side_effect=OSError()): + assert netutils.drop_multicast_member(sock, (("ff02::fb", 0, 0), 1)) is False # type: ignore[arg-type] + + +def test_drop_multicast_member_v6_uses_join_index() -> None: + """The IPv6 group leave packs the join interface index, not the bound scope_id.""" + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + with patch("socket.socket.setsockopt") as mock_set: + assert netutils.drop_multicast_member(sock, (("ff02::fb", 0, 0), 7)) is True # type: ignore[arg-type] + _level, optname, value = mock_set.call_args.args + assert optname == socket.IPV6_LEAVE_GROUP + # Trailing 4 bytes are the interface index the join used. + assert value[-4:] == struct.pack("@I", 7) + + +def test_drop_multicast_member_wsaeinval(monkeypatch: pytest.MonkeyPatch) -> None: + """On Windows, WSAEINVAL when leaving the group is treated as benign.""" + monkeypatch.setattr(sys, "platform", "win32") + monkeypatch.setattr(errno, "WSAEINVAL", 10022, raising=False) + with ( + socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock, + patch("socket.socket.setsockopt", side_effect=OSError(10022, None)), + ): + assert netutils.drop_multicast_member(sock, "127.0.0.1") is False + + def test_bind_raises_skips_address(): """Test bind failing in new_socket returns None on EADDRNOTAVAIL.""" err = errno.EADDRNOTAVAIL @@ -344,6 +396,57 @@ def test_new_respond_socket_new_socket_returns_none(): assert netutils.new_respond_socket(("0.0.0.0", 0)) is None # type: ignore[arg-type] +def test_add_interface_returns_responder_on_success(): + """add_interface joins the group and returns the responder socket.""" + listen_socket = Mock() + respond_socket = Mock() + with ( + patch.object(netutils, "add_multicast_member", return_value=True) as mock_add, + patch.object(netutils, "new_respond_socket", return_value=respond_socket), + patch.object(netutils, "drop_multicast_member") as mock_drop, + ): + assert netutils.add_interface(listen_socket, "127.0.0.1") is respond_socket + mock_add.assert_called_once_with(listen_socket, "127.0.0.1") + mock_drop.assert_not_called() + + +def test_add_interface_join_failure_returns_none(): + """A failed multicast join returns None and never creates a responder.""" + listen_socket = Mock() + with ( + patch.object(netutils, "add_multicast_member", return_value=False), + patch.object(netutils, "new_respond_socket") as mock_respond, + patch.object(netutils, "drop_multicast_member") as mock_drop, + ): + assert netutils.add_interface(listen_socket, "127.0.0.1") is None + mock_respond.assert_not_called() + mock_drop.assert_not_called() + + +def test_add_interface_responder_failure_rolls_back_membership(): + """A None responder socket drops the membership just joined.""" + listen_socket = Mock() + with ( + patch.object(netutils, "add_multicast_member", return_value=True), + patch.object(netutils, "new_respond_socket", return_value=None), + patch.object(netutils, "drop_multicast_member") as mock_drop, + ): + assert netutils.add_interface(listen_socket, "127.0.0.1") is None + mock_drop.assert_called_once_with(listen_socket, "127.0.0.1") + + +def test_add_interface_no_listen_socket_skips_membership(): + """Without a listen socket (unicast) no membership op runs and rollback is skipped.""" + with ( + patch.object(netutils, "add_multicast_member") as mock_add, + patch.object(netutils, "new_respond_socket", return_value=None), + patch.object(netutils, "drop_multicast_member") as mock_drop, + ): + assert netutils.add_interface(None, "127.0.0.1", unicast=True) is None + mock_add.assert_not_called() + mock_drop.assert_not_called() + + def test_create_sockets_interfaces_all_unicast(): """Test create_sockets with unicast."""