@@ -114,10 +114,17 @@ async def _async_create_endpoints(self) -> None:
114114 lambda : AsyncListener (self .zc ), # type: ignore[arg-type, return-value]
115115 sock = s ,
116116 )
117+ # Register the wrapped transport before releasing the engine's
118+ # handle so a concurrent shutdown always sees ``s`` in exactly
119+ # one place; do not add an ``await`` between these two steps.
117120 self .protocols .append (cast (AsyncListener , protocol ))
118121 self .readers .append (make_wrapped_transport (cast (asyncio .DatagramTransport , transport )))
119122 if s in sender_sockets :
120123 self .senders .append (make_wrapped_transport (cast (asyncio .DatagramTransport , transport )))
124+ if s is self ._listen_socket :
125+ self ._listen_socket = None
126+ if s in self ._respond_sockets :
127+ self ._respond_sockets .remove (s )
121128
122129 def _async_cache_cleanup (self ) -> None :
123130 """Periodic cache cleanup."""
@@ -139,19 +146,37 @@ def _async_schedule_next_cache_cleanup(self) -> None:
139146 async def _async_close (self ) -> None :
140147 """Cancel and wait for the cleanup task to finish."""
141148 assert self ._setup_task is not None
142- await self ._setup_task
149+ # Swallow CancelledError only if the setup task itself was
150+ # cancelled (close-before-start); outer-task cancellation must
151+ # propagate.
152+ try :
153+ await self ._setup_task
154+ except asyncio .CancelledError :
155+ if not self ._setup_task .cancelled ():
156+ raise
143157 self ._async_shutdown ()
144158 await asyncio .sleep (0 ) # flush out any call soons
145- assert self ._cleanup_timer is not None
146- self ._cleanup_timer .cancel ()
159+ if self ._cleanup_timer is not None :
160+ self ._cleanup_timer .cancel ()
147161
148162 def _async_shutdown (self ) -> None :
149- """Shutdown transports and sockets."""
163+ """Shutdown transports and sockets; safe to call repeatedly ."""
150164 assert self .running_future is not None
151165 assert self .loop is not None
152166 self .running_future = self .loop .create_future ()
167+ # Cancel pending setup so it can't wrap fresh transports after
168+ # shutdown has started.
169+ if self ._setup_task is not None and not self ._setup_task .done ():
170+ self ._setup_task .cancel ()
153171 for wrapped_transport in itertools .chain (self .senders , self .readers ):
154172 wrapped_transport .transport .close ()
173+ # Anything still here was never adopted by a transport.
174+ if self ._listen_socket is not None :
175+ self ._listen_socket .close ()
176+ self ._listen_socket = None
177+ for s in self ._respond_sockets :
178+ s .close ()
179+ self ._respond_sockets = []
155180
156181 def close (self ) -> None :
157182 """Close from sync context.
0 commit comments