Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/zeroconf/_cache.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ cdef class DNSCache:

cpdef bint async_add_records(self, object entries)

cpdef async_remove_records(self, object entries)
cpdef void async_remove_records(self, object entries)

@cython.locals(
store=cython.dict,
)
cpdef async_get_unique(self, DNSRecord entry)
cpdef DNSRecord async_get_unique(self, DNSRecord entry)

@cython.locals(
record=DNSRecord,
)
cpdef async_expire(self, double now)
cpdef list async_expire(self, double now)

@cython.locals(
records=cython.dict,
record=DNSRecord,
)
cpdef async_all_by_details(self, str name, object type_, object class_)
cpdef list async_all_by_details(self, str name, object type_, object class_)

cpdef cython.dict async_entries_with_name(self, str name)

Expand All @@ -51,7 +51,7 @@ cdef class DNSCache:
@cython.locals(
cached_entry=DNSRecord,
)
cpdef get_by_details(self, str name, object type_, object class_)
cpdef DNSRecord get_by_details(self, str name, object type_, object class_)

@cython.locals(
records=cython.dict,
Expand All @@ -62,12 +62,12 @@ cdef class DNSCache:
@cython.locals(
store=cython.dict,
)
cdef _async_add(self, DNSRecord record)
cdef bint _async_add(self, DNSRecord record)

cdef _async_remove(self, DNSRecord record)
cdef void _async_remove(self, DNSRecord record)

@cython.locals(
record=DNSRecord,
created_double=double,
)
cpdef async_mark_unique_records_older_than_1s_to_expire(self, cython.set unique_types, object answers, double now)
cpdef void async_mark_unique_records_older_than_1s_to_expire(self, cython.set unique_types, object answers, double now)
7 changes: 4 additions & 3 deletions src/zeroconf/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,17 @@ def __init__(
self.registry = ServiceRegistry()
self.cache = DNSCache()
self.question_history = QuestionHistory()

self.out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY)
self.out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY)

self.query_handler = QueryHandler(self)
self.record_manager = RecordManager(self)

self._notify_futures: Set[asyncio.Future] = set()
self.loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_thread: Optional[threading.Thread] = None

self.out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY)
self.out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY)

self.start()

@property
Expand Down
14 changes: 7 additions & 7 deletions src/zeroconf/_handlers/answers.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ from .._protocol.outgoing cimport DNSOutgoing

cdef class QuestionAnswers:

cdef public object ucast
cdef public object mcast_now
cdef public object mcast_aggregate
cdef public object mcast_aggregate_last_second
cdef public dict ucast
cdef public dict mcast_now
cdef public dict mcast_aggregate
cdef public dict mcast_aggregate_last_second


cdef class AnswerGroup:
Expand All @@ -25,11 +25,11 @@ cdef class AnswerGroup:
cdef object _FLAGS_QR_RESPONSE_AA
cdef object NAME_GETTER

cpdef construct_outgoing_multicast_answers(cython.dict answers)
cpdef DNSOutgoing construct_outgoing_multicast_answers(cython.dict answers)

cpdef construct_outgoing_unicast_answers(
cpdef DNSOutgoing construct_outgoing_unicast_answers(
cython.dict answers, bint ucast_source, cython.list questions, object id_
)

@cython.locals(answer=DNSRecord, additionals=cython.set, additional=DNSRecord)
cdef _add_answers_additionals(DNSOutgoing out, cython.dict answers)
cdef void _add_answers_additionals(DNSOutgoing out, cython.dict answers)
6 changes: 3 additions & 3 deletions src/zeroconf/_handlers/multicast_outgoing_queue.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ cdef class MulticastOutgoingQueue:
cdef object _aggregation_delay

@cython.locals(last_group=AnswerGroup, random_int=cython.uint)
cpdef async_add(self, double now, cython.dict answers)
cpdef void async_add(self, double now, cython.dict answers)

@cython.locals(pending=AnswerGroup)
cdef _remove_answers_from_queue(self, cython.dict answers)
cdef void _remove_answers_from_queue(self, cython.dict answers)

cpdef async_ready(self)
cpdef void async_ready(self)
18 changes: 9 additions & 9 deletions src/zeroconf/_handlers/query_handler.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ cdef class _QueryResponse:
cdef cython.set _mcast_aggregate_last_second

@cython.locals(record=DNSRecord)
cdef add_qu_question_response(self, cython.dict answers)
cdef void add_qu_question_response(self, cython.dict answers)

cdef add_ucast_question_response(self, cython.dict answers)
cdef void add_ucast_question_response(self, cython.dict answers)

@cython.locals(answer=DNSRecord, question=DNSQuestion)
cdef add_mcast_question_response(self, cython.dict answers)
cdef void add_mcast_question_response(self, cython.dict answers)

@cython.locals(maybe_entry=DNSRecord)
cdef bint _has_mcast_within_one_quarter_ttl(self, DNSRecord record)
Expand All @@ -74,15 +74,17 @@ cdef class QueryHandler:
cdef ServiceRegistry registry
cdef DNSCache cache
cdef QuestionHistory question_history
cdef MulticastOutgoingQueue out_queue
cdef MulticastOutgoingQueue out_delay_queue

@cython.locals(service=ServiceInfo)
cdef _add_service_type_enumeration_query_answers(self, list types, cython.dict answer_set, DNSRRSet known_answers)
cdef void _add_service_type_enumeration_query_answers(self, list types, cython.dict answer_set, DNSRRSet known_answers)

@cython.locals(service=ServiceInfo)
cdef _add_pointer_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers)
cdef void _add_pointer_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers)

