Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions zeroconf/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ async def _async_cache_cleanup(self) -> None:
async def _async_close(self) -> None:
"""Cancel and wait for the cleanup task to finish."""
self._async_shutdown()
assert self._cache_cleanup_task is not None
self._cache_cleanup_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._cache_cleanup_task
self._cache_cleanup_task = None
if self._cache_cleanup_task:
self._cache_cleanup_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._cache_cleanup_task
self._cache_cleanup_task = None
await asyncio.sleep(0) # flush out any call soons

def _async_shutdown(self) -> None:
Expand All @@ -170,6 +170,8 @@ def close(self) -> None:
if get_running_loop() == self.loop:
self._async_shutdown()
return
if not self.loop.is_running():
return
asyncio.run_coroutine_threadsafe(self._async_close(), self.loop).result()


Expand Down Expand Up @@ -607,24 +609,48 @@ def async_send(self, out: DNSOutgoing, addr: Optional[str] = None, port: int = _
# on send errors, log the exception and keep going
self.log_exception_warning('Error sending through socket %d', s.fileno())

def close(self) -> None:
"""Ends the background threads, and prevent this instance from
servicing further queries."""
def _close(self) -> None:
"""Set global done and remove all service listeners."""
if self._GLOBAL_DONE:
return
# remove service listeners
self.unregister_all_services()
self.remove_all_service_listeners()
self._GLOBAL_DONE = True
self.engine.close()
# shutdown the rest

def _shutdown_threads(self) -> None:
"""Shutdown any threads."""
self.notify_all()
if not self._loop_thread:
return
assert self.loop is not None
self.loop.call_soon_threadsafe(self.loop.stop)
self._loop_thread.join()

def close(self) -> None:
"""Ends the background threads, and prevent this instance from
servicing further queries.

This method is idempotent and irreversible.
"""
self.unregister_all_services()
self._close()
self.engine.close()
self._shutdown_threads()

async def _async_close(self) -> None:
"""Ends the background threads, and prevent this instance from
servicing further queries.

This method is idempotent and irreversible.

This call only intended to be used by AsyncZeroconf

Callers are responsible for unregistering all services
before calling this function
"""
self._close()
await self.engine._async_close() # pylint: disable=protected-access
self._shutdown_threads()

def __enter__(self) -> 'Zeroconf':
return self

Expand Down
3 changes: 2 additions & 1 deletion zeroconf/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ async def async_close(self) -> None:
await asyncio.wait_for(self.zeroconf.async_wait_for_start(), timeout=1)
await self.async_remove_all_service_listeners()
self.zeroconf.remove_notify_listener(self.async_notify)
await self.loop.run_in_executor(None, self.zeroconf.close)
await self.async_unregister_all_services()
await self.zeroconf._async_close() # pylint: disable=protected-access

async def async_get_service_info(
self, type_: str, name: str, timeout: int = 3000
Expand Down