Skip to content

Commit d08b6c7

Browse files
committed
feat: relo
1 parent ff33a7a commit d08b6c7

8 files changed

Lines changed: 105 additions & 142 deletions

File tree

build_ext.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ def build(setup_kwargs: Any) -> None:
2525
[
2626
"src/zeroconf/_dns.py",
2727
"src/zeroconf/_cache.py",
28-
"src/zeroconf/_core.py",
2928
"src/zeroconf/_history.py",
3029
"src/zeroconf/_record_update.py",
3130
"src/zeroconf/_listener.py",

src/zeroconf/_core.pxd

Lines changed: 0 additions & 63 deletions
This file was deleted.

src/zeroconf/_core.py

Lines changed: 17 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,6 @@
3131
from ._dns import DNSQuestion, DNSQuestionType
3232
from ._engine import AsyncEngine
3333
from ._exceptions import NonUniqueNameException, NotRunningException
34-
from ._handlers.answers import (
35-
construct_outgoing_multicast_answers,
36-
construct_outgoing_unicast_answers,
37-
)
3834
from ._handlers.multicast_outgoing_queue import MulticastOutgoingQueue
3935
from ._handlers.query_handler import QueryHandler
4036
from ._handlers.record_manager import RecordManager
@@ -102,20 +98,16 @@
10298

10399
_REGISTER_BROADCASTS = 3
104100

105-
_str = str
106-
_int = int
107-
_bytes = bytes
108-
109101

110102
def async_send_with_transport(
111103
log_debug: bool,
112104
transport: _WrappedTransport,
113-
packet: _bytes,
114-
packet_num: _int,
105+
packet: bytes,
106+
packet_num: int,
115107
out: DNSOutgoing,
116-
addr: Optional[_str],
117-
port: _int,
118-
v6_flow_scope: Union[Tuple[()], Tuple[int, int]],
108+
addr: Optional[str],
109+
port: int,
110+
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
119111
) -> None:
120112
ipv6_socket = transport.is_ipv6
121113
if addr is None:
@@ -144,7 +136,7 @@ def async_send_with_transport(
144136
transport.transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope))
145137

146138

147-
class Zeroconf:
139+
class Zeroconf(QuietLogger):
148140

149141
"""Implementation of Zeroconf Multicast DNS Service Discovery
150142
@@ -191,15 +183,15 @@ def __init__(
191183
self.registry = ServiceRegistry()
192184
self.cache = DNSCache()
193185
self.question_history = QuestionHistory()
194-
self.query_handler = QueryHandler(self.registry, self.cache, self.question_history)
186+
self.query_handler = QueryHandler(self)
195187
self.record_manager = RecordManager(self)
196188

197189
self._notify_futures: Set[asyncio.Future] = set()
198190
self.loop: Optional[asyncio.AbstractEventLoop] = None
199191
self._loop_thread: Optional[threading.Thread] = None
200192

201-
self._out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY)
202-
self._out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY)
193+
self.out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY)
194+
self.out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY)
203195

204196
self.start()
205197

@@ -565,45 +557,11 @@ def async_remove_listener(self, listener: RecordUpdateListener) -> None:
565557
"""
566558
self.record_manager.async_remove_listener(listener)
567559

568-
def handle_assembled_query(
569-
self,
570-
packets: List[DNSIncoming],
571-
addr: _str,
572-
port: _int,
573-
transport: _WrappedTransport,
574-
v6_flow_scope: Union[Tuple[()], Tuple[int, int]],
575-
) -> None:
576-
"""Respond to a (re)assembled query.
577-
578-
If the protocol received packets with the TC bit set, it will
579-
wait a bit for the rest of the packets and only call
580-
handle_assembled_query once it has a complete set of packets
581-
or the timer expires. If the TC bit is not set, a single
582-
packet will be in packets.
583-
"""
584-
ucast_source = port != _MDNS_PORT
585-
question_answers = self.query_handler.async_response(packets, ucast_source)
586-
if not question_answers:
587-
return
588-
first_packet = packets[0]
589-
now = first_packet.now
590-
if question_answers.ucast:
591-
questions = first_packet.questions
592-
id_ = first_packet.id
593-
out = construct_outgoing_unicast_answers(question_answers.ucast, ucast_source, questions, id_)
594-
# When sending unicast, only send back the reply
595-
# via the same socket that it was recieved from
596-
# as we know its reachable from that socket
597-
self.async_send(out, addr, port, v6_flow_scope, transport)
598-
if question_answers.mcast_now:
599-
self.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now))
600-
if question_answers.mcast_aggregate:
601-
self._out_queue.async_add(now, question_answers.mcast_aggregate)
602-
if question_answers.mcast_aggregate_last_second:
603-
# https://datatracker.ietf.org/doc/html/rfc6762#section-14
604-
# If we broadcast it in the last second, we have to delay
605-
# at least a second before we send it again
606-
self._out_delay_queue.async_add(now, question_answers.mcast_aggregate_last_second)
560+
def handle_response(self, msg: DNSIncoming) -> None:
561+
"""Deal with incoming response packets. All answers
562+
are held in the cache, and listeners are notified."""
563+
self.log_warning_once("handle_response is deprecated, use record_manager.async_updates_from_response")
564+
self.record_manager.async_updates_from_response(msg)
607565

