Skip to content

Commit c8e15dd

Browse files
authored
Run question answer callbacks from add_listener in the event loop (#740)
1 parent e227d6e commit c8e15dd

4 files changed

Lines changed: 35 additions & 21 deletions

File tree

tests/test_core.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,10 +249,14 @@ def mock_split_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNS
249249
zeroconf.close()
250250

251251

252-
def test_notify_listeners():
252+
# This test uses asyncio because it needs to verify the listeners
253+
# run in the event loop
254+
@pytest.mark.asyncio
255+
async def test_notify_listeners():
253256
"""Test adding and removing notify listeners."""
254257
# instantiate a zeroconf instance
255-
zc = Zeroconf(interfaces=['127.0.0.1'])
258+
aiozc = AsyncZeroconf(interfaces=['127.0.0.1'])
259+
zc = aiozc.zeroconf
256260
notify_called = 0
257261

258262
class TestNotifyListener(r.NotifyListener):
@@ -274,17 +278,19 @@ def on_service_state_change(zeroconf, service_type, state_change, name):
274278
browser = ServiceBrowser(zc, "_http._tcp.local.", [on_service_state_change])
275279
browser.cancel()
276280

281+
await asyncio.sleep(0) # flush out any call_soon_threadsafe
277282
assert notify_called
278283
zc.remove_notify_listener(notify_listener)
279284

280285
notify_called = 0
281286
# start a browser
282287
browser = ServiceBrowser(zc, "_http._tcp.local.", [on_service_state_change])
283288
browser.cancel()
289+
await asyncio.sleep(0) # flush out any call_soon_threadsafe
284290

285291
assert not notify_called
286292

287-
zc.close()
293+
await aiozc.async_close()
288294

289295

290296
def test_generate_service_query_set_qu_bit():

zeroconf/_handlers.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -385,18 +385,31 @@ def add_listener(
385385
answer the question(s)."""
386386
self.listeners.append(listener)
387387

388-
if question is not None:
389-
now = current_time_millis()
390-
records = []
391-
questions = [question] if isinstance(question, DNSQuestion) else question
392-
for single_question in questions:
393-
for record in self.cache.entries_with_name(single_question.name):
394-
if single_question.answered_by(record) and not record.is_expired(now):
395-
records.append(record)
396-
if records:
397-
listener.async_update_records(self.zc, now, records)
398-
listener.async_update_records_complete()
388+
if question is None:
389+
self.zc.notify_all()
390+
return
399391

392+
questions = [question] if isinstance(question, DNSQuestion) else question
393+
assert self.zc.loop is not None
394+
self.zc.loop.call_soon_threadsafe(self._async_update_matching_records, listener, questions)
395+
396+
def _async_update_matching_records(
397+
self, listener: RecordUpdateListener, questions: List[DNSQuestion]
398+
) -> None:
399+
"""Calls back any existing entries in the cache that answer the question.
400+
401+
This function must be run from the event loop.
402+
"""
403+
now = current_time_millis()
404+
records = []
405+
for question in questions:
406+
for record in self.cache.async_entries_with_name(question.name):
407+
if not record.is_expired(now) and question.answered_by(record):
408+
records.append(record)
409+
if not records:
410+
return
411+
listener.async_update_records(self.zc, now, records)
412+
listener.async_update_records_complete()
400413
self.zc.notify_all()
401414

402415
def remove_listener(self, listener: RecordUpdateListener) -> None:

zeroconf/_services/__init__.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,8 @@ def __init__(
307307
for h in handlers:
308308
self.service_state_changed.register_handler(h)
309309

310+
self.zc.add_listener(self, [DNSQuestion(type_, _TYPE_PTR, _CLASS_IN) for type_ in self.types])
311+
310312
@property
311313
def service_state_changed(self) -> SignalRegistrationInterface:
312314
return self._service_state_changed.registration_interface
@@ -406,11 +408,6 @@ def cancel(self) -> None:
406408
self.done = True
407409
self.zc.remove_listener(self)
408410

409-
def run(self) -> None:
410-
"""Run the browser."""
411-
questions = [DNSQuestion(type_, _TYPE_PTR, _CLASS_IN) for type_ in self.types]
412-
self.zc.add_listener(self, questions)
413-
414411
def generate_ready_queries(self) -> List[DNSOutgoing]:
415412
"""Generate the service browser query for any type that is due."""
416413
now = current_time_millis()
@@ -480,7 +477,6 @@ def cancel(self) -> None:
480477

481478
def run(self) -> None:
482479
"""Run the browser thread."""
483-
super().run()
484480
while True:
485481
timeout = self._seconds_to_wait()
486482
if timeout:

zeroconf/aio.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ async def async_cancel(self) -> None:
146146

147147
async def async_run(self) -> None:
148148
"""Run the browser task."""
149-
self.run()
150149
await self.aiozc.zeroconf.async_wait_for_start()
151150
while True:
152151
timeout = self._seconds_to_wait()

0 commit comments

Comments
 (0)