@cython.locals(service=ServiceInfo, dns_address=DNSAddress)
cdef _add_address_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers, cython.uint type_)
cdef void _add_address_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers, cython.uint type_)

@cython.locals(question_lower_name=str, type_=cython.uint, service=ServiceInfo)
cdef cython.dict _answer_question(self, DNSQuestion question, unsigned int strategy_type, list types, list services, DNSRRSet known_answers)
Expand All @@ -102,13 +104,11 @@ cdef class QueryHandler:
cpdef QuestionAnswers async_response(self, cython.list msgs, cython.bint unicast_source)

@cython.locals(name=str, question_lower_name=str)
cdef _get_answer_strategies(self, DNSQuestion question)
cdef list _get_answer_strategies(self, DNSQuestion question)

@cython.locals(
first_packet=DNSIncoming,
ucast_source=bint,
out_queue=MulticastOutgoingQueue,
out_delay_queue=MulticastOutgoingQueue
)
cpdef void handle_assembled_query(
self,
Expand Down
21 changes: 10 additions & 11 deletions src/zeroconf/_handlers/query_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,16 @@ def _has_mcast_record_in_last_second(self, record: DNSRecord) -> bool:
class QueryHandler:
"""Query the ServiceRegistry."""

__slots__ = ("zc", "registry", "cache", "question_history")
__slots__ = ("zc", "registry", "cache", "question_history", "out_queue", "out_delay_queue")

def __init__(self, zc: 'Zeroconf') -> None:
"""Init the query handler."""
self.zc = zc
self.registry = zc.registry
self.cache = zc.cache
self.question_history = zc.question_history
self.out_queue = zc.out_queue
self.out_delay_queue = zc.out_delay_queue

def _add_service_type_enumeration_query_answers(
self, types: List[str], answer_set: _AnswerWithAdditionalsType, known_answers: DNSRRSet
Expand Down Expand Up @@ -301,7 +303,7 @@ def async_response( # pylint: disable=unused-argument
"""
strategies: List[_AnswerStrategy] = []
for msg in msgs:
for question in msg.questions:
for question in msg._questions:
strategies.extend(self._get_answer_strategies(question))

if not strategies:
Expand All @@ -311,7 +313,8 @@ def async_response( # pylint: disable=unused-argument
return None

is_probe = False
questions = msg.questions
msg = msgs[0]
questions = msg._questions
# Only decode known answers if we are not a probe and we have
# at least one answer strategy
answers: List[DNSRecord] = []
Expand All @@ -321,7 +324,6 @@ def async_response( # pylint: disable=unused-argument
else:
answers.extend(msg.answers())

msg = msgs[0]
query_res = _QueryResponse(self.cache, questions, is_probe, msg.now)
known_answers = DNSRRSet(answers)
known_answers_set: Optional[Set[DNSRecord]] = None
Expand Down Expand Up @@ -412,13 +414,12 @@ def handle_assembled_query(
packet will be in packets.
"""
first_packet = packets[0]
now = first_packet.now
ucast_source = port != _MDNS_PORT
question_answers = self.async_response(packets, ucast_source)
if not question_answers:
if question_answers is None:
return
if question_answers.ucast:
questions = first_packet.questions
questions = first_packet._questions
id_ = first_packet.id
out = construct_outgoing_unicast_answers(question_answers.ucast, ucast_source, questions, id_)
# When sending unicast, only send back the reply
Expand All @@ -428,11 +429,9 @@ def handle_assembled_query(
if question_answers.mcast_now:
self.zc.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now))
if question_answers.mcast_aggregate:
out_queue = self.zc.out_queue
out_queue.async_add(now, question_answers.mcast_aggregate)
self.out_queue.async_add(first_packet.now, question_answers.mcast_aggregate)
if question_answers.mcast_aggregate_last_second:
# https://datatracker.ietf.org/doc/html/rfc6762#section-14
# If we broadcast it in the last second, we have to delay
# at least a second before we send it again
out_delay_queue = self.zc.out_delay_queue
out_delay_queue.async_add(now, question_answers.mcast_aggregate_last_second)
self.out_delay_queue.async_add(first_packet.now, question_answers.mcast_aggregate_last_second)
12 changes: 6 additions & 6 deletions src/zeroconf/_protocol/incoming.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ cdef class DNSIncoming:

cpdef bint is_probe(self)

cpdef answers(self)
cpdef list answers(self)

cpdef bint is_response(self)

Expand All @@ -86,16 +86,16 @@ cdef class DNSIncoming:
cdef unsigned int _decode_labels_at_offset(self, unsigned int off, cython.list labels, cython.set seen_pointers)

@cython.locals(offset="unsigned int")
cdef _read_header(self)
cdef void _read_header(self)

cdef _initial_parse(self)
cdef void _initial_parse(self)

@cython.locals(
end="unsigned int",
length="unsigned int",
offset="unsigned int"
)
cdef _read_others(self)
cdef void _read_others(self)

@cython.locals(offset="unsigned int")
cdef _read_questions(self)
Expand Down Expand Up @@ -123,6 +123,6 @@ cdef class DNSIncoming:
i="unsigned int",
bitmap_length="unsigned int",
)
cdef _read_bitmap(self, unsigned int end)
cdef list _read_bitmap(self, unsigned int end)

cdef _read_name(self)
cdef str _read_name(self)