608566
def send(
609567
self,
@@ -617,9 +575,6 @@ def send(
617575
assert self.loop is not None
618576
self.loop.call_soon_threadsafe(self.async_send, out, addr, port, v6_flow_scope, transport)
619577

620-
def _debug_enabled(self) -> bool:
621-
return log.isEnabledFor(logging.DEBUG)
622-
623578
def async_send(
624579
self,
625580
out: DNSOutgoing,
@@ -635,14 +590,11 @@ def async_send(
635590
# If no transport is specified, we send to all the ones
636591
# with the same address family
637592
transports = [transport] if transport else self.engine.senders
638-
log_debug = self._debug_enabled()
639-
max_size = _MAX_MSG_ABSOLUTE
593+
log_debug = log.isEnabledFor(logging.DEBUG)
640594

641595
for packet_num, packet in enumerate(out.packets()):
642-
if len(packet) > max_size:
643-
QuietLogger.log_warning_once(
644-
"Dropping %r over-sized packet (%d bytes) %r", out, len(packet), packet
645-
)
596+
if len(packet) > _MAX_MSG_ABSOLUTE:
597+
self.log_warning_once("Dropping %r over-sized packet (%d bytes) %r", out, len(packet), packet)
646598
return
647599
for send_transport in transports:
648600
async_send_with_transport(

src/zeroconf/_handlers/query_handler.pxd

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ from .._history cimport QuestionHistory
77
from .._protocol.incoming cimport DNSIncoming
88
from .._services.info cimport ServiceInfo
99
from .._services.registry cimport ServiceRegistry
10-
from .answers cimport QuestionAnswers
10+
from .answers cimport (
11+
QuestionAnswers,
12+
construct_outgoing_multicast_answers,
13+
construct_outgoing_unicast_answers,
14+
)
15+
from .multicast_outgoing_queue cimport MulticastOutgoingQueue
1116

1217

1318
cdef bint TYPE_CHECKING
@@ -65,6 +70,7 @@ cdef class _QueryResponse:
6570

6671
cdef class QueryHandler:
6772

73+
cdef object zc
6874
cdef ServiceRegistry registry
6975
cdef DNSCache cache
7076
cdef QuestionHistory question_history
@@ -97,3 +103,17 @@ cdef class QueryHandler:
97103

98104
@cython.locals(name=str, question_lower_name=str)
99105
cdef _get_answer_strategies(self, DNSQuestion question)
106+
107+
@cython.locals(
108+
first_packet=DNSIncoming,
109+
out_queue=MulticastOutgoingQueue,
110+
out_delay_queue=MulticastOutgoingQueue
111+
)
112+
cpdef handle_assembled_query(
113+
self,
114+
list packets,
115+
object addr,
116+
object port,
117+
_WrappedTransport transport,
118+
tuple v6_flow_scope
119+
)

src/zeroconf/_handlers/query_handler.py

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
USA
2121
"""
2222

23-
from typing import TYPE_CHECKING, List, Optional, Set, cast
23+
from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union, cast
2424

2525
from .._cache import DNSCache, _UniqueRecordsType
26+
from .._core import _MDNS_PORT
2627
from .._dns import DNSAddress, DNSPointer, DNSQuestion, DNSRecord, DNSRRSet
27-
from .._history import QuestionHistory
2828
from .._protocol.incoming import DNSIncoming
2929
from .._services.info import ServiceInfo
30-
from .._services.registry import ServiceRegistry
30+
from .._transport import _WrappedTransport
3131
from .._utils.net import IPVersion
3232
from ..const import (
3333
_ADDRESS_RECORD_TYPES,
@@ -43,7 +43,12 @@
4343
_TYPE_SRV,
4444
_TYPE_TXT,
4545
)
46-
from .answers import QuestionAnswers, _AnswerWithAdditionalsType
46+
from .answers import (
47+
QuestionAnswers,
48+
_AnswerWithAdditionalsType,
49+
construct_outgoing_multicast_answers,
50+
construct_outgoing_unicast_answers,
51+
)
4752

4853
_RESPOND_IMMEDIATE_TYPES = {_TYPE_NSEC, _TYPE_SRV, *_ADDRESS_RECORD_TYPES}
4954

@@ -53,14 +58,17 @@
5358
_IPVersion_ALL = IPVersion.All
5459

5560
_int = int
56-
61+
_str = str
5762

5863
_ANSWER_STRATEGY_SERVICE_TYPE_ENUMERATION = 0
5964
_ANSWER_STRATEGY_POINTER = 1
6065
_ANSWER_STRATEGY_ADDRESS = 2
6166
_ANSWER_STRATEGY_SERVICE = 3
6267
_ANSWER_STRATEGY_TEXT = 4
6368

69+
if TYPE_CHECKING:
70+
from .._core import Zeroconf
71+
6472

6573
class _AnswerStrategy:
6674

@@ -183,13 +191,14 @@ def _has_mcast_record_in_last_second(self, record: DNSRecord) -> bool:
183191
class QueryHandler:
184192
"""Query the ServiceRegistry."""
185193

186-
__slots__ = ("registry", "cache", "question_history")
194+
__slots__ = ("zc", "registry", "cache", "question_history")
187195

188-
def __init__(self, registry: ServiceRegistry, cache: DNSCache, question_history: QuestionHistory) -> None:
196+
def __init__(self, zc: 'Zeroconf') -> None:
189197
"""Init the query handler."""
190-
self.registry = registry
191-
self.cache = cache
192-
self.question_history = question_history
198+
self.zc = zc
199+
self.registry = zc.registry
200+
self.cache = zc.cache
201+
self.question_history = zc.question_history
193202

194203
def _add_service_type_enumeration_query_answers(
195204
self, types: List[str], answer_set: _AnswerWithAdditionalsType, known_answers: DNSRRSet
@@ -385,3 +394,45 @@ def _get_answer_strategies(
385394
)
386395

387396
return strategies
397+
398+
def handle_assembled_query(
399+
self,
400+
packets: List[DNSIncoming],
401+
addr: _str,
402+
port: _int,
403+
transport: _WrappedTransport,
404+
v6_flow_scope: Union[Tuple[()], Tuple[int, int]],
405+
) -> None:
406+
"""Respond to a (re)assembled query.
407+
408+
If the protocol recieved packets with the TC bit set, it will
409+
wait a bit for the rest of the packets and only call
410+
handle_assembled_query once it has a complete set of packets
411+
or the timer expires. If the TC bit is not set, a single
412+
packet will be in packets.
413+
"""
414+
first_packet = packets[0]
415+
now = first_packet.now
416+
ucast_source = port != _MDNS_PORT
417+
question_answers = self.async_response(packets, ucast_source)
418+
if not question_answers:
419+
return
420+
if question_answers.ucast:
421+
questions = first_packet.questions
422+
id_ = first_packet.id
423+
out = construct_outgoing_unicast_answers(question_answers.ucast, ucast_source, questions, id_)
424+
# When sending unicast, only send back the reply
425+
# via the same socket that it was recieved from
426+
# as we know its reachable from that socket
427+
self.zc.async_send(out, addr, port, v6_flow_scope, transport)
428+
if question_answers.mcast_now:
429+
self.zc.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now))
430+
if question_answers.mcast_aggregate:
431+
out_queue = self.zc.out_queue
432+
out_queue.async_add(now, question_answers.mcast_aggregate)
433+
if question_answers.mcast_aggregate_last_second:
434+
# https://datatracker.ietf.org/doc/html/rfc6762#section-14
435+
# If we broadcast it in the last second, we have to delay
436+
# at least a second before we send it again
437+
out_delay_queue = self.zc.out_delay_queue
438+
out_delay_queue.async_add(now, question_answers.mcast_aggregate_last_second)

src/zeroconf/_listener.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11

22
import cython
33

4+
from ._handlers.query_handler cimport QueryHandler
45
from ._handlers.record_manager cimport RecordManager
56
from ._protocol.incoming cimport DNSIncoming
67
from ._services.registry cimport ServiceRegistry
@@ -21,6 +22,7 @@ cdef class AsyncListener:
2122
cdef public object zc
2223
cdef ServiceRegistry _registry
2324
cdef RecordManager _record_manager
25+
cdef QueryHandler _query_handler
2426
cdef public cython.bytes data
2527
cdef public double last_time
2628
cdef public DNSIncoming last_message

0 commit comments

Comments
 (0)