Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,14 @@ def mock_split_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNS
zeroconf.close()


def test_notify_listeners():
# This test uses asyncio because it needs to verify the listeners
# run in the event loop
@pytest.mark.asyncio
async def test_notify_listeners():
"""Test adding and removing notify listeners."""
# instantiate a zeroconf instance
zc = Zeroconf(interfaces=['127.0.0.1'])
aiozc = AsyncZeroconf(interfaces=['127.0.0.1'])
zc = aiozc.zeroconf
notify_called = 0

class TestNotifyListener(r.NotifyListener):
Expand All @@ -274,17 +278,19 @@ def on_service_state_change(zeroconf, service_type, state_change, name):
browser = ServiceBrowser(zc, "_http._tcp.local.", [on_service_state_change])
browser.cancel()

await asyncio.sleep(0) # flush out any call_soon_threadsafe
assert notify_called
zc.remove_notify_listener(notify_listener)

notify_called = 0
# start a browser
browser = ServiceBrowser(zc, "_http._tcp.local.", [on_service_state_change])
browser.cancel()
await asyncio.sleep(0) # flush out any call_soon_threadsafe

assert not notify_called

zc.close()
await aiozc.async_close()


def test_generate_service_query_set_qu_bit():
Expand Down
35 changes: 24 additions & 11 deletions zeroconf/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,18 +385,31 @@ def add_listener(
answer the question(s)."""
self.listeners.append(listener)

if question is not None:
now = current_time_millis()
records = []
questions = [question] if isinstance(question, DNSQuestion) else question
for single_question in questions:
for record in self.cache.entries_with_name(single_question.name):
if single_question.answered_by(record) and not record.is_expired(now):
records.append(record)
if records:
listener.async_update_records(self.zc, now, records)
listener.async_update_records_complete()
if question is None:
self.zc.notify_all()
return

questions = [question] if isinstance(question, DNSQuestion) else question
assert self.zc.loop is not None
self.zc.loop.call_soon_threadsafe(self._async_update_matching_records, listener, questions)

def _async_update_matching_records(
self, listener: RecordUpdateListener, questions: List[DNSQuestion]
) -> None:
"""Calls back any existing entries in the cache that answer the question.

This function must be run from the event loop.
"""
now = current_time_millis()
records = []
for question in questions:
for record in self.cache.async_entries_with_name(question.name):
if not record.is_expired(now) and question.answered_by(record):
records.append(record)
if not records:
return
listener.async_update_records(self.zc, now, records)
listener.async_update_records_complete()
self.zc.notify_all()

def remove_listener(self, listener: RecordUpdateListener) -> None:
Expand Down
8 changes: 2 additions & 6 deletions zeroconf/_services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ def __init__(
for h in handlers:
self.service_state_changed.register_handler(h)

self.zc.add_listener(self, [DNSQuestion(type_, _TYPE_PTR, _CLASS_IN) for type_ in self.types])

@property
def service_state_changed(self) -> SignalRegistrationInterface:
return self._service_state_changed.registration_interface
Expand Down Expand Up @@ -406,11 +408,6 @@ def cancel(self) -> None:
self.done = True
self.zc.remove_listener(self)

def run(self) -> None:
"""Run the browser."""
questions = [DNSQuestion(type_, _TYPE_PTR, _CLASS_IN) for type_ in self.types]
self.zc.add_listener(self, questions)

def generate_ready_queries(self) -> List[DNSOutgoing]:
"""Generate the service browser query for any type that is due."""
now = current_time_millis()
Expand Down Expand Up @@ -480,7 +477,6 @@ def cancel(self) -> None:

def run(self) -> None:
"""Run the browser thread."""
super().run()
while True:
timeout = self._seconds_to_wait()
if timeout:
Expand Down
1 change: 0 additions & 1 deletion zeroconf/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ async def async_cancel(self) -> None:

async def async_run(self) -> None:
"""Run the browser task."""
self.run()
await self.aiozc.zeroconf.async_wait_for_start()
while True:
timeout = self._seconds_to_wait()
Expand Down