From 560f293a992346654b52062f47e0e46d3da77050 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 17:41:16 -1000 Subject: [PATCH 01/12] feat: speed up core with a cython pxd --- build_ext.py | 2 + src/zeroconf/_core.pxd | 69 ++++++++++++++++++++++++ src/zeroconf/_core.py | 64 ++++++++++++++-------- src/zeroconf/_handlers/query_handler.pxd | 2 +- src/zeroconf/_transport.pxd | 11 ++++ src/zeroconf/_transport.py | 4 +- 6 files changed, 126 insertions(+), 26 deletions(-) create mode 100644 src/zeroconf/_core.pxd create mode 100644 src/zeroconf/_transport.pxd diff --git a/build_ext.py b/build_ext.py index 0f02f53a4..da0dc3437 100644 --- a/build_ext.py +++ b/build_ext.py @@ -25,6 +25,7 @@ def build(setup_kwargs: Any) -> None: [ "src/zeroconf/_dns.py", "src/zeroconf/_cache.py", + "src/zeroconf/_core.py", "src/zeroconf/_history.py", "src/zeroconf/_record_update.py", "src/zeroconf/_listener.py", @@ -38,6 +39,7 @@ def build(setup_kwargs: Any) -> None: "src/zeroconf/_services/browser.py", "src/zeroconf/_services/info.py", "src/zeroconf/_services/registry.py", + "src/zeroconf/_transport.py", "src/zeroconf/_updates.py", "src/zeroconf/_utils/ipaddress.py", "src/zeroconf/_utils/time.py", diff --git a/src/zeroconf/_core.pxd b/src/zeroconf/_core.pxd new file mode 100644 index 000000000..ffb69fd1a --- /dev/null +++ b/src/zeroconf/_core.pxd @@ -0,0 +1,69 @@ + +import cython + + +cdef bint TYPE_CHECKING +cdef object _MDNS_ADDR6,_MDNS_ADDR + + +from ._cache cimport DNSCache +from ._handlers.multicast_outgoing_queue cimport MulticastOutgoingQueue +from ._handlers.query_handler cimport QueryHandler, QuestionAnswers +from ._history cimport QuestionHistory +from ._listener cimport AsyncListener +from ._protocol.incoming cimport DNSIncoming +from ._protocol.outgoing cimport DNSOutgoing +from ._services.registry cimport ServiceRegistry +from ._transport cimport _WrappedTransport +from ._updates cimport RecordUpdateListener + + +cdef void async_send_with_transport( + bint log_debug, + _WrappedTransport transport, + object packet, + object packet_num, + DNSOutgoing out, + object addr, + object port, + tuple v6_flow_scope +) + +cdef class Zeroconf: + + cdef public bint done + cdef public bint unicast + cdef public object engine + cdef public dict browsers + cdef public ServiceRegistry registry + cdef public DNSCache cache + cdef public QuestionHistory question_history + cdef public QueryHandler query_handler + cdef public object record_manager + cdef public set _notify_futures + cdef public object loop + cdef public object _loop_thread + cdef public MulticastOutgoingQueue _out_queue + cdef public MulticastOutgoingQueue _out_delay_queue + + cdef bint _debug_enabled(self) + + @cython.locals(first_packet=DNSIncoming) + cpdef handle_assembled_query( + self, + list packets, + object addr, + object port, + _WrappedTransport transport, + tuple v6_flow_scope + ) + + @cython.locals(max_size="unsigned int") + cpdef _async_send( + self, + DNSOutgoing out, + object addr, + object port, + tuple v6_flow_scope, + _WrappedTransport transport + ) diff --git a/src/zeroconf/_core.py b/src/zeroconf/_core.py index 5827e2d5b..9b2e21287 100644 --- a/src/zeroconf/_core.py +++ b/src/zeroconf/_core.py @@ -102,16 +102,20 @@ _REGISTER_BROADCASTS = 3 +_str = str +_int = int +_bytes = bytes + def async_send_with_transport( log_debug: bool, transport: _WrappedTransport, - packet: bytes, - packet_num: int, + packet: _bytes, + packet_num: _int, out: DNSOutgoing, - addr: Optional[str], - port: int, - v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (), + addr: Optional[_str], + port: _int, + v6_flow_scope: Union[Tuple[()], Tuple[int, int]], ) -> None: ipv6_socket = transport.is_ipv6 if addr is None: @@ -140,7 +144,7 @@ def async_send_with_transport( transport.transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope)) -class Zeroconf(QuietLogger): +class Zeroconf: """Implementation of Zeroconf Multicast DNS Service Discovery @@ -561,17 +565,11 @@ def async_remove_listener(self, listener: RecordUpdateListener) -> None: """ self.record_manager.async_remove_listener(listener) - def handle_response(self, msg: DNSIncoming) -> None: - """Deal with incoming response packets. All answers - are held in the cache, and listeners are notified.""" - self.log_warning_once("handle_response is deprecated, use record_manager.async_updates_from_response") - self.record_manager.async_updates_from_response(msg) - def handle_assembled_query( self, packets: List[DNSIncoming], - addr: str, - port: int, + addr: _str, + port: _int, transport: _WrappedTransport, v6_flow_scope: Union[Tuple[()], Tuple[int, int]], ) -> None: @@ -587,17 +585,20 @@ def handle_assembled_query( question_answers = self.query_handler.async_response(packets, ucast_source) if not question_answers: return - now = packets[0].now + first_packet = packets[0] + now = first_packet.now if question_answers.ucast: - questions = packets[0].questions - id_ = packets[0].id + questions = first_packet.questions + id_ = first_packet.id out = construct_outgoing_unicast_answers(question_answers.ucast, ucast_source, questions, id_) # When sending unicast, only send back the reply # via the same socket that it was recieved from # as we know its reachable from that socket - self.async_send(out, addr, port, v6_flow_scope, transport) + self._async_send(out, addr, port, v6_flow_scope, transport) if question_answers.mcast_now: - self.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now)) + self._async_send( + construct_outgoing_multicast_answers(question_answers.mcast_now), None, _MDNS_PORT, (), None + ) if question_answers.mcast_aggregate: self._out_queue.async_add(now, question_answers.mcast_aggregate) if question_answers.mcast_aggregate_last_second: @@ -616,7 +617,10 @@ def send( ) -> None: """Sends an outgoing packet threadsafe.""" assert self.loop is not None - self.loop.call_soon_threadsafe(self.async_send, out, addr, port, v6_flow_scope, transport) + self.loop.call_soon_threadsafe(self._async_send, out, addr, port, v6_flow_scope, transport) + + def _debug_enabled(self) -> bool: + return log.isEnabledFor(logging.DEBUG) def async_send( self, @@ -625,6 +629,17 @@ def async_send( port: int = _MDNS_PORT, v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (), transport: Optional[_WrappedTransport] = None, + ) -> None: + """Sends an outgoing packet.""" + self._async_send(out, addr, port, v6_flow_scope, transport) + + def _async_send( + self, + out: DNSOutgoing, + addr: Optional[str], + port: _int, + v6_flow_scope: Union[Tuple[()], Tuple[int, int]], + transport: Optional[_WrappedTransport], ) -> None: """Sends an outgoing packet.""" if self.done: @@ -633,11 +648,14 @@ def async_send( # If no transport is specified, we send to all the ones # with the same address family transports = [transport] if transport else self.engine.senders - log_debug = log.isEnabledFor(logging.DEBUG) + log_debug = self._debug_enabled() + max_size = _MAX_MSG_ABSOLUTE for packet_num, packet in enumerate(out.packets()): - if len(packet) > _MAX_MSG_ABSOLUTE: - self.log_warning_once("Dropping %r over-sized packet (%d bytes) %r", out, len(packet), packet) + if len(packet) > max_size: + QuietLogger.log_warning_once( + "Dropping %r over-sized packet (%d bytes) %r", out, len(packet), packet + ) return for send_transport in transports: async_send_with_transport( diff --git a/src/zeroconf/_handlers/query_handler.pxd b/src/zeroconf/_handlers/query_handler.pxd index 3e726a533..7c457ed61 100644 --- a/src/zeroconf/_handlers/query_handler.pxd +++ b/src/zeroconf/_handlers/query_handler.pxd @@ -93,7 +93,7 @@ cdef class QueryHandler: is_probe=object, now=double ) - cpdef async_response(self, cython.list msgs, cython.bint unicast_source) + cpdef QuestionAnswers async_response(self, cython.list msgs, cython.bint unicast_source) @cython.locals(name=str, question_lower_name=str) cdef _get_answer_strategies(self, DNSQuestion question) diff --git a/src/zeroconf/_transport.pxd b/src/zeroconf/_transport.pxd new file mode 100644 index 000000000..2a26f40e4 --- /dev/null +++ b/src/zeroconf/_transport.pxd @@ -0,0 +1,11 @@ + +import cython + + +cdef class _WrappedTransport: + + cdef public object transport + cdef public bint is_ipv6 + cdef public object socket + cdef public object fileno + cdef public tuple sock_name diff --git a/src/zeroconf/_transport.py b/src/zeroconf/_transport.py index 7f6d7ac8c..c37af2efd 100644 --- a/src/zeroconf/_transport.py +++ b/src/zeroconf/_transport.py @@ -22,7 +22,7 @@ import asyncio import socket -from typing import Any +from typing import Tuple class _WrappedTransport: @@ -42,7 +42,7 @@ def __init__( is_ipv6: bool, sock: socket.socket, fileno: int, - sock_name: Any, + sock_name: Tuple, ) -> None: """Initialize the wrapped transport. From d5f957190800ced4532cb62a4b9d0872b3f98d17 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 17:44:08 -1000 Subject: [PATCH 02/12] fix: lint --- src/zeroconf/_core.pxd | 4 ++++ tests/test_asyncio.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/zeroconf/_core.pxd b/src/zeroconf/_core.pxd index ffb69fd1a..069070c96 100644 --- a/src/zeroconf/_core.pxd +++ b/src/zeroconf/_core.pxd @@ -7,6 +7,10 @@ cdef object _MDNS_ADDR6,_MDNS_ADDR from ._cache cimport DNSCache +from ._handlers.answers cimport ( + construct_outgoing_multicast_answers, + construct_outgoing_unicast_answers, +) from ._handlers.multicast_outgoing_queue cimport MulticastOutgoingQueue from ._handlers.query_handler cimport QueryHandler, QuestionAnswers from ._history cimport QuestionHistory diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index d4594788b..c9c43e541 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -1246,7 +1246,7 @@ def update_service(self, zc, type_, name) -> None: # type: ignore[no-untyped-de ) zc.record_manager.async_updates_from_response(DNSIncoming(generated.packets()[0])) - zc.handle_response(DNSIncoming(generated.packets()[0])) + zc.record_manager.async_updates_from_response(DNSIncoming(generated.packets()[0])) await browser.async_cancel() await asyncio.sleep(0) From ff33a7ad2183b19193e1c33d5b5663e10108e5c1 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 17:56:08 -1000 Subject: [PATCH 03/12] fix: reverts --- src/zeroconf/_core.pxd | 10 ---------- src/zeroconf/_core.py | 19 +++---------------- 2 files changed, 3 insertions(+), 26 deletions(-) diff --git a/src/zeroconf/_core.pxd b/src/zeroconf/_core.pxd index 069070c96..244b8cea5 100644 --- a/src/zeroconf/_core.pxd +++ b/src/zeroconf/_core.pxd @@ -61,13 +61,3 @@ cdef class Zeroconf: _WrappedTransport transport, tuple v6_flow_scope ) - - @cython.locals(max_size="unsigned int") - cpdef _async_send( - self, - DNSOutgoing out, - object addr, - object port, - tuple v6_flow_scope, - _WrappedTransport transport - ) diff --git a/src/zeroconf/_core.py b/src/zeroconf/_core.py index 9b2e21287..4799e97ad 100644 --- a/src/zeroconf/_core.py +++ b/src/zeroconf/_core.py @@ -594,11 +594,9 @@ def handle_assembled_query( # When sending unicast, only send back the reply # via the same socket that it was recieved from # as we know its reachable from that socket - self._async_send(out, addr, port, v6_flow_scope, transport) + self.async_send(out, addr, port, v6_flow_scope, transport) if question_answers.mcast_now: - self._async_send( - construct_outgoing_multicast_answers(question_answers.mcast_now), None, _MDNS_PORT, (), None - ) + self.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now)) if question_answers.mcast_aggregate: self._out_queue.async_add(now, question_answers.mcast_aggregate) if question_answers.mcast_aggregate_last_second: @@ -617,7 +615,7 @@ def send( ) -> None: """Sends an outgoing packet threadsafe.""" assert self.loop is not None - self.loop.call_soon_threadsafe(self._async_send, out, addr, port, v6_flow_scope, transport) + self.loop.call_soon_threadsafe(self.async_send, out, addr, port, v6_flow_scope, transport) def _debug_enabled(self) -> bool: return log.isEnabledFor(logging.DEBUG) @@ -629,17 +627,6 @@ def async_send( port: int = _MDNS_PORT, v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (), transport: Optional[_WrappedTransport] = None, - ) -> None: - """Sends an outgoing packet.""" - self._async_send(out, addr, port, v6_flow_scope, transport) - - def _async_send( - self, - out: DNSOutgoing, - addr: Optional[str], - port: _int, - v6_flow_scope: Union[Tuple[()], Tuple[int, int]], - transport: Optional[_WrappedTransport], ) -> None: """Sends an outgoing packet.""" if self.done: From d08b6c7fb52fe8f9bba5a008708b35b04fac52a9 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 18:17:50 -1000 Subject: [PATCH 04/12] feat: relo --- build_ext.py | 1 - src/zeroconf/_core.pxd | 63 ------------------ src/zeroconf/_core.py | 82 +++++------------------- src/zeroconf/_handlers/query_handler.pxd | 22 ++++++- src/zeroconf/_handlers/query_handler.py | 71 +++++++++++++++++--- src/zeroconf/_listener.pxd | 2 + src/zeroconf/_listener.py | 4 +- tests/test_asyncio.py | 2 +- 8 files changed, 105 insertions(+), 142 deletions(-) delete mode 100644 src/zeroconf/_core.pxd diff --git a/build_ext.py b/build_ext.py index da0dc3437..34aaa9d60 100644 --- a/build_ext.py +++ b/build_ext.py @@ -25,7 +25,6 @@ def build(setup_kwargs: Any) -> None: [ "src/zeroconf/_dns.py", "src/zeroconf/_cache.py", - "src/zeroconf/_core.py", "src/zeroconf/_history.py", "src/zeroconf/_record_update.py", "src/zeroconf/_listener.py", diff --git a/src/zeroconf/_core.pxd b/src/zeroconf/_core.pxd deleted file mode 100644 index 244b8cea5..000000000 --- a/src/zeroconf/_core.pxd +++ /dev/null @@ -1,63 +0,0 @@ - -import cython - - -cdef bint TYPE_CHECKING -cdef object _MDNS_ADDR6,_MDNS_ADDR - - -from ._cache cimport DNSCache -from ._handlers.answers cimport ( - construct_outgoing_multicast_answers, - construct_outgoing_unicast_answers, -) -from ._handlers.multicast_outgoing_queue cimport MulticastOutgoingQueue -from ._handlers.query_handler cimport QueryHandler, QuestionAnswers -from ._history cimport QuestionHistory -from ._listener cimport AsyncListener -from ._protocol.incoming cimport DNSIncoming -from ._protocol.outgoing cimport DNSOutgoing -from ._services.registry cimport ServiceRegistry -from ._transport cimport _WrappedTransport -from ._updates cimport RecordUpdateListener - - -cdef void async_send_with_transport( - bint log_debug, - _WrappedTransport transport, - object packet, - object packet_num, - DNSOutgoing out, - object addr, - object port, - tuple v6_flow_scope -) - -cdef class Zeroconf: - - cdef public bint done - cdef public bint unicast - cdef public object engine - cdef public dict browsers - cdef public ServiceRegistry registry - cdef public DNSCache cache - cdef public QuestionHistory question_history - cdef public QueryHandler query_handler - cdef public object record_manager - cdef public set _notify_futures - cdef public object loop - cdef public object _loop_thread - cdef public MulticastOutgoingQueue _out_queue - cdef public MulticastOutgoingQueue _out_delay_queue - - cdef bint _debug_enabled(self) - - @cython.locals(first_packet=DNSIncoming) - cpdef handle_assembled_query( - self, - list packets, - object addr, - object port, - _WrappedTransport transport, - tuple v6_flow_scope - ) diff --git a/src/zeroconf/_core.py b/src/zeroconf/_core.py index 4799e97ad..3a3381a91 100644 --- a/src/zeroconf/_core.py +++ b/src/zeroconf/_core.py @@ -31,10 +31,6 @@ from ._dns import DNSQuestion, DNSQuestionType from ._engine import AsyncEngine from ._exceptions import NonUniqueNameException, NotRunningException -from ._handlers.answers import ( - construct_outgoing_multicast_answers, - construct_outgoing_unicast_answers, -) from ._handlers.multicast_outgoing_queue import MulticastOutgoingQueue from ._handlers.query_handler import QueryHandler from ._handlers.record_manager import RecordManager @@ -102,20 +98,16 @@ _REGISTER_BROADCASTS = 3 -_str = str -_int = int -_bytes = bytes - def async_send_with_transport( log_debug: bool, transport: _WrappedTransport, - packet: _bytes, - packet_num: _int, + packet: bytes, + packet_num: int, out: DNSOutgoing, - addr: Optional[_str], - port: _int, - v6_flow_scope: Union[Tuple[()], Tuple[int, int]], + addr: Optional[str], + port: int, + v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (), ) -> None: ipv6_socket = transport.is_ipv6 if addr is None: @@ -144,7 +136,7 @@ def async_send_with_transport( transport.transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope)) -class Zeroconf: +class Zeroconf(QuietLogger): """Implementation of Zeroconf Multicast DNS Service Discovery @@ -191,15 +183,15 @@ def __init__( self.registry = ServiceRegistry() self.cache = DNSCache() self.question_history = QuestionHistory() - self.query_handler = QueryHandler(self.registry, self.cache, self.question_history) + self.query_handler = QueryHandler(self) self.record_manager = RecordManager(self) self._notify_futures: Set[asyncio.Future] = set() self.loop: Optional[asyncio.AbstractEventLoop] = None self._loop_thread: Optional[threading.Thread] = None - self._out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY) - self._out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY) + self.out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY) + self.out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY) self.start() @@ -565,45 +557,11 @@ def async_remove_listener(self, listener: RecordUpdateListener) -> None: """ self.record_manager.async_remove_listener(listener) - def handle_assembled_query( - self, - packets: List[DNSIncoming], - addr: _str, - port: _int, - transport: _WrappedTransport, - v6_flow_scope: Union[Tuple[()], Tuple[int, int]], - ) -> None: - """Respond to a (re)assembled query. - - If the protocol received packets with the TC bit set, it will - wait a bit for the rest of the packets and only call - handle_assembled_query once it has a complete set of packets - or the timer expires. If the TC bit is not set, a single - packet will be in packets. - """ - ucast_source = port != _MDNS_PORT - question_answers = self.query_handler.async_response(packets, ucast_source) - if not question_answers: - return - first_packet = packets[0] - now = first_packet.now - if question_answers.ucast: - questions = first_packet.questions - id_ = first_packet.id - out = construct_outgoing_unicast_answers(question_answers.ucast, ucast_source, questions, id_) - # When sending unicast, only send back the reply - # via the same socket that it was recieved from - # as we know its reachable from that socket - self.async_send(out, addr, port, v6_flow_scope, transport) - if question_answers.mcast_now: - self.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now)) - if question_answers.mcast_aggregate: - self._out_queue.async_add(now, question_answers.mcast_aggregate) - if question_answers.mcast_aggregate_last_second: - # https://datatracker.ietf.org/doc/html/rfc6762#section-14 - # If we broadcast it in the last second, we have to delay - # at least a second before we send it again - self._out_delay_queue.async_add(now, question_answers.mcast_aggregate_last_second) + def handle_response(self, msg: DNSIncoming) -> None: + """Deal with incoming response packets. All answers + are held in the cache, and listeners are notified.""" + self.log_warning_once("handle_response is deprecated, use record_manager.async_updates_from_response") + self.record_manager.async_updates_from_response(msg) def send( self, @@ -617,9 +575,6 @@ def send( assert self.loop is not None self.loop.call_soon_threadsafe(self.async_send, out, addr, port, v6_flow_scope, transport) - def _debug_enabled(self) -> bool: - return log.isEnabledFor(logging.DEBUG) - def async_send( self, out: DNSOutgoing, @@ -635,14 +590,11 @@ def async_send( # If no transport is specified, we send to all the ones # with the same address family transports = [transport] if transport else self.engine.senders - log_debug = self._debug_enabled() - max_size = _MAX_MSG_ABSOLUTE + log_debug = log.isEnabledFor(logging.DEBUG) for packet_num, packet in enumerate(out.packets()): - if len(packet) > max_size: - QuietLogger.log_warning_once( - "Dropping %r over-sized packet (%d bytes) %r", out, len(packet), packet - ) + if len(packet) > _MAX_MSG_ABSOLUTE: + self.log_warning_once("Dropping %r over-sized packet (%d bytes) %r", out, len(packet), packet) return for send_transport in transports: async_send_with_transport( diff --git a/src/zeroconf/_handlers/query_handler.pxd b/src/zeroconf/_handlers/query_handler.pxd index 7c457ed61..38fbf7518 100644 --- a/src/zeroconf/_handlers/query_handler.pxd +++ b/src/zeroconf/_handlers/query_handler.pxd @@ -7,7 +7,12 @@ from .._history cimport QuestionHistory from .._protocol.incoming cimport DNSIncoming from .._services.info cimport ServiceInfo from .._services.registry cimport ServiceRegistry -from .answers cimport QuestionAnswers +from .answers cimport ( + QuestionAnswers, + construct_outgoing_multicast_answers, + construct_outgoing_unicast_answers, +) +from .multicast_outgoing_queue cimport MulticastOutgoingQueue cdef bint TYPE_CHECKING @@ -65,6 +70,7 @@ cdef class _QueryResponse: cdef class QueryHandler: + cdef object zc cdef ServiceRegistry registry cdef DNSCache cache cdef QuestionHistory question_history @@ -97,3 +103,17 @@ cdef class QueryHandler: @cython.locals(name=str, question_lower_name=str) cdef _get_answer_strategies(self, DNSQuestion question) + + @cython.locals( + first_packet=DNSIncoming, + out_queue=MulticastOutgoingQueue, + out_delay_queue=MulticastOutgoingQueue + ) + cpdef handle_assembled_query( + self, + list packets, + object addr, + object port, + _WrappedTransport transport, + tuple v6_flow_scope + ) diff --git a/src/zeroconf/_handlers/query_handler.py b/src/zeroconf/_handlers/query_handler.py index c66d9c302..30d32c0bb 100644 --- a/src/zeroconf/_handlers/query_handler.py +++ b/src/zeroconf/_handlers/query_handler.py @@ -20,14 +20,14 @@ USA """ -from typing import TYPE_CHECKING, List, Optional, Set, cast +from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union, cast from .._cache import DNSCache, _UniqueRecordsType +from .._core import _MDNS_PORT from .._dns import DNSAddress, DNSPointer, DNSQuestion, DNSRecord, DNSRRSet -from .._history import QuestionHistory from .._protocol.incoming import DNSIncoming from .._services.info import ServiceInfo -from .._services.registry import ServiceRegistry +from .._transport import _WrappedTransport from .._utils.net import IPVersion from ..const import ( _ADDRESS_RECORD_TYPES, @@ -43,7 +43,12 @@ _TYPE_SRV, _TYPE_TXT, ) -from .answers import QuestionAnswers, _AnswerWithAdditionalsType +from .answers import ( + QuestionAnswers, + _AnswerWithAdditionalsType, + construct_outgoing_multicast_answers, + construct_outgoing_unicast_answers, +) _RESPOND_IMMEDIATE_TYPES = {_TYPE_NSEC, _TYPE_SRV, *_ADDRESS_RECORD_TYPES} @@ -53,7 +58,7 @@ _IPVersion_ALL = IPVersion.All _int = int - +_str = str _ANSWER_STRATEGY_SERVICE_TYPE_ENUMERATION = 0 _ANSWER_STRATEGY_POINTER = 1 @@ -61,6 +66,9 @@ _ANSWER_STRATEGY_SERVICE = 3 _ANSWER_STRATEGY_TEXT = 4 +if TYPE_CHECKING: + from .._core import Zeroconf + class _AnswerStrategy: @@ -183,13 +191,14 @@ def _has_mcast_record_in_last_second(self, record: DNSRecord) -> bool: class QueryHandler: """Query the ServiceRegistry.""" - __slots__ = ("registry", "cache", "question_history") + __slots__ = ("zc", "registry", "cache", "question_history") - def __init__(self, registry: ServiceRegistry, cache: DNSCache, question_history: QuestionHistory) -> None: + def __init__(self, zc: 'Zeroconf') -> None: """Init the query handler.""" - self.registry = registry - self.cache = cache - self.question_history = question_history + self.zc = zc + self.registry = zc.registry + self.cache = zc.cache + self.question_history = zc.question_history def _add_service_type_enumeration_query_answers( self, types: List[str], answer_set: _AnswerWithAdditionalsType, known_answers: DNSRRSet @@ -385,3 +394,45 @@ def _get_answer_strategies( ) return strategies + + def handle_assembled_query( + self, + packets: List[DNSIncoming], + addr: _str, + port: _int, + transport: _WrappedTransport, + v6_flow_scope: Union[Tuple[()], Tuple[int, int]], + ) -> None: + """Respond to a (re)assembled query. + + If the protocol recieved packets with the TC bit set, it will + wait a bit for the rest of the packets and only call + handle_assembled_query once it has a complete set of packets + or the timer expires. If the TC bit is not set, a single + packet will be in packets. + """ + first_packet = packets[0] + now = first_packet.now + ucast_source = port != _MDNS_PORT + question_answers = self.async_response(packets, ucast_source) + if not question_answers: + return + if question_answers.ucast: + questions = first_packet.questions + id_ = first_packet.id + out = construct_outgoing_unicast_answers(question_answers.ucast, ucast_source, questions, id_) + # When sending unicast, only send back the reply + # via the same socket that it was recieved from + # as we know its reachable from that socket + self.zc.async_send(out, addr, port, v6_flow_scope, transport) + if question_answers.mcast_now: + self.zc.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now)) + if question_answers.mcast_aggregate: + out_queue = self.zc.out_queue + out_queue.async_add(now, question_answers.mcast_aggregate) + if question_answers.mcast_aggregate_last_second: + # https://datatracker.ietf.org/doc/html/rfc6762#section-14 + # If we broadcast it in the last second, we have to delay + # at least a second before we send it again + out_delay_queue = self.zc.out_delay_queue + out_delay_queue.async_add(now, question_answers.mcast_aggregate_last_second) diff --git a/src/zeroconf/_listener.pxd b/src/zeroconf/_listener.pxd index 8b144653e..96f52be02 100644 --- a/src/zeroconf/_listener.pxd +++ b/src/zeroconf/_listener.pxd @@ -1,6 +1,7 @@ import cython +from ._handlers.query_handler cimport QueryHandler from ._handlers.record_manager cimport RecordManager from ._protocol.incoming cimport DNSIncoming from ._services.registry cimport ServiceRegistry @@ -21,6 +22,7 @@ cdef class AsyncListener: cdef public object zc cdef ServiceRegistry _registry cdef RecordManager _record_manager + cdef QueryHandler _query_handler cdef public cython.bytes data cdef public double last_time cdef public DNSIncoming last_message diff --git a/src/zeroconf/_listener.py b/src/zeroconf/_listener.py index 23d245785..0f8a8cac7 100644 --- a/src/zeroconf/_listener.py +++ b/src/zeroconf/_listener.py @@ -59,6 +59,7 @@ class AsyncListener: 'zc', '_registry', '_record_manager', + "_query_handler", 'data', 'last_time', 'last_message', @@ -72,6 +73,7 @@ def __init__(self, zc: 'Zeroconf') -> None: self.zc = zc self._registry = zc.registry self._record_manager = zc.record_manager + self._query_handler = zc.query_handler self.data: Optional[bytes] = None self.last_time: float = 0 self.last_message: Optional[DNSIncoming] = None @@ -228,7 +230,7 @@ def _respond_query( if msg: packets.append(msg) - self.zc.handle_assembled_query(packets, addr, port, transport, v6_flow_scope) + self._query_handler.handle_assembled_query(packets, addr, port, transport, v6_flow_scope) def error_received(self, exc: Exception) -> None: """Likely socket closed or IPv6.""" diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index c9c43e541..d4594788b 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -1246,7 +1246,7 @@ def update_service(self, zc, type_, name) -> None: # type: ignore[no-untyped-de ) zc.record_manager.async_updates_from_response(DNSIncoming(generated.packets()[0])) - zc.record_manager.async_updates_from_response(DNSIncoming(generated.packets()[0])) + zc.handle_response(DNSIncoming(generated.packets()[0])) await browser.async_cancel() await asyncio.sleep(0) From f1054619bde1e60eca98fe3dbae0a0c61ba314cb Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 18:19:27 -1000 Subject: [PATCH 05/12] fix: remove transport pxd --- build_ext.py | 1 - src/zeroconf/_transport.pxd | 11 ----------- 2 files changed, 12 deletions(-) delete mode 100644 src/zeroconf/_transport.pxd diff --git a/build_ext.py b/build_ext.py index 34aaa9d60..0f02f53a4 100644 --- a/build_ext.py +++ b/build_ext.py @@ -38,7 +38,6 @@ def build(setup_kwargs: Any) -> None: "src/zeroconf/_services/browser.py", "src/zeroconf/_services/info.py", "src/zeroconf/_services/registry.py", - "src/zeroconf/_transport.py", "src/zeroconf/_updates.py", "src/zeroconf/_utils/ipaddress.py", "src/zeroconf/_utils/time.py", diff --git a/src/zeroconf/_transport.pxd b/src/zeroconf/_transport.pxd deleted file mode 100644 index 2a26f40e4..000000000 --- a/src/zeroconf/_transport.pxd +++ /dev/null @@ -1,11 +0,0 @@ - -import cython - - -cdef class _WrappedTransport: - - cdef public object transport - cdef public bint is_ipv6 - cdef public object socket - cdef public object fileno - cdef public tuple sock_name From 7789cd26bae3684774591317926f04f76d3ea51a Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 18:20:11 -1000 Subject: [PATCH 06/12] fix: remove transport pxd --- src/zeroconf/_handlers/query_handler.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zeroconf/_handlers/query_handler.pxd b/src/zeroconf/_handlers/query_handler.pxd index 38fbf7518..5ac21ece5 100644 --- a/src/zeroconf/_handlers/query_handler.pxd +++ b/src/zeroconf/_handlers/query_handler.pxd @@ -114,6 +114,6 @@ cdef class QueryHandler: list packets, object addr, object port, - _WrappedTransport transport, + object transport, tuple v6_flow_scope ) From 3493b520ef57a13388b977d62f62a3c83028a145 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 18:21:00 -1000 Subject: [PATCH 07/12] fix: tweak --- src/zeroconf/_handlers/query_handler.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zeroconf/_handlers/query_handler.pxd b/src/zeroconf/_handlers/query_handler.pxd index 5ac21ece5..adbb450d6 100644 --- a/src/zeroconf/_handlers/query_handler.pxd +++ b/src/zeroconf/_handlers/query_handler.pxd @@ -109,7 +109,7 @@ cdef class QueryHandler: out_queue=MulticastOutgoingQueue, out_delay_queue=MulticastOutgoingQueue ) - cpdef handle_assembled_query( + cpdef void handle_assembled_query( self, list packets, object addr, From 717b5dcf9c144868df2e2920d4b304f964da7dab Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 18:29:56 -1000 Subject: [PATCH 08/12] fix: cleanup related types --- src/zeroconf/_handlers/query_handler.pxd | 1 + src/zeroconf/_handlers/record_manager.pxd | 13 ++++++------- src/zeroconf/_handlers/record_manager.py | 5 ++--- src/zeroconf/_protocol/incoming.pxd | 2 +- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/zeroconf/_handlers/query_handler.pxd b/src/zeroconf/_handlers/query_handler.pxd index adbb450d6..bb7198be5 100644 --- a/src/zeroconf/_handlers/query_handler.pxd +++ b/src/zeroconf/_handlers/query_handler.pxd @@ -106,6 +106,7 @@ cdef class QueryHandler: @cython.locals( first_packet=DNSIncoming, + ucast_source=bint, out_queue=MulticastOutgoingQueue, out_delay_queue=MulticastOutgoingQueue ) diff --git a/src/zeroconf/_handlers/record_manager.pxd b/src/zeroconf/_handlers/record_manager.pxd index 0f543aff2..5be2c283b 100644 --- a/src/zeroconf/_handlers/record_manager.pxd +++ b/src/zeroconf/_handlers/record_manager.pxd @@ -22,22 +22,21 @@ cdef class RecordManager: cdef public DNSCache cache cdef public cython.set listeners - cpdef async_updates(self, object now, object records) + cpdef void async_updates(self, object now, object records) - cpdef async_updates_complete(self, object notify) + cpdef void async_updates_complete(self, bint notify) @cython.locals( cache=DNSCache, record=DNSRecord, answers=cython.list, maybe_entry=DNSRecord, - now_double=double ) - cpdef async_updates_from_response(self, DNSIncoming msg) + cpdef void async_updates_from_response(self, DNSIncoming msg) - cpdef async_add_listener(self, RecordUpdateListener listener, object question) + cpdef void async_add_listener(self, RecordUpdateListener listener, object question) - cpdef async_remove_listener(self, RecordUpdateListener listener) + cpdef void async_remove_listener(self, RecordUpdateListener listener) @cython.locals(question=DNSQuestion, record=DNSRecord) - cdef _async_update_matching_records(self, RecordUpdateListener listener, cython.list questions) + cdef void _async_update_matching_records(self, RecordUpdateListener listener, cython.list questions) diff --git a/src/zeroconf/_handlers/record_manager.py b/src/zeroconf/_handlers/record_manager.py index 129acd0b6..0a0f6c54b 100644 --- a/src/zeroconf/_handlers/record_manager.py +++ b/src/zeroconf/_handlers/record_manager.py @@ -84,7 +84,6 @@ def async_updates_from_response(self, msg: DNSIncoming) -> None: other_adds: List[DNSRecord] = [] removes: Set[DNSRecord] = set() now = msg.now - now_double = now unique_types: Set[Tuple[str, int, int]] = set() cache = self.cache answers = msg.answers() @@ -113,7 +112,7 @@ def async_updates_from_response(self, msg: DNSIncoming) -> None: record = cast(_UniqueRecordsType, record) maybe_entry = cache.async_get_unique(record) - if not record.is_expired(now_double): + if not record.is_expired(now): if maybe_entry is not None: maybe_entry.reset_ttl(record) else: @@ -129,7 +128,7 @@ def async_updates_from_response(self, msg: DNSIncoming) -> None: removes.add(record) if unique_types: - cache.async_mark_unique_records_older_than_1s_to_expire(unique_types, answers, now_double) + cache.async_mark_unique_records_older_than_1s_to_expire(unique_types, answers, now) if updates: self.async_updates(now, updates) diff --git a/src/zeroconf/_protocol/incoming.pxd b/src/zeroconf/_protocol/incoming.pxd index 07ae6e78e..a8c0dbdb8 100644 --- a/src/zeroconf/_protocol/incoming.pxd +++ b/src/zeroconf/_protocol/incoming.pxd @@ -56,7 +56,7 @@ cdef class DNSIncoming: cdef cython.uint _num_authorities cdef cython.uint _num_additionals cdef public bint valid - cdef public object now + cdef public double now cdef public object scope_id cdef public object source cdef bint _has_qu_question From 9a7f6cdbf1c1ab393ba3806c3de546cb6823392f Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 18:33:55 -1000 Subject: [PATCH 09/12] fix: imports --- src/zeroconf/_handlers/query_handler.py | 2 +- tests/conftest.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/zeroconf/_handlers/query_handler.py b/src/zeroconf/_handlers/query_handler.py index 30d32c0bb..8349b584b 100644 --- a/src/zeroconf/_handlers/query_handler.py +++ b/src/zeroconf/_handlers/query_handler.py @@ -23,7 +23,6 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union, cast from .._cache import DNSCache, _UniqueRecordsType -from .._core import _MDNS_PORT from .._dns import DNSAddress, DNSPointer, DNSQuestion, DNSRecord, DNSRRSet from .._protocol.incoming import DNSIncoming from .._services.info import ServiceInfo @@ -33,6 +32,7 @@ _ADDRESS_RECORD_TYPES, _CLASS_IN, _DNS_OTHER_TTL, + _MDNS_PORT, _ONE_SECOND, _SERVICE_TYPE_ENUMERATION_NAME, _TYPE_A, diff --git a/tests/conftest.py b/tests/conftest.py index c0e926a34..48827ad10 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,7 +8,8 @@ import pytest -from zeroconf import _core, const +from zeroconf import const +from zeroconf._handlers import query_handler @pytest.fixture(autouse=True) @@ -23,7 +24,7 @@ def verify_threads_ended(): @pytest.fixture def run_isolated(): """Change the mDNS port to run the test in isolation.""" - with patch.object(_core, "_MDNS_PORT", 5454), patch.object(const, "_MDNS_PORT", 5454): + with patch.object(query_handler, "_MDNS_PORT", 5454), patch.object(const, "_MDNS_PORT", 5454): yield From c4d649b66e4a72eca6fe4894ac4e640429fc5f1f Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 18:34:20 -1000 Subject: [PATCH 10/12] Revert "fix: imports" This reverts commit 9a7f6cdbf1c1ab393ba3806c3de546cb6823392f. --- src/zeroconf/_handlers/query_handler.py | 2 +- tests/conftest.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/zeroconf/_handlers/query_handler.py b/src/zeroconf/_handlers/query_handler.py index 8349b584b..30d32c0bb 100644 --- a/src/zeroconf/_handlers/query_handler.py +++ b/src/zeroconf/_handlers/query_handler.py @@ -23,6 +23,7 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union, cast from .._cache import DNSCache, _UniqueRecordsType +from .._core import _MDNS_PORT from .._dns import DNSAddress, DNSPointer, DNSQuestion, DNSRecord, DNSRRSet from .._protocol.incoming import DNSIncoming from .._services.info import ServiceInfo @@ -32,7 +33,6 @@ _ADDRESS_RECORD_TYPES, _CLASS_IN, _DNS_OTHER_TTL, - _MDNS_PORT, _ONE_SECOND, _SERVICE_TYPE_ENUMERATION_NAME, _TYPE_A, diff --git a/tests/conftest.py b/tests/conftest.py index 48827ad10..c0e926a34 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,8 +8,7 @@ import pytest -from zeroconf import const -from zeroconf._handlers import query_handler +from zeroconf import _core, const @pytest.fixture(autouse=True) @@ -24,7 +23,7 @@ def verify_threads_ended(): @pytest.fixture def run_isolated(): """Change the mDNS port to run the test in isolation.""" - with patch.object(query_handler, "_MDNS_PORT", 5454), patch.object(const, "_MDNS_PORT", 5454): + with patch.object(_core, "_MDNS_PORT", 5454), patch.object(const, "_MDNS_PORT", 5454): yield From 91c5d1d8e79634eaa1480f83e8b612cb41797753 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 18:35:58 -1000 Subject: [PATCH 11/12] fix: circular import --- src/zeroconf/_handlers/query_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zeroconf/_handlers/query_handler.py b/src/zeroconf/_handlers/query_handler.py index 30d32c0bb..8349b584b 100644 --- a/src/zeroconf/_handlers/query_handler.py +++ b/src/zeroconf/_handlers/query_handler.py @@ -23,7 +23,6 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union, cast from .._cache import DNSCache, _UniqueRecordsType -from .._core import _MDNS_PORT from .._dns import DNSAddress, DNSPointer, DNSQuestion, DNSRecord, DNSRRSet from .._protocol.incoming import DNSIncoming from .._services.info import ServiceInfo @@ -33,6 +32,7 @@ _ADDRESS_RECORD_TYPES, _CLASS_IN, _DNS_OTHER_TTL, + _MDNS_PORT, _ONE_SECOND, _SERVICE_TYPE_ENUMERATION_NAME, _TYPE_A, From 164eff5f6e1de4fdaad902202c785277e1dbb875 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Dec 2023 18:41:02 -1000 Subject: [PATCH 12/12] fix: isolation --- tests/conftest.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index c0e926a34..5525c4ee0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,6 +9,7 @@ import pytest from zeroconf import _core, const +from zeroconf._handlers import query_handler @pytest.fixture(autouse=True) @@ -23,7 +24,9 @@ def verify_threads_ended(): @pytest.fixture def run_isolated(): """Change the mDNS port to run the test in isolation.""" - with patch.object(_core, "_MDNS_PORT", 5454), patch.object(const, "_MDNS_PORT", 5454): + with patch.object(query_handler, "_MDNS_PORT", 5454), patch.object( + _core, "_MDNS_PORT", 5454 + ), patch.object(const, "_MDNS_PORT", 5454): yield