From 45c166d19aea57f00b10d00cf1d5cf3cd5ed5d71 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 18 Jun 2021 20:32:36 -1000 Subject: [PATCH 1/7] Run question answer callbacks from add_listener in the event loop - Calling async_update_records and async_update_records_complete should always happen in the event loop to ensure implementers do not need to worry about thread safety --- zeroconf/_handlers.py | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/zeroconf/_handlers.py b/zeroconf/_handlers.py index 66b8862f2..a093a0d60 100644 --- a/zeroconf/_handlers.py +++ b/zeroconf/_handlers.py @@ -385,18 +385,28 @@ def add_listener( answer the question(s).""" self.listeners.append(listener) - if question is not None: - now = current_time_millis() - records = [] - questions = [question] if isinstance(question, DNSQuestion) else question - for single_question in questions: - for record in self.cache.entries_with_name(single_question.name): - if single_question.answered_by(record) and not record.is_expired(now): - records.append(record) - if records: - listener.async_update_records(self.zc, now, records) - listener.async_update_records_complete() + if question is None: + self.zc.notify_all() + return + + questions = [question] if isinstance(question, DNSQuestion) else question + self.zc.loop.call_soon_threadsafe(self._async_update_matching_records, listener, questions) + + def _async_update_matching_records(self, listener: RecordUpdateListener, questions: List[DNSQuestion]) -> None: + """Calls back any existing entries in the cache that answer the question. + This function must be run from the event loop. + """ + now = current_time_millis() + records = [] + for question in questions: + for record in self.cache.async_entries_with_name(question.name): + if record.is_expired(now) and question.answered_by(record): + records.append(record) + if not records: + return + listener.async_update_records(self.zc, now, records) + listener.async_update_records_complete() self.zc.notify_all() def remove_listener(self, listener: RecordUpdateListener) -> None: From f89347e26a55a5be2325e2b9ca733eb2eaa908b6 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 18 Jun 2021 20:48:19 -1000 Subject: [PATCH 2/7] black --- zeroconf/_handlers.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/zeroconf/_handlers.py b/zeroconf/_handlers.py index a093a0d60..cafc3d1e6 100644 --- a/zeroconf/_handlers.py +++ b/zeroconf/_handlers.py @@ -388,11 +388,13 @@ def add_listener( if question is None: self.zc.notify_all() return - + questions = [question] if isinstance(question, DNSQuestion) else question self.zc.loop.call_soon_threadsafe(self._async_update_matching_records, listener, questions) - def _async_update_matching_records(self, listener: RecordUpdateListener, questions: List[DNSQuestion]) -> None: + def _async_update_matching_records( + self, listener: RecordUpdateListener, questions: List[DNSQuestion] + ) -> None: """Calls back any existing entries in the cache that answer the question. This function must be run from the event loop. From bf3b7387b5848076b4b5629d32240df692b6d607 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 18 Jun 2021 20:50:57 -1000 Subject: [PATCH 3/7] mypy --- zeroconf/_handlers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/zeroconf/_handlers.py b/zeroconf/_handlers.py index cafc3d1e6..c70b4cec8 100644 --- a/zeroconf/_handlers.py +++ b/zeroconf/_handlers.py @@ -390,6 +390,7 @@ def add_listener( return questions = [question] if isinstance(question, DNSQuestion) else question + assert self.zc.loop is not None self.zc.loop.call_soon_threadsafe(self._async_update_matching_records, listener, questions) def _async_update_matching_records( From 81b2387c4987d92fb20cf2ae67e7bde808796ae5 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 18 Jun 2021 20:55:54 -1000 Subject: [PATCH 4/7] fix logic reversal --- zeroconf/_handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeroconf/_handlers.py b/zeroconf/_handlers.py index c70b4cec8..476217f6c 100644 --- a/zeroconf/_handlers.py +++ b/zeroconf/_handlers.py @@ -404,7 +404,7 @@ def _async_update_matching_records( records = [] for question in questions: for record in self.cache.async_entries_with_name(question.name): - if record.is_expired(now) and question.answered_by(record): + if not record.is_expired(now) and question.answered_by(record): records.append(record) if not records: return From 034565cfd4f6705bce89bfd3848e2a843e2fa52f Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 18 Jun 2021 21:02:30 -1000 Subject: [PATCH 5/7] Update test case --- tests/test_core.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index 819bbe68d..f3920c1dd 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -249,10 +249,14 @@ def mock_split_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNS zeroconf.close() -def test_notify_listeners(): +# This test uses asyncio because it needs to verify the listeners +# run in the event loop +@pytest.mark.asyncio +async def test_notify_listeners(): """Test adding and removing notify listeners.""" # instantiate a zeroconf instance - zc = Zeroconf(interfaces=['127.0.0.1']) + aiozc = AsyncZeroconf(interfaces=['127.0.0.1']) + zc = aiozc.zeroconf notify_called = 0 class TestNotifyListener(r.NotifyListener): @@ -281,10 +285,11 @@ def on_service_state_change(zeroconf, service_type, state_change, name): # start a browser browser = ServiceBrowser(zc, "_http._tcp.local.", [on_service_state_change]) browser.cancel() + await asyncio.sleep(0) assert not notify_called - zc.close() + await aiozc.async_close() def test_generate_service_query_set_qu_bit(): From dc65112b0706777289d7e9879fe34ced4e8f4ccb Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 18 Jun 2021 21:07:57 -1000 Subject: [PATCH 6/7] document test --- tests/test_core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index f3920c1dd..1f0884f03 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -278,6 +278,7 @@ def on_service_state_change(zeroconf, service_type, state_change, name): browser = ServiceBrowser(zc, "_http._tcp.local.", [on_service_state_change]) browser.cancel() + await asyncio.sleep(0) # flush out any call_soon_threadsafe assert notify_called zc.remove_notify_listener(notify_listener) @@ -285,7 +286,7 @@ def on_service_state_change(zeroconf, service_type, state_change, name): # start a browser browser = ServiceBrowser(zc, "_http._tcp.local.", [on_service_state_change]) browser.cancel() - await asyncio.sleep(0) + await asyncio.sleep(0) # flush out any call_soon_threadsafe assert not notify_called From 949c85cfd473aee42222b3e0e09fbf5f6e1e9bf5 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 18 Jun 2021 21:17:39 -1000 Subject: [PATCH 7/7] Fix startup race --- zeroconf/_services/__init__.py | 8 ++------ zeroconf/aio.py | 1 - 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/zeroconf/_services/__init__.py b/zeroconf/_services/__init__.py index 9334bafcb..04288a69c 100644 --- a/zeroconf/_services/__init__.py +++ b/zeroconf/_services/__init__.py @@ -307,6 +307,8 @@ def __init__( for h in handlers: self.service_state_changed.register_handler(h) + self.zc.add_listener(self, [DNSQuestion(type_, _TYPE_PTR, _CLASS_IN) for type_ in self.types]) + @property def service_state_changed(self) -> SignalRegistrationInterface: return self._service_state_changed.registration_interface @@ -406,11 +408,6 @@ def cancel(self) -> None: self.done = True self.zc.remove_listener(self) - def run(self) -> None: - """Run the browser.""" - questions = [DNSQuestion(type_, _TYPE_PTR, _CLASS_IN) for type_ in self.types] - self.zc.add_listener(self, questions) - def generate_ready_queries(self) -> List[DNSOutgoing]: """Generate the service browser query for any type that is due.""" now = current_time_millis() @@ -480,7 +477,6 @@ def cancel(self) -> None: def run(self) -> None: """Run the browser thread.""" - super().run() while True: timeout = self._seconds_to_wait() if timeout: diff --git a/zeroconf/aio.py b/zeroconf/aio.py index ae57d0142..d5414d138 100644 --- a/zeroconf/aio.py +++ b/zeroconf/aio.py @@ -146,7 +146,6 @@ async def async_cancel(self) -> None: async def async_run(self) -> None: """Run the browser task.""" - self.run() await self.aiozc.zeroconf.async_wait_for_start() while True: timeout = self._seconds_to_wait()