From 4c3e85914aa2cf0e453bb9692ea90209739cbbaf Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 31 Mar 2023 13:46:20 -1000 Subject: [PATCH 1/3] feat: reduce overhead to send responses There are still cases were we cannot answer questions fast enough to keep up with systems (mainly HomeKit) that require a response within a time period to prevent a device from going offline. --- src/zeroconf/_core.py | 70 ++++++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/src/zeroconf/_core.py b/src/zeroconf/_core.py index 958b3468..416b09f4 100644 --- a/src/zeroconf/_core.py +++ b/src/zeroconf/_core.py @@ -27,8 +27,9 @@ import socket import sys import threading +from dataclasses import dataclass from types import TracebackType # noqa # used in type hints -from typing import Awaitable, Dict, List, Optional, Tuple, Type, Union, cast +from typing import Any, Awaitable, Dict, List, Optional, Tuple, Type, Union, cast from ._cache import DNSCache from ._dns import DNSQuestion, DNSQuestionType @@ -105,6 +106,29 @@ _REGISTER_BROADCASTS = 3 +@dataclass +class _WrappedTransport: + """A wrapper for transports.""" + + transport: asyncio.DatagramTransport + is_ipv6: bool + socket: socket.socket + fileno: int + sock_name: Any + + +def _make_wrapped_transport(transport: asyncio.DatagramTransport) -> _WrappedTransport: + """Make a wrapped transport.""" + sock: socket.socket = transport.get_extra_info('socket') + return _WrappedTransport( + transport=transport, + is_ipv6=sock.family == socket.AF_INET6, + socket=sock, + fileno=sock.fileno(), + sock_name=sock.getsockname(), + ) + + class AsyncEngine: """An engine wraps sockets in the event loop.""" @@ -117,8 +141,8 @@ def __init__( self.loop: Optional[asyncio.AbstractEventLoop] = None self.zc = zeroconf self.protocols: List[AsyncListener] = [] - self.readers: List[asyncio.DatagramTransport] = [] - self.senders: List[asyncio.DatagramTransport] = [] + self.readers: List[_WrappedTransport] = [] + self.senders: List[_WrappedTransport] = [] self.running_event: Optional[asyncio.Event] = None self._listen_socket = listen_socket self._respond_sockets = respond_sockets @@ -158,9 +182,9 @@ async def _async_create_endpoints(self) -> None: for s in reader_sockets: transport, protocol = await loop.create_datagram_endpoint(lambda: AsyncListener(self.zc), sock=s) self.protocols.append(cast(AsyncListener, protocol)) - self.readers.append(cast(asyncio.DatagramTransport, transport)) + self.readers.append(_make_wrapped_transport(cast(asyncio.DatagramTransport, transport))) if s in sender_sockets: - self.senders.append(cast(asyncio.DatagramTransport, transport)) + self.senders.append(_make_wrapped_transport(cast(asyncio.DatagramTransport, transport))) def _async_cache_cleanup(self) -> None: """Periodic cache cleanup.""" @@ -186,8 +210,8 @@ def _async_shutdown(self) -> None: """Shutdown transports and sockets.""" assert self.running_event is not None self.running_event.clear() - for transport in itertools.chain(self.senders, self.readers): - transport.close() + for wrapped_transport in itertools.chain(self.senders, self.readers): + wrapped_transport.transport.close() def close(self) -> None: """Close from sync context. @@ -221,7 +245,7 @@ def __init__(self, zc: 'Zeroconf') -> None: self.zc = zc self.data: Optional[bytes] = None self.last_time: float = 0 - self.transport: Optional[asyncio.DatagramTransport] = None + self.transport: Optional[_WrappedTransport] = None self.sock_description: Optional[str] = None self._deferred: Dict[str, List[DNSIncoming]] = {} self._timers: Dict[str, asyncio.TimerHandle] = {} @@ -309,7 +333,7 @@ def handle_query_or_defer( msg: DNSIncoming, addr: str, port: int, - transport: asyncio.DatagramTransport, + transport: _WrappedTransport, v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (), ) -> None: """Deal with incoming query packets. Provides a response if @@ -341,7 +365,7 @@ def _respond_query( msg: Optional[DNSIncoming], addr: str, port: int, - transport: asyncio.DatagramTransport, + transport: _WrappedTransport, v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (), ) -> None: """Respond to a query and reassemble any truncated deferred packets.""" @@ -362,10 +386,9 @@ def error_received(self, exc: Exception) -> None: self.log_exception_once(exc, msg_str, exc) def connection_made(self, transport: asyncio.BaseTransport) -> None: - self.transport = cast(asyncio.DatagramTransport, transport) - sock_name = self.transport.get_extra_info('sockname') - sock_fileno = self.transport.get_extra_info('socket').fileno() - self.sock_description = f"{sock_fileno} ({sock_name})" + wrapped_transport = _make_wrapped_transport(cast(asyncio.DatagramTransport, transport)) + self.transport = wrapped_transport + self.sock_description = f"{wrapped_transport.fileno} ({wrapped_transport.sock_name})" def connection_lost(self, exc: Optional[Exception]) -> None: """Handle connection lost.""" @@ -373,7 +396,7 @@ def connection_lost(self, exc: Optional[Exception]) -> None: def async_send_with_transport( log_debug: bool, - transport: asyncio.DatagramTransport, + transport: _WrappedTransport, packet: bytes, packet_num: int, out: DNSOutgoing, @@ -381,8 +404,7 @@ def async_send_with_transport( port: int, v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (), ) -> None: - s = transport.get_extra_info('socket') - ipv6_socket = s.family == socket.AF_INET6 + ipv6_socket = transport.is_ipv6 if addr is None: real_addr = _MDNS_ADDR6 if ipv6_socket else _MDNS_ADDR else: @@ -394,8 +416,8 @@ def async_send_with_transport( 'Sending to (%s, %d) via [socket %s (%s)] (%d bytes #%d) %r as %r...', real_addr, port or _MDNS_PORT, - s.fileno(), - transport.get_extra_info('sockname'), + transport.fileno, + transport.sock_name, len(packet), packet_num + 1, out, @@ -404,9 +426,9 @@ def async_send_with_transport( # Get flowinfo and scopeid for the IPV6 socket to create a complete IPv6 # address tuple: https://docs.python.org/3.6/library/socket.html#socket-families if ipv6_socket and not v6_flow_scope: - _, _, sock_flowinfo, sock_scopeid = s.getsockname() + _, _, sock_flowinfo, sock_scopeid = transport.sock_name v6_flow_scope = (sock_flowinfo, sock_scopeid) - transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope)) + transport.transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope)) class Zeroconf(QuietLogger): @@ -832,7 +854,7 @@ def handle_assembled_query( packets: List[DNSIncoming], addr: str, port: int, - transport: asyncio.DatagramTransport, + transport: _WrappedTransport, v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (), ) -> None: """Respond to a (re)assembled query. @@ -870,7 +892,7 @@ def send( addr: Optional[str] = None, port: int = _MDNS_PORT, v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (), - transport: Optional[asyncio.DatagramTransport] = None, + transport: Optional[_WrappedTransport] = None, ) -> None: """Sends an outgoing packet threadsafe.""" assert self.loop is not None @@ -882,7 +904,7 @@ def async_send( addr: Optional[str] = None, port: int = _MDNS_PORT, v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (), - transport: Optional[asyncio.DatagramTransport] = None, + transport: Optional[_WrappedTransport] = None, ) -> None: """Sends an outgoing packet.""" if self.done: From 0058c034250c4b14d2714e47756e6a1c82aaf1b5 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 31 Mar 2023 14:25:07 -1000 Subject: [PATCH 2/3] fix: tweaks for cython compat --- src/zeroconf/_core.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/src/zeroconf/_core.py b/src/zeroconf/_core.py index 416b09f4..68e0b657 100644 --- a/src/zeroconf/_core.py +++ b/src/zeroconf/_core.py @@ -27,7 +27,6 @@ import socket import sys import threading -from dataclasses import dataclass from types import TracebackType # noqa # used in type hints from typing import Any, Awaitable, Dict, List, Optional, Tuple, Type, Union, cast @@ -106,15 +105,31 @@ _REGISTER_BROADCASTS = 3 -@dataclass class _WrappedTransport: """A wrapper for transports.""" - transport: asyncio.DatagramTransport - is_ipv6: bool - socket: socket.socket - fileno: int - sock_name: Any + __slots__ = ( + 'transport', + 'is_ipv6', + 'sock', + 'fileno', + 'sock_name', + ) + + def __init__( + self, + transport: asyncio.DatagramTransport, + is_ipv6: bool, + sock: socket.socket, + fileno: int, + sock_name: Any, + ) -> None: + """Initialize the wrapped transport.""" + self.transport = transport + self.is_ipv6 = is_ipv6 + self.sock = sock + self.fileno = fileno + self.sock_name = sock_name def _make_wrapped_transport(transport: asyncio.DatagramTransport) -> _WrappedTransport: @@ -123,7 +138,7 @@ def _make_wrapped_transport(transport: asyncio.DatagramTransport) -> _WrappedTra return _WrappedTransport( transport=transport, is_ipv6=sock.family == socket.AF_INET6, - socket=sock, + sock=sock, fileno=sock.fileno(), sock_name=sock.getsockname(), ) From cc66021c36c1f2c18bad791ea7000afd2496ed99 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 31 Mar 2023 15:05:13 -1000 Subject: [PATCH 3/3] fix: comment --- src/zeroconf/_core.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/zeroconf/_core.py b/src/zeroconf/_core.py index 68e0b657..aa50ddae 100644 --- a/src/zeroconf/_core.py +++ b/src/zeroconf/_core.py @@ -124,7 +124,10 @@ def __init__( fileno: int, sock_name: Any, ) -> None: - """Initialize the wrapped transport.""" + """Initialize the wrapped transport. + + These attributes are used when sending packets. + """ self.transport = transport self.is_ipv6 = is_ipv6 self.sock = sock