Skip to content

Commit f91af79

Browse files
authored
Rename handlers and internals to make it clear what is threadsafe (#726)
- It was too easy to get confused about what was threadsafe and what was not threadsafe which lead to unexpected failures. Rename functions to make it clear what will be run in the event loop and what is expected to be threadsafe
1 parent 3338594 commit f91af79

5 files changed

Lines changed: 88 additions & 56 deletions

File tree

tests/test_handlers.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def _process_outgoing_packet(out):
9696
query.add_question(r.DNSQuestion(info.name, const._TYPE_SRV, const._CLASS_IN))
9797
query.add_question(r.DNSQuestion(info.name, const._TYPE_TXT, const._CLASS_IN))
9898
query.add_question(r.DNSQuestion(info.server, const._TYPE_A, const._CLASS_IN))
99-
multicast_out = zc.query_handler.response(
99+
multicast_out = zc.query_handler.async_response(
100100
[r.DNSIncoming(packet) for packet in query.packets()], None, const._MDNS_PORT
101101
)[1]
102102
_process_outgoing_packet(multicast_out)
@@ -134,7 +134,7 @@ def _process_outgoing_packet(out):
134134
query.add_question(r.DNSQuestion(info.name, const._TYPE_TXT, const._CLASS_IN))
135135
query.add_question(r.DNSQuestion(info.server, const._TYPE_A, const._CLASS_IN))
136136
_process_outgoing_packet(
137-
zc.query_handler.response(
137+
zc.query_handler.async_response(
138138
[r.DNSIncoming(packet) for packet in query.packets()], None, const._MDNS_PORT
139139
)[1]
140140
)
@@ -232,7 +232,7 @@ def test_ptr_optimization():
232232
# Verify we won't respond for 1s with the same multicast
233233
query = r.DNSOutgoing(const._FLAGS_QR_QUERY)
234234
query.add_question(r.DNSQuestion(info.type, const._TYPE_PTR, const._CLASS_IN))
235-
unicast_out, multicast_out = zc.query_handler.response(
235+
unicast_out, multicast_out = zc.query_handler.async_response(
236236
[r.DNSIncoming(packet) for packet in query.packets()], None, const._MDNS_PORT
237237
)
238238
assert unicast_out is None
@@ -244,7 +244,7 @@ def test_ptr_optimization():
244244
# Verify we will now respond
245245
query = r.DNSOutgoing(const._FLAGS_QR_QUERY)
246246
query.add_question(r.DNSQuestion(info.type, const._TYPE_PTR, const._CLASS_IN))
247-
unicast_out, multicast_out = zc.query_handler.response(
247+
unicast_out, multicast_out = zc.query_handler.async_response(
248248
[r.DNSIncoming(packet) for packet in query.packets()], None, const._MDNS_PORT
249249
)
250250
assert multicast_out.id == query.id
@@ -287,7 +287,7 @@ def test_any_query_for_ptr():
287287
question = r.DNSQuestion(type_, const._TYPE_ANY, const._CLASS_IN)
288288
generated.add_question(question)
289289
packets = generated.packets()
290-
_, multicast_out = zc.query_handler.response(
290+
_, multicast_out = zc.query_handler.async_response(
291291
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
292292
)
293293
assert multicast_out.answers[0][0].name == type_
@@ -313,7 +313,7 @@ def test_aaaa_query():
313313
question = r.DNSQuestion(server_name, const._TYPE_AAAA, const._CLASS_IN)
314314
generated.add_question(question)
315315
packets = generated.packets()
316-
_, multicast_out = zc.query_handler.response(
316+
_, multicast_out = zc.query_handler.async_response(
317317
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
318318
)
319319
assert multicast_out.answers[0][0].address == ipv6_address
@@ -342,7 +342,7 @@ def test_unicast_response():
342342
# query
343343
query = r.DNSOutgoing(const._FLAGS_QR_QUERY)
344344
query.add_question(r.DNSQuestion(info.type, const._TYPE_PTR, const._CLASS_IN))
345-
unicast_out, multicast_out = zc.query_handler.response(
345+
unicast_out, multicast_out = zc.query_handler.async_response(
346346
[r.DNSIncoming(packet) for packet in query.packets()], "1.2.3.4", 1234
347347
)
348348
for out in (unicast_out, multicast_out):
@@ -419,7 +419,7 @@ def _validate_complete_response(query, out):
419419
assert question.unicast is True
420420
query.add_question(question)
421421

422-
unicast_out, multicast_out = zc.query_handler.response(
422+
unicast_out, multicast_out = zc.query_handler.async_response(
423423
[r.DNSIncoming(packet) for packet in query.packets()], "1.2.3.4", const._MDNS_PORT
424424
)
425425
assert multicast_out is None
@@ -432,7 +432,7 @@ def _validate_complete_response(query, out):
432432
question.unicast = True # Set the QU bit
433433
assert question.unicast is True
434434
query.add_question(question)
435-
unicast_out, multicast_out = zc.query_handler.response(
435+
unicast_out, multicast_out = zc.query_handler.async_response(
436436
[r.DNSIncoming(packet) for packet in query.packets()], "1.2.3.4", const._MDNS_PORT
437437
)
438438
assert unicast_out is None
@@ -445,7 +445,7 @@ def _validate_complete_response(query, out):
445445
assert question.unicast is True
446446
query.add_question(question)
447447
query.add_authorative_answer(info2.dns_pointer())
448-
unicast_out, multicast_out = zc.query_handler.response(
448+
unicast_out, multicast_out = zc.query_handler.async_response(
449449
[r.DNSIncoming(packet) for packet in query.packets()], "1.2.3.4", const._MDNS_PORT
450450
)
451451
_validate_complete_response(query, unicast_out)
@@ -458,7 +458,7 @@ def _validate_complete_response(query, out):
458458
question.unicast = True # Set the QU bit
459459
assert question.unicast is True
460460
query.add_question(question)
461-
unicast_out, multicast_out = zc.query_handler.response(
461+
unicast_out, multicast_out = zc.query_handler.async_response(
462462
[r.DNSIncoming(packet) for packet in query.packets()], "1.2.3.4", const._MDNS_PORT
463463
)
464464
assert multicast_out is None
@@ -487,7 +487,7 @@ def test_known_answer_supression():
487487
question = r.DNSQuestion(type_, const._TYPE_PTR, const._CLASS_IN)
488488
generated.add_question(question)
489489
packets = generated.packets()
490-
unicast_out, multicast_out = zc.query_handler.response(
490+
unicast_out, multicast_out = zc.query_handler.async_response(
491491
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
492492
)
493493
assert unicast_out is None
@@ -498,7 +498,7 @@ def test_known_answer_supression():
498498
generated.add_question(question)
499499
generated.add_answer_at_time(info.dns_pointer(), now)
500500
packets = generated.packets()
501-
unicast_out, multicast_out = zc.query_handler.response(
501+
unicast_out, multicast_out = zc.query_handler.async_response(
502502
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
503503
)
504504
assert unicast_out is None
@@ -510,7 +510,7 @@ def test_known_answer_supression():
510510
question = r.DNSQuestion(server_name, const._TYPE_A, const._CLASS_IN)
511511
generated.add_question(question)
512512
packets = generated.packets()
513-
unicast_out, multicast_out = zc.query_handler.response(
513+
unicast_out, multicast_out = zc.query_handler.async_response(
514514
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
515515
)
516516
assert unicast_out is None
@@ -522,7 +522,7 @@ def test_known_answer_supression():
522522
for dns_address in info.dns_addresses():
523523
generated.add_answer_at_time(dns_address, now)
524524
packets = generated.packets()
525-
unicast_out, multicast_out = zc.query_handler.response(
525+
unicast_out, multicast_out = zc.query_handler.async_response(
526526
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
527527
)
528528
assert unicast_out is None
@@ -533,7 +533,7 @@ def test_known_answer_supression():
533533
question = r.DNSQuestion(registration_name, const._TYPE_SRV, const._CLASS_IN)
534534
generated.add_question(question)
535535
packets = generated.packets()
536-
unicast_out, multicast_out = zc.query_handler.response(
536+
unicast_out, multicast_out = zc.query_handler.async_response(
537537
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
538538
)
539539
assert unicast_out is None
@@ -544,7 +544,7 @@ def test_known_answer_supression():
544544
generated.add_question(question)
545545
generated.add_answer_at_time(info.dns_service(), now)
546546
packets = generated.packets()
547-
unicast_out, multicast_out = zc.query_handler.response(
547+
unicast_out, multicast_out = zc.query_handler.async_response(
548548
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
549549
)
550550
assert unicast_out is None
@@ -556,7 +556,7 @@ def test_known_answer_supression():
556556
question = r.DNSQuestion(registration_name, const._TYPE_TXT, const._CLASS_IN)
557557
generated.add_question(question)
558558
packets = generated.packets()
559-
unicast_out, multicast_out = zc.query_handler.response(
559+
unicast_out, multicast_out = zc.query_handler.async_response(
560560
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
561561
)
562562
assert unicast_out is None
@@ -567,7 +567,7 @@ def test_known_answer_supression():
567567
generated.add_question(question)
568568
generated.add_answer_at_time(info.dns_text(), now)
569569
packets = generated.packets()
570-
unicast_out, multicast_out = zc.query_handler.response(
570+
unicast_out, multicast_out = zc.query_handler.async_response(
571571
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
572572
)
573573
assert unicast_out is None
@@ -620,7 +620,7 @@ def test_multi_packet_known_answer_supression():
620620
generated.add_answer_at_time(info3.dns_pointer(), now)
621621
packets = generated.packets()
622622
assert len(packets) > 1
623-
unicast_out, multicast_out = zc.query_handler.response(
623+
unicast_out, multicast_out = zc.query_handler.async_response(
624624
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
625625
)
626626
assert unicast_out is None
@@ -661,7 +661,7 @@ def test_known_answer_supression_service_type_enumeration_query():
661661
question = r.DNSQuestion(const._SERVICE_TYPE_ENUMERATION_NAME, const._TYPE_PTR, const._CLASS_IN)
662662
generated.add_question(question)
663663
packets = generated.packets()
664-
unicast_out, multicast_out = zc.query_handler.response(
664+
unicast_out, multicast_out = zc.query_handler.async_response(
665665
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
666666
)
667667
assert unicast_out is None
@@ -691,7 +691,7 @@ def test_known_answer_supression_service_type_enumeration_query():
691691
now,
692692
)
693693
packets = generated.packets()
694-
unicast_out, multicast_out = zc.query_handler.response(
694+
unicast_out, multicast_out = zc.query_handler.async_response(
695695
[r.DNSIncoming(packet) for packet in packets], "1.2.3.4", const._MDNS_PORT
696696
)
697697
assert unicast_out is None
@@ -747,7 +747,7 @@ def test_qu_response_only_sends_additionals_if_sends_answer():
747747
assert question.unicast is True
748748
query.add_question(question)
749749

750-
unicast_out, multicast_out = zc.query_handler.response(
750+
unicast_out, multicast_out = zc.query_handler.async_response(
751751
[r.DNSIncoming(packet) for packet in query.packets()], "1.2.3.4", const._MDNS_PORT
752752
)
753753
assert multicast_out is None
@@ -767,7 +767,7 @@ def test_qu_response_only_sends_additionals_if_sends_answer():
767767
assert question.unicast is True
768768
query.add_question(question)
769769

770-
unicast_out, multicast_out = zc.query_handler.response(
770+
unicast_out, multicast_out = zc.query_handler.async_response(
771771
[r.DNSIncoming(packet) for packet in query.packets()], "1.2.3.4", const._MDNS_PORT
772772
)
773773
assert multicast_out is None
@@ -787,7 +787,7 @@ def test_qu_response_only_sends_additionals_if_sends_answer():
787787
assert question.unicast is True
788788
query.add_question(question)
789789

790-
unicast_out, multicast_out = zc.query_handler.response(
790+
unicast_out, multicast_out = zc.query_handler.async_response(
791791
[r.DNSIncoming(packet) for packet in query.packets()], "1.2.3.4", const._MDNS_PORT
792792
)
793793
assert multicast_out.answers[0][0] == ptr_record
@@ -813,7 +813,7 @@ def test_qu_response_only_sends_additionals_if_sends_answer():
813813
query.add_question(question)
814814
zc.cache.add(info2.dns_pointer()) # Add 100% TTL for info2 to the cache
815815

816-
unicast_out, multicast_out = zc.query_handler.response(
816+
unicast_out, multicast_out = zc.query_handler.async_response(
817817
[r.DNSIncoming(packet) for packet in query.packets()], "1.2.3.4", const._MDNS_PORT
818818
)
819819
assert multicast_out.answers[0][0] == info.dns_pointer()

tests/test_services.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1242,7 +1242,7 @@ def mock_incoming_msg(records) -> r.DNSIncoming:
12421242
zc,
12431243
mock_incoming_msg([info.dns_pointer(), info.dns_service(), info.dns_text(), *info.dns_addresses()]),
12441244
)
1245-
zc.wait(100)
1245+
time.sleep(0.1)
12461246

12471247
assert callbacks == [('_hap._tcp.local.', ServiceStateChange.Added, 'xxxyyy._hap._tcp.local.')]
12481248
assert zc.get_service_info(type_, registration_name).port == 80
@@ -1252,7 +1252,7 @@ def mock_incoming_msg(records) -> r.DNSIncoming:
12521252
zc,
12531253
mock_incoming_msg([info.dns_service()]),
12541254
)
1255-
zc.wait(100)
1255+
time.sleep(0.1)
12561256

12571257
assert callbacks == [
12581258
('_hap._tcp.local.', ServiceStateChange.Added, 'xxxyyy._hap._tcp.local.'),

zeroconf/_core.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,8 @@ async def _async_cache_cleanup(self) -> None:
146146
"""Periodic cache cleanup."""
147147
while not self.zc.done:
148148
now = current_time_millis()
149-
self.zc.record_manager.updates(now, list(self.zc.cache.expire(now)))
150-
self.zc.record_manager.updates_complete()
149+
self.zc.record_manager.async_updates(now, list(self.zc.cache.expire(now)))
150+
self.zc.record_manager.async_updates_complete()
151151
await asyncio.sleep(millis_to_seconds(_CACHE_CLEANUP_INTERVAL))
152152

153153
async def _async_close(self) -> None:
@@ -565,7 +565,7 @@ def remove_listener(self, listener: RecordUpdateListener) -> None:
565565
def handle_response(self, msg: DNSIncoming) -> None:
566566
"""Deal with incoming response packets. All answers
567567
are held in the cache, and listeners are notified."""
568-
self.record_manager.updates_from_response(msg)
568+
self.record_manager.async_updates_from_response(msg)
569569

570570
def handle_query(self, msg: DNSIncoming, addr: str, port: int) -> None:
571571
"""Deal with incoming query packets. Provides a response if
@@ -594,7 +594,7 @@ def _respond_query(self, msg: Optional[DNSIncoming], addr: str, port: int) -> No
594594
if msg:
595595
packets.append(msg)
596596

597-
unicast_out, multicast_out = self.query_handler.response(packets, addr, port)
597+
unicast_out, multicast_out = self.query_handler.async_response(packets, addr, port)
598598
if unicast_out:
599599
self.async_send(unicast_out, addr, port)
600600
if multicast_out:

zeroconf/_handlers.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -239,10 +239,14 @@ def _answer_question(
239239
if not known_answers.suppresses(dns_text):
240240
answer_set[dns_text] = set()
241241

242-
def response( # pylint: disable=unused-argument
242+
def async_response( # pylint: disable=unused-argument
243243
self, msgs: List[DNSIncoming], addr: Optional[str], port: int
244244
) -> Tuple[Optional[DNSOutgoing], Optional[DNSOutgoing]]:
245-
"""Deal with incoming query packets. Provides a response if possible."""
245+
"""Deal with incoming query packets. Provides a response if possible.
246+
247+
This function must be run in the event loop as it is not
248+
threadsafe.
249+
"""
246250
ucast_source = port != _MDNS_PORT
247251
known_answers = DNSRRSet(itertools.chain(*[msg.answers for msg in msgs]))
248252
query_res = _QueryResponse(self.cache, msgs[0], ucast_source)
@@ -272,28 +276,36 @@ def __init__(self, zeroconf: 'Zeroconf') -> None:
272276
self.cache = zeroconf.cache
273277
self.listeners: List[RecordUpdateListener] = []
274278

275-
def updates(self, now: float, rec: List[DNSRecord]) -> None:
279+
def async_updates(self, now: float, rec: List[DNSRecord]) -> None:
276280
"""Used to notify listeners of new information that has updated
277281
a record.
278282
279283
This method must be called before the cache is updated.
284+
285+
This method will be run in the event loop.
280286
"""
281287
for listener in self.listeners:
282-
listener.update_records(self.zc, now, rec)
288+
listener.async_update_records(self.zc, now, rec)
283289

284-
def updates_complete(self) -> None:
290+
def async_updates_complete(self) -> None:
285291
"""Used to notify listeners of new information that has updated
286292
a record.
287293
288294
This method must be called after the cache is updated.
295+
296+
This method will be run in the event loop.
289297
"""
290298
for listener in self.listeners:
291-
listener.update_records_complete()
299+
listener.async_update_records_complete()
292300
self.zc.notify_all()
293301

294-
def updates_from_response(self, msg: DNSIncoming) -> None:
302+
def async_updates_from_response(self, msg: DNSIncoming) -> None:
295303
"""Deal with incoming response packets. All answers
296-
are held in the cache, and listeners are notified."""
304+
are held in the cache, and listeners are notified.
305+
306+
This function must be run in the event loop as it is not
307+
threadsafe.
308+
"""
297309
updates: List[DNSRecord] = []
298310
address_adds: List[DNSAddress] = []
299311
other_adds: List[DNSRecord] = []
@@ -334,7 +346,7 @@ def updates_from_response(self, msg: DNSIncoming) -> None:
334346
if not updates and not address_adds and not other_adds and not removes:
335347
return
336348

337-
self.updates(now, updates)
349+
self.async_updates(now, updates)
338350
# The cache adds must be processed AFTER we trigger
339351
# the updates since we compare existing data
340352
# with the new data and updating the cache
@@ -355,7 +367,7 @@ def updates_from_response(self, msg: DNSIncoming) -> None:
355367
# ServiceInfo could generate an un-needed query
356368
# because the data was not yet populated.
357369
self.cache.remove_records(removes)
358-
self.updates_complete()
370+
self.async_updates_complete()
359371

360372
def add_listener(
361373
self, listener: RecordUpdateListener, question: Optional[Union[DNSQuestion, List[DNSQuestion]]]
@@ -374,8 +386,8 @@ def add_listener(
374386
if single_question.answered_by(record) and not record.is_expired(now):
375387
records.append(record)
376388
if records:
377-
listener.update_records(self.zc, now, records)
378-
listener.update_records_complete()
389+
listener.async_update_records(self.zc, now, records)
390+
listener.async_update_records_complete()
379391

380392
self.zc.notify_all()
381393

0 commit comments

Comments
 (0)