Skip to content

Commit f47a414

Browse files
committed
Move QueryHandler and RecordManager handlers into zeroconf.handlers
1 parent ffdc988 commit f47a414

2 files changed

Lines changed: 260 additions & 217 deletions

File tree

zeroconf/core.py

Lines changed: 2 additions & 217 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
"""
2222

2323
import errno
24-
import itertools
2524
import platform
2625
import select
2726
import socket
@@ -33,7 +32,6 @@
3332
_CACHE_CLEANUP_INTERVAL,
3433
_CHECK_TIME,
3534
_CLASS_IN,
36-
_DNS_OTHER_TTL,
3735
_DNS_PORT,
3836
_FLAGS_AA,
3937
_FLAGS_QR_QUERY,
@@ -43,16 +41,12 @@
4341
_MDNS_ADDR6,
4442
_MDNS_PORT,
4543
_REGISTER_TIME,
46-
_SERVICE_TYPE_ENUMERATION_NAME,
47-
_TYPE_A,
48-
_TYPE_ANY,
4944
_TYPE_PTR,
50-
_TYPE_SRV,
51-
_TYPE_TXT,
5245
_UNREGISTER_TIME,
5346
)
54-
from .dns import DNSAddress, DNSCache, DNSIncoming, DNSOutgoing, DNSPointer, DNSQuestion, DNSRecord
47+
from .dns import DNSCache, DNSIncoming, DNSOutgoing, DNSQuestion
5548
from .exceptions import NonUniqueNameException
49+
from .handlers import QueryHandler, RecordManager
5650
from .logger import QuietLogger, log
5751
from .services import (
5852
RecordUpdateListener,
@@ -229,215 +223,6 @@ def handle_read(self, socket_: socket.socket) -> None:
229223
self.zc.handle_response(msg)
230224

231225

232-
class QueryHandler:
233-
"""Query the ServiceRegistry."""
234-
235-
def __init__(self, registry: ServiceRegistry):
236-
"""Init the query handler."""
237-
self.registry = registry
238-
239-
def _answer_service_type_enumeration_query(self, msg: DNSIncoming, out: DNSOutgoing) -> None:
240-
"""Provide an answer to a service type enumeration query.
241-
242-
https://datatracker.ietf.org/doc/html/rfc6763#section-9
243-
"""
244-
for stype in self.registry.get_types():
245-
out.add_answer(
246-
msg,
247-
DNSPointer(
248-
_SERVICE_TYPE_ENUMERATION_NAME,
249-
_TYPE_PTR,
250-
_CLASS_IN,
251-
_DNS_OTHER_TTL,
252-
stype,
253-
),
254-
)
255-
256-
def _answer_ptr_query(self, msg: DNSIncoming, out: DNSOutgoing, question: DNSQuestion) -> None:
257-
"""Answer a PTR query."""
258-
for service in self.registry.get_infos_type(question.name.lower()):
259-
out.add_answer(msg, service.dns_pointer())
260-
# Add recommended additional answers according to
261-
# https://tools.ietf.org/html/rfc6763#section-12.1.
262-
out.add_additional_answer(service.dns_service())
263-
out.add_additional_answer(service.dns_text())
264-
for dns_address in service.dns_addresses():
265-
out.add_additional_answer(dns_address)
266-
267-
def _answer_non_ptr_query(self, msg: DNSIncoming, out: DNSOutgoing, question: DNSQuestion) -> None:
268-
"""Answer a query any query other then PTR.
269-
270-
Add answer(s) for A, AAAA, SRV, or TXT queries.
271-
"""
272-
name_to_find = question.name.lower()
273-
# Answer A record queries for any service addresses we know
274-
if question.type in (_TYPE_A, _TYPE_ANY):
275-
for service in self.registry.get_infos_server(name_to_find):
276-
for dns_address in service.dns_addresses():
277-
out.add_answer(msg, dns_address)
278-
279-
service = self.registry.get_info_name(name_to_find) # type: ignore
280-
if service is None:
281-
return
282-
283-
if question.type in (_TYPE_SRV, _TYPE_ANY):
284-
out.add_answer(msg, service.dns_service())
285-
if question.type in (_TYPE_TXT, _TYPE_ANY):
286-
out.add_answer(msg, service.dns_text())
287-
if question.type == _TYPE_SRV:
288-
for dns_address in service.dns_addresses():
289-
out.add_additional_answer(dns_address)
290-
291-
def response(self, msg: DNSIncoming, unicast: bool) -> Optional[DNSOutgoing]:
292-
"""Deal with incoming query packets. Provides a response if possible."""
293-
if unicast:
294-
out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA, multicast=False)
295-
for question in msg.questions:
296-
out.add_question(question)
297-
else:
298-
out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA)
299-
300-
for question in msg.questions:
301-
if question.type == _TYPE_PTR:
302-
if question.name.lower() == _SERVICE_TYPE_ENUMERATION_NAME:
303-
self._answer_service_type_enumeration_query(msg, out)
304-
else:
305-
self._answer_ptr_query(msg, out, question)
306-
continue
307-
308-
self._answer_non_ptr_query(msg, out, question)
309-
310-
if out is not None and out.answers:
311-
out.id = msg.id
312-
return out
313-
314-
return None
315-
316-
317-
class RecordManager:
318-
"""Process records into the cache and notify listeners."""
319-
320-
def __init__(self, zeroconf: 'Zeroconf') -> None:
321-
"""Init the record manager."""
322-
self.zc = zeroconf
323-
self.cache = zeroconf.cache
324-
self.listeners: List[RecordUpdateListener] = []
325-
326-
def updates(self, now: float, rec: List[DNSRecord]) -> None:
327-
"""Used to notify listeners of new information that has updated
328-
a record.
329-
330-
This method must be called before the cache is updated.
331-
"""
332-
for listener in self.listeners:
333-
listener.update_records(self.zc, now, rec)
334-
335-
def updates_complete(self) -> None:
336-
"""Used to notify listeners of new information that has updated
337-
a record.
338-
339-
This method must be called after the cache is updated.
340-
"""
341-
for listener in self.listeners:
342-
listener.update_records_complete()
343-
self.zc.notify_all()
344-
345-
def updates_from_response(self, msg: DNSIncoming) -> None:
346-
"""Deal with incoming response packets. All answers
347-
are held in the cache, and listeners are notified."""
348-
updates: List[DNSRecord] = []
349-
address_adds: List[DNSAddress] = []
350-
other_adds: List[DNSRecord] = []
351-
removes: List[DNSRecord] = []
352-
now = current_time_millis()
353-
for record in msg.answers:
354-
355-
updated = True
356-
357-
if record.unique: # https://tools.ietf.org/html/rfc6762#section-10.2
358-
# rfc6762#section-10.2 para 2
359-
# Since unique is set, all old records with that name, rrtype,
360-
# and rrclass that were received more than one second ago are declared
361-
# invalid, and marked to expire from the cache in one second.
362-
for entry in self.cache.get_all_by_details(record.name, record.type, record.class_):
363-
if entry == record:
364-
updated = False
365-
if record.created - entry.created > 1000 and entry not in msg.answers:
366-
removes.append(entry)
367-
368-
expired = record.is_expired(now)
369-
maybe_entry = self.cache.get(record)
370-
if not expired:
371-
if maybe_entry is not None:
372-
maybe_entry.reset_ttl(record)
373-
else:
374-
if isinstance(record, DNSAddress):
375-
address_adds.append(record)
376-
else:
377-
other_adds.append(record)
378-
if updated:
379-
updates.append(record)
380-
elif maybe_entry is not None:
381-
updates.append(record)
382-
removes.append(record)
383-
384-
if not updates and not address_adds and not other_adds and not removes:
385-
return
386-
387-
self.updates(now, updates)
388-
# The cache adds must be processed AFTER we trigger
389-
# the updates since we compare existing data
390-
# with the new data and updating the cache
391-
# ahead of update_record will cause listeners
392-
# to miss changes
393-
#
394-
# We must process address adds before non-addresses
395-
# otherwise a fetch of ServiceInfo may miss an address
396-
# because it thinks the cache is complete
397-
#
398-
# The cache is processed under the context manager to ensure
399-
# that any ServiceBrowser that is going to call
400-
# zc.get_service_info will see the cached value
401-
# but ONLY after all the record updates have been
402-
# processsed.
403-
self.cache.add_records(itertools.chain(address_adds, other_adds))
404-
# Removes are processed last since
405-
# ServiceInfo could generate an un-needed query
406-
# because the data was not yet populated.
407-
self.cache.remove_records(removes)
408-
self.updates_complete()
409-
410-
def add_listener(
411-
self, listener: RecordUpdateListener, question: Optional[Union[DNSQuestion, List[DNSQuestion]]]
412-
) -> None:
413-
"""Adds a listener for a given question. The listener will have
414-
its update_record method called when information is available to
415-
answer the question(s)."""
416-
self.listeners.append(listener)
417-
418-
if question is not None:
419-
now = current_time_millis()
420-
records = []
421-
questions = [question] if isinstance(question, DNSQuestion) else question
422-
for single_question in questions:
423-
for record in self.cache.entries_with_name(single_question.name):
424-
if single_question.answered_by(record) and not record.is_expired(now):
425-
records.append(record)
426-
if records:
427-
listener.update_records(self.zc, now, records)
428-
listener.update_records_complete()
429-
430-
self.zc.notify_all()
431-
432-
def remove_listener(self, listener: RecordUpdateListener) -> None:
433-
"""Removes a listener."""
434-
try:
435-
self.listeners.remove(listener)
436-
self.zc.notify_all()
437-
except ValueError as e:
438-
log.exception('Failed to remove listener: %r', e)
439-
440-
441226
class Zeroconf(QuietLogger):
442227

443228
"""Implementation of Zeroconf Multicast DNS Service Discovery

0 commit comments

Comments
 (0)