2525import sys
2626import threading
2727from types import TracebackType
28- from typing import Awaitable , Dict , List , Optional , Tuple , Type , Union
28+ from typing import Awaitable , Dict , List , Optional , Set , Tuple , Type , Union
2929
3030from ._cache import DNSCache
3131from ._dns import DNSQuestion , DNSQuestionType
4949from ._transport import _WrappedTransport
5050from ._updates import RecordUpdateListener
5151from ._utils .asyncio import (
52+ _resolve_all_futures_to_none ,
5253 await_awaitable ,
5354 get_running_loop ,
5455 run_coro_with_timeout ,
5556 shutdown_loop ,
5657 wait_event_or_timeout ,
58+ wait_for_future_set_or_timeout ,
5759)
5860from ._utils .name import service_type_name
5961from ._utils .net import (
@@ -188,7 +190,7 @@ def __init__(
188190 self .query_handler = QueryHandler (self .registry , self .cache , self .question_history )
189191 self .record_manager = RecordManager (self )
190192
191- self .notify_event : Optional [asyncio .Event ] = None
193+ self ._notify_futures : Set [asyncio .Future ] = set ()
192194 self .loop : Optional [asyncio .AbstractEventLoop ] = None
193195 self ._loop_thread : Optional [threading .Thread ] = None
194196
@@ -206,7 +208,6 @@ def start(self) -> None:
206208 """Start Zeroconf."""
207209 self .loop = get_running_loop ()
208210 if self .loop :
209- self .notify_event = asyncio .Event ()
210211 self .engine .setup (self .loop , None )
211212 return
212213 self ._start_thread ()
@@ -218,7 +219,6 @@ def _start_thread(self) -> None:
218219 def _run_loop () -> None :
219220 self .loop = asyncio .new_event_loop ()
220221 asyncio .set_event_loop (self .loop )
221- self .notify_event = asyncio .Event ()
222222 self .engine .setup (self .loop , loop_thread_ready )
223223 self .loop .run_forever ()
224224
@@ -245,8 +245,9 @@ def listeners(self) -> List[RecordUpdateListener]:
245245
246246 async def async_wait (self , timeout : float ) -> None :
247247 """Calling task waits for a given number of milliseconds or until notified."""
248- assert self .notify_event is not None
249- await wait_event_or_timeout (self .notify_event , timeout = millis_to_seconds (timeout ))
248+ loop = self .loop
249+ assert loop is not None
250+ await wait_for_future_set_or_timeout (loop , self ._notify_futures , timeout )
250251
251252 def notify_all (self ) -> None :
252253 """Notifies all waiting threads and notify listeners."""
@@ -255,9 +256,9 @@ def notify_all(self) -> None:
255256
256257 def async_notify_all (self ) -> None :
257258 """Schedule an async_notify_all."""
258- assert self .notify_event is not None
259- self . notify_event . set ()
260- self . notify_event . clear ( )
259+ notify_futures = self ._notify_futures
260+ if notify_futures :
261+ _resolve_all_futures_to_none ( notify_futures )
261262
262263 def get_service_info (
263264 self , type_ : str , name : str , timeout : int = 3000 , question_type : Optional [DNSQuestionType ] = None
0 commit